|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#pragma once |
|
|
|
#include <functional> |
|
#include <iosfwd> |
|
#include <memory> |
|
#include <optional> |
|
#include <string> |
|
#include <unordered_map> |
|
#include <utility> |
|
#include <vector> |
|
|
|
#include "arrow/compute/expression.h" |
|
#include "arrow/dataset/type_fwd.h" |
|
#include "arrow/dataset/visibility.h" |
|
#include "arrow/util/compare.h" |
|
|
|
namespace arrow { |
|
|
|
namespace dataset { |
|
|
|
/// The character `'_'`. Going by its name this is the separator placed between
/// partition segments inside a file name -- confirm against the implementation file.
constexpr char kFilenamePartitionSep{'_'};
|
|
|
/// \brief A directory string paired with a filename string.
///
/// This is the value type produced by the Format()/FormatValues() members declared
/// further down in this header.
struct ARROW_DS_EXPORT PartitionPathFormat {
  std::string directory;
  std::string filename;
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// \brief Abstract base class of every partitioning scheme in this header.
///
/// A Partitioning stores a schema handed over at construction and declares
/// pure-virtual hooks for splitting a record batch, parsing a path into an
/// expression and formatting an expression back into a path.
class ARROW_DS_EXPORT Partitioning : public util::EqualityComparable<Partitioning> {
 public:
  virtual ~Partitioning() = default;

  /// \brief Name identifying the concrete partitioning type.
  virtual std::string type_name() const = 0;

  /// \brief Compare this partitioning with `other`.
  ///
  /// The base implementation looks only at the two schemas; subclasses override it.
  virtual bool Equals(const Partitioning& other) const {
    // NOTE(review): the trailing `false` is presumably Schema::Equals' metadata
    // switch (i.e. metadata is ignored) -- confirm against the Schema declaration.
    const std::shared_ptr<Schema>& rhs_schema = other.schema_;
    return schema_->Equals(rhs_schema, false);
  }

  /// \brief Result type of Partition(): a vector of batches plus a vector of
  /// expressions.
  ///
  /// Presumably `expressions[i]` describes `batches[i]` -- confirm in the
  /// implementations.
  struct PartitionedBatches {
    RecordBatchVector batches;
    std::vector<compute::Expression> expressions;
  };

  /// \brief Split `batch` into a PartitionedBatches value, or report an error.
  virtual Result<PartitionedBatches> Partition(
      const std::shared_ptr<RecordBatch>& batch) const = 0;

  /// \brief Turn `path` into an expression, or report an error.
  virtual Result<compute::Expression> Parse(const std::string& path) const = 0;

  /// \brief Turn `expr` into a directory/filename pair, or report an error.
  virtual Result<PartitionPathFormat> Format(const compute::Expression& expr) const = 0;

  /// \brief Obtain a default Partitioning instance (defined outside this header).
  static std::shared_ptr<Partitioning> Default();

  /// \brief The schema supplied at construction.
  const std::shared_ptr<Schema>& schema() const { return schema_; }

 protected:
  /// \brief Take ownership of `schema`; only subclasses may construct.
  explicit Partitioning(std::shared_ptr<Schema> schema) : schema_(std::move(schema)) {}

  std::shared_ptr<Schema> schema_;
};
|
|
|
|
|
/// \brief Selects how path segments are encoded; stored in a single signed byte.
///
/// NOTE(review): the enumerator meanings below are inferred from their names --
/// confirm against the implementation.
enum class SegmentEncoding : int8_t {
  /// Presumably: segments are used verbatim.
  None = 0,
  /// Presumably: segments are URI (percent) encoded.
  Uri = 1,
};
|
|
|
/// \brief Stream-insertion operator for SegmentEncoding.
///
/// Only declared here; the definition lives outside this header.
ARROW_DS_EXPORT std::ostream& operator<<(std::ostream& os,
                                         SegmentEncoding segment_encoding);
|
|
|
|
|
/// \brief Options consumed by KeyValuePartitioning and its subclasses.
struct ARROW_DS_EXPORT KeyValuePartitioningOptions {
  /// Segment encoding to apply; defaults to SegmentEncoding::Uri.
  SegmentEncoding segment_encoding{SegmentEncoding::Uri};
};
|
|
|
|
|
struct ARROW_DS_EXPORT PartitioningFactoryOptions { |
|
|
|
|
|
|
|
|
|
bool infer_dictionary = false; |
|
|
|
|
|
|
|
std::shared_ptr<Schema> schema; |
|
|
|
|
|
SegmentEncoding segment_encoding = SegmentEncoding::Uri; |
|
|
|
KeyValuePartitioningOptions AsPartitioningOptions() const; |
|
}; |
|
|
|
|
|
struct ARROW_DS_EXPORT HivePartitioningFactoryOptions : PartitioningFactoryOptions { |
|
|
|
std::string null_fallback; |
|
|
|
HivePartitioningOptions AsHivePartitioningOptions() const; |
|
}; |
|
|
|
|
|
|
|
class ARROW_DS_EXPORT PartitioningFactory { |
|
public: |
|
virtual ~PartitioningFactory() = default; |
|
|
|
|
|
virtual std::string type_name() const = 0; |
|
|
|
|
|
|
|
virtual Result<std::shared_ptr<Schema>> Inspect( |
|
const std::vector<std::string>& paths) = 0; |
|
|
|
|
|
|
|
virtual Result<std::shared_ptr<Partitioning>> Finish( |
|
const std::shared_ptr<Schema>& schema) const = 0; |
|
}; |
|
|
|
|
|
|
|
/// \brief Intermediate base for partitionings expressed as (name, value) keys.
///
/// It overrides Partition(), Parse(), Format() and Equals() (all defined outside
/// this header) and leaves two hooks, ParseKeys() and FormatValues(), for concrete
/// subclasses to implement.
class ARROW_DS_EXPORT KeyValuePartitioning : public Partitioning {
 public:
  /// \brief A field name together with an optional string value.
  ///
  /// NOTE(review): an absent value presumably denotes null -- confirm in
  /// ConvertKey().
  struct Key {
    std::string name;
    std::optional<std::string> value;
  };

  Result<PartitionedBatches> Partition(
      const std::shared_ptr<RecordBatch>& batch) const override;

  Result<compute::Expression> Parse(const std::string& path) const override;

  Result<PartitionPathFormat> Format(const compute::Expression& expr) const override;

  /// \brief The dictionaries vector held by this partitioning.
  const ArrayVector& dictionaries() const { return dictionaries_; }

  /// \brief The segment encoding taken from the options given at construction.
  SegmentEncoding segment_encoding() const { return options_.segment_encoding; }

  bool Equals(const Partitioning& other) const override;

 protected:
  /// \brief Store the schema, dictionaries and options.
  ///
  /// When `dictionaries` is empty it is resized to one value-initialized entry per
  /// schema field.
  KeyValuePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries,
                       KeyValuePartitioningOptions options)
      : Partitioning(std::move(schema)),
        dictionaries_(std::move(dictionaries)),
        options_(options) {
    if (dictionaries_.empty()) {
      const auto field_count = schema_->num_fields();
      dictionaries_.resize(field_count);
    }
  }

  /// \brief Subclass hook: extract the keys from `path`.
  virtual Result<std::vector<Key>> ParseKeys(const std::string& path) const = 0;

  /// \brief Subclass hook: build a directory/filename pair from `values`.
  virtual Result<PartitionPathFormat> FormatValues(const ScalarVector& values) const = 0;

  /// \brief Convert one key into an expression (defined outside this header).
  Result<compute::Expression> ConvertKey(const Key& key) const;

  /// \brief Helper turning `values` into path segment strings
  /// (defined outside this header).
  Result<std::vector<std::string>> FormatPartitionSegments(
      const ScalarVector& values) const;

  /// \brief Helper turning path segment strings into keys
  /// (defined outside this header).
  Result<std::vector<Key>> ParsePartitionSegments(
      const std::vector<std::string>& segments) const;

  ArrayVector dictionaries_;
  KeyValuePartitioningOptions options_;
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// \brief KeyValuePartitioning whose type name is "directory".
///
/// NOTE(review): presumably each directory level of a path supplies the value of one
/// schema field, in order -- confirm in the implementation of ParseKeys().
class ARROW_DS_EXPORT DirectoryPartitioning : public KeyValuePartitioning {
 public:
  /// \brief Construct from a schema, with optional dictionaries and options.
  explicit DirectoryPartitioning(std::shared_ptr<Schema> schema,
                                 ArrayVector dictionaries = {},
                                 KeyValuePartitioningOptions options = {});

  std::string type_name() const override { return "directory"; }

  bool Equals(const Partitioning& other) const override;

  /// \brief Create a factory for this partitioning type from a list of field names
  /// and factory options (defined outside this header).
  static std::shared_ptr<PartitioningFactory> MakeFactory(
      std::vector<std::string> field_names, PartitioningFactoryOptions options = {});

 private:
  Result<std::vector<Key>> ParseKeys(const std::string& path) const override;

  Result<PartitionPathFormat> FormatValues(const ScalarVector& values) const override;
};
|
|
|
|
|
/// Default null-fallback string used by HivePartitioningOptions and by the
/// HivePartitioning constructor's default argument.
///
/// Declared `inline constexpr` rather than `static constexpr`: the array is odr-used
/// (array-to-pointer conversion) by a default member initializer and a default
/// argument that are defined in every including translation unit, so it must name
/// one entity program-wide instead of a separate internal-linkage copy per TU.
inline constexpr char kDefaultHiveNullFallback[] = "__HIVE_DEFAULT_PARTITION__";
|
|
|
/// \brief KeyValuePartitioningOptions extended with the null-fallback string used by
/// HivePartitioning.
struct ARROW_DS_EXPORT HivePartitioningOptions : public KeyValuePartitioningOptions {
  /// Fallback string; defaults to kDefaultHiveNullFallback.
  std::string null_fallback = kDefaultHiveNullFallback;

  /// \brief Build default options, then replace only `null_fallback` with `fallback`.
  static HivePartitioningOptions DefaultsWithNullFallback(std::string fallback) {
    HivePartitioningOptions defaults;
    defaults.null_fallback = std::move(fallback);
    return defaults;
  }
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// \brief KeyValuePartitioning whose type name is "hive" and which additionally
/// carries a HivePartitioningOptions (notably the null-fallback string).
///
/// NOTE(review): presumably path segments have the form `name=value` -- confirm in
/// the implementation of ParseKey().
class ARROW_DS_EXPORT HivePartitioning : public KeyValuePartitioning {
 public:
  /// \brief Construct with default key/value options and the given null fallback.
  ///
  /// The base class receives a default-constructed KeyValuePartitioningOptions; the
  /// Hive options are the defaults with only `null_fallback` replaced.
  explicit HivePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries = {},
                            std::string null_fallback = kDefaultHiveNullFallback)
      : KeyValuePartitioning(std::move(schema), std::move(dictionaries),
                             KeyValuePartitioningOptions()),
        hive_options_(
            HivePartitioningOptions::DefaultsWithNullFallback(std::move(null_fallback))) {
  }

  /// \brief Construct with explicit Hive options.
  ///
  /// The base class is initialized first from the KeyValuePartitioningOptions part
  /// of `options` (a copy); only afterwards is `options` moved into the member, so
  /// the null-fallback string is not copied a second time.
  explicit HivePartitioning(std::shared_ptr<Schema> schema, ArrayVector dictionaries,
                            HivePartitioningOptions options)
      : KeyValuePartitioning(std::move(schema), std::move(dictionaries), options),
        hive_options_(std::move(options)) {}

  std::string type_name() const override { return "hive"; }

  /// \brief Copy of the configured null-fallback string.
  std::string null_fallback() const { return hive_options_.null_fallback; }

  /// \brief The Hive options this partitioning was constructed with.
  const HivePartitioningOptions& options() const { return hive_options_; }

  /// \brief Parse a single segment into an optional Key under the given options
  /// (defined outside this header).
  static Result<std::optional<Key>> ParseKey(const std::string& segment,
                                             const HivePartitioningOptions& options);

  bool Equals(const Partitioning& other) const override;

  /// \brief Create a factory for this partitioning type
  /// (defined outside this header).
  static std::shared_ptr<PartitioningFactory> MakeFactory(
      HivePartitioningFactoryOptions options = {});

 private:
  const HivePartitioningOptions hive_options_;

  Result<std::vector<Key>> ParseKeys(const std::string& path) const override;

  Result<PartitionPathFormat> FormatValues(const ScalarVector& values) const override;
};
|
|
|
|
|
/// \brief Partitioning whose parsing and (optionally) formatting are delegated to
/// caller-supplied callables.
///
/// Partitioning of record batches is not supported and always yields
/// Status::NotImplemented. Two FunctionPartitionings never compare equal.
class ARROW_DS_EXPORT FunctionPartitioning : public Partitioning {
 public:
  /// Callable used by Parse().
  using ParseImpl = std::function<Result<compute::Expression>(const std::string&)>;

  /// Callable used by Format().
  using FormatImpl =
      std::function<Result<PartitionPathFormat>(const compute::Expression&)>;

  /// \brief Store the schema, the callables and the type name.
  ///
  /// \param schema      schema forwarded to the Partitioning base
  /// \param parse_impl  callable invoked by Parse()
  /// \param format_impl callable invoked by Format(); may be left empty
  /// \param name        value reported by type_name(), "function" by default
  FunctionPartitioning(std::shared_ptr<Schema> schema, ParseImpl parse_impl,
                       FormatImpl format_impl = NULLPTR, std::string name = "function")
      : Partitioning(std::move(schema)),
        parse_impl_(std::move(parse_impl)),
        format_impl_(std::move(format_impl)),
        name_(std::move(name)) {}

  std::string type_name() const override { return name_; }

  /// \brief Always false: the stored callables cannot be compared.
  bool Equals(const Partitioning& other) const override { return false; }

  /// \brief Forward `path` to the parse callable.
  ///
  /// An empty callable is reported as Status::NotImplemented, mirroring Format(),
  /// instead of letting std::function throw std::bad_function_call.
  Result<compute::Expression> Parse(const std::string& path) const override {
    if (!parse_impl_) {
      return Status::NotImplemented("parsing paths from ", type_name(), " Partitioning");
    }
    return parse_impl_(path);
  }

  /// \brief Forward `expr` to the format callable, or return
  /// Status::NotImplemented when none was supplied.
  Result<PartitionPathFormat> Format(const compute::Expression& expr) const override {
    if (format_impl_) {
      return format_impl_(expr);
    }
    return Status::NotImplemented("formatting paths from ", type_name(), " Partitioning");
  }

  /// \brief Unsupported; always returns Status::NotImplemented.
  Result<PartitionedBatches> Partition(
      const std::shared_ptr<RecordBatch>& batch) const override {
    return Status::NotImplemented("partitioning batches from ", type_name(),
                                  " Partitioning");
  }

 private:
  ParseImpl parse_impl_;
  FormatImpl format_impl_;
  std::string name_;
};
|
|
|
/// \brief KeyValuePartitioning whose type name is "filename".
///
/// NOTE(review): presumably the partition values are carried in the file name,
/// separated by kFilenamePartitionSep -- confirm in the implementation of
/// ParseKeys()/FormatValues().
class ARROW_DS_EXPORT FilenamePartitioning : public KeyValuePartitioning {
 public:
  /// \brief Construct from a schema, with optional dictionaries and options.
  explicit FilenamePartitioning(std::shared_ptr<Schema> schema,
                                ArrayVector dictionaries = {},
                                KeyValuePartitioningOptions options = {});

  std::string type_name() const override { return "filename"; }

  /// \brief Create a factory for this partitioning type from a list of field names
  /// and factory options (defined outside this header).
  static std::shared_ptr<PartitioningFactory> MakeFactory(
      std::vector<std::string> field_names, PartitioningFactoryOptions options = {});

  bool Equals(const Partitioning& other) const override;

 private:
  Result<std::vector<Key>> ParseKeys(const std::string& path) const override;

  Result<PartitionPathFormat> FormatValues(const ScalarVector& values) const override;
};
|
|
|
/// \brief Return a string derived from `path` and `prefix`.
///
/// Only declared here. Judging by the name, `prefix` is removed from the front of
/// `path` -- confirm the exact rules in the definition.
ARROW_DS_EXPORT
std::string StripPrefix(const std::string& path, const std::string& prefix);
|
|
|
|
|
|
|
|
|
|
|
/// \brief Single-path overload: return a string derived from `path` and `prefix`.
///
/// Only declared here. Judging by the name, both the leading `prefix` and the
/// trailing file name are removed from `path` -- confirm in the definition.
ARROW_DS_EXPORT
std::string StripPrefixAndFilename(const std::string& path, const std::string& prefix);
|
|
|
|
|
/// \brief Vector-of-paths overload; returns a vector of strings.
///
/// Only declared here. Presumably applies the single-path form to each element of
/// `paths` -- confirm in the definition.
ARROW_DS_EXPORT
std::vector<std::string> StripPrefixAndFilename(const std::vector<std::string>& paths,
                                                const std::string& prefix);
|
|
|
|
|
/// \brief Vector-of-FileInfo overload; returns a vector of strings.
///
/// Only declared here. Presumably applies the single-path form to the path of each
/// element of `files` -- confirm in the definition.
ARROW_DS_EXPORT
std::vector<std::string> StripPrefixAndFilename(const std::vector<fs::FileInfo>& files,
                                                const std::string& prefix);
|
|
|
|
|
class ARROW_DS_EXPORT PartitioningOrFactory { |
|
public: |
|
explicit PartitioningOrFactory(std::shared_ptr<Partitioning> partitioning) |
|
: partitioning_(std::move(partitioning)) {} |
|
|
|
explicit PartitioningOrFactory(std::shared_ptr<PartitioningFactory> factory) |
|
: factory_(std::move(factory)) {} |
|
|
|
PartitioningOrFactory& operator=(std::shared_ptr<Partitioning> partitioning) { |
|
return *this = PartitioningOrFactory(std::move(partitioning)); |
|
} |
|
|
|
PartitioningOrFactory& operator=(std::shared_ptr<PartitioningFactory> factory) { |
|
return *this = PartitioningOrFactory(std::move(factory)); |
|
} |
|
|
|
|
|
const std::shared_ptr<Partitioning>& partitioning() const { return partitioning_; } |
|
|
|
|
|
const std::shared_ptr<PartitioningFactory>& factory() const { return factory_; } |
|
|
|
|
|
Result<std::shared_ptr<Schema>> GetOrInferSchema(const std::vector<std::string>& paths); |
|
|
|
private: |
|
std::shared_ptr<PartitioningFactory> factory_; |
|
std::shared_ptr<Partitioning> partitioning_; |
|
}; |
|
|
|
|
|
|
|
} |
|
} |
|
|