File size: 11,236 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 |
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
/// Logic for automatically determining the structure of a multi-file
/// dataset, with possible partitioning according to the available
/// partitioning schemes.
// This API is EXPERIMENTAL.
#pragma once
#include <memory>
#include <string>
#include <variant>
#include <vector>
#include "arrow/dataset/partition.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/filesystem/type_fwd.h"
#include "arrow/result.h"
#include "arrow/util/macros.h"
namespace arrow {
namespace dataset {
/// \defgroup dataset-discovery Discovery API
///
/// @{
struct InspectOptions {
/// Sentinel value for the `fragments` property: request that all fragments
/// be inspected.
static constexpr int kInspectAllFragments = -1;
/// Indicate how many fragments should be inspected to infer the unified dataset
/// schema. Limiting the number of fragments accessed improves the latency of
/// the discovery process when dealing with a high number of fragments and/or
/// high latency file systems.
///
/// The default value of `1` inspects the schema of the first (in no particular
/// order) fragment only. If the dataset has a uniform schema for all fragments,
/// this default is the optimal value. In order to inspect all fragments and
/// robustly unify their potentially varying schemas, set this option to
/// `kInspectAllFragments`. A value of `0` disables inspection of fragments
/// altogether so only the partitioning schema will be inspected.
int fragments = 1;
/// Control how to unify types. By default, types are merged strictly (the
/// type must match exactly, except nulls can be merged with other types).
/// The default is whatever `Field::MergeOptions::Defaults()` returns.
Field::MergeOptions field_merge_options = Field::MergeOptions::Defaults();
};
/// \brief Options passed to DatasetFactory::Finish to control how the Dataset
/// is finalized.
struct FinishOptions {
/// Finalize the dataset with this given schema. If the schema is not
/// provided, the schema is inferred by inspection; see the `inspect_options`
/// property. Defaults to a null pointer (no schema provided).
std::shared_ptr<Schema> schema = NULLPTR;
/// If the schema is not provided, it will be discovered by passing the
/// following options to `DatasetFactory::Inspect`.
InspectOptions inspect_options{};
/// Indicate if the given Schema (when specified) should be validated against
/// the fragments' schemas. `inspect_options` will control how many fragments
/// are checked. Defaults to `false`.
bool validate_fragments = false;
};
/// \brief DatasetFactory provides a way to inspect/discover a Dataset's expected
/// schema before materializing said Dataset.
///
/// This is an abstract base class: `InspectSchemas` and `Finish(FinishOptions)`
/// are pure virtual and must be provided by subclasses.
class ARROW_DS_EXPORT DatasetFactory {
public:
/// \brief Get the schemas of the Fragments and Partitioning.
///
/// \param[in] options controls the inspection, see InspectOptions.
virtual Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
InspectOptions options) = 0;
/// \brief Get unified schema for the resulting Dataset.
///
/// \param[in] options controls the inspection, see InspectOptions. When
/// omitted, a default-constructed InspectOptions is used.
Result<std::shared_ptr<Schema>> Inspect(InspectOptions options = {});
/// \brief Create a Dataset
///
/// NOTE(review): defined out of line; presumably equivalent to calling
/// Finish with default FinishOptions -- confirm against the implementation.
Result<std::shared_ptr<Dataset>> Finish();
/// \brief Create a Dataset with the given schema (see \a FinishOptions::schema)
///
/// \param[in] schema the schema to finalize the Dataset with.
Result<std::shared_ptr<Dataset>> Finish(std::shared_ptr<Schema> schema);
/// \brief Create a Dataset with the given options
///
/// \param[in] options see FinishOptions for more information.
virtual Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) = 0;
/// \brief Optional root partition for the resulting Dataset.
///
/// Returns a reference to the expression stored in this factory.
const compute::Expression& root_partition() const { return root_partition_; }
/// \brief Set the root partition for the resulting Dataset.
///
/// \param[in] partition expression to store; it is moved into the factory.
/// \return always Status::OK()
Status SetRootPartition(compute::Expression partition) {
root_partition_ = std::move(partition);
return Status::OK();
}
virtual ~DatasetFactory() = default;
protected:
/// Protected constructor: only subclasses can construct a DatasetFactory.
DatasetFactory();
/// Expression returned by root_partition() and replaced by SetRootPartition().
compute::Expression root_partition_;
};
/// @}
/// \brief UnionDatasetFactory is a DatasetFactory composed of a list of child
/// DatasetFactory instances.
/// \ingroup dataset-implementations
class ARROW_DS_EXPORT UnionDatasetFactory : public DatasetFactory {
public:
/// \brief Build a UnionDatasetFactory from a list of child factories.
///
/// \param[in] factories the child DatasetFactory instances
static Result<std::shared_ptr<DatasetFactory>> Make(
std::vector<std::shared_ptr<DatasetFactory>> factories);
/// \brief Return the list of child DatasetFactory
const std::vector<std::shared_ptr<DatasetFactory>>& factories() const {
return factories_;
}
/// \brief Get the schemas of the Datasets.
///
/// Instead of applying options globally, it applies at each child factory.
/// This will not respect `options.fragments` exactly, but will respect the
/// spirit of peeking the first fragments or all of them.
///
/// \param[in] options forwarded to each child factory, see InspectOptions.
Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
InspectOptions options) override;
/// \brief Create a Dataset.
///
/// \param[in] options see FinishOptions for more information.
Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;
protected:
/// Protected constructor; instances are obtained through Make().
explicit UnionDatasetFactory(std::vector<std::shared_ptr<DatasetFactory>> factories);
/// The child factories, exposed read-only through factories().
std::vector<std::shared_ptr<DatasetFactory>> factories_;
};
/// \brief Options controlling how a FileSystemDatasetFactory discovers files
/// and partition information.
/// \ingroup dataset-filesystem
struct FileSystemFactoryOptions {
/// Either an explicit Partitioning or a PartitioningFactory to discover one.
///
/// If a factory is provided, it will be used to infer a schema for partition fields
/// based on file and directory paths then construct a Partitioning. The default
/// is a Partitioning which will yield no partition information.
///
/// The (explicit or discovered) partitioning will be applied to discovered files
/// and the resulting partition information embedded in the Dataset.
PartitioningOrFactory partitioning{Partitioning::Default()};
/// For the purposes of applying the partitioning, paths will be stripped
/// of the partition_base_dir. Files not matching the partition_base_dir
/// prefix will be skipped for partition discovery. The ignored files will still
/// be part of the Dataset, but will not have partition information.
///
/// Example:
/// partition_base_dir = "/dataset";
///
/// - "/dataset/US/sales.csv" -> "US/sales.csv" will be given to the partitioning
///
/// - "/home/john/late_sales.csv" -> Will be ignored for partition discovery.
///
/// This is useful for partitionings which parse directories and for which
/// ordering is important, e.g. DirectoryPartitioning.
std::string partition_base_dir;
/// Invalid files (via selector or explicitly) will be excluded by checking
/// with the FileFormat::IsSupported method. This will incur IO for each file
/// in a serial and single-threaded fashion. Disabling this feature will skip the
/// IO, but unsupported files may be present in the Dataset
/// (resulting in an error at scan time). Defaults to `false` (disabled).
bool exclude_invalid_files = false;
/// When discovering from a Selector (and not from an explicit file list), ignore
/// files and directories matching any of these prefixes. The default prefixes
/// are "." and "_".
///
/// Example (with selector = "/dataset/**"):
/// selector_ignore_prefixes = {"_", ".DS_STORE" };
///
/// - "/dataset/data.csv" -> not ignored
/// - "/dataset/_metadata" -> ignored
/// - "/dataset/.DS_STORE" -> ignored
/// - "/dataset/_hidden/dat" -> ignored
/// - "/dataset/nested/.DS_STORE" -> ignored
std::vector<std::string> selector_ignore_prefixes = {
".",
"_",
};
};
/// \brief FileSystemDatasetFactory creates a Dataset from a vector of
/// fs::FileInfo or a fs::FileSelector.
///
/// Instances are obtained through one of the static Make() overloads; the
/// constructor is protected.
/// \ingroup dataset-filesystem
class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory {
public:
/// \brief Build a FileSystemDatasetFactory from an explicit list of
/// paths.
///
/// \param[in] filesystem passed to FileSystemDataset
/// \param[in] paths passed to FileSystemDataset
/// \param[in] format passed to FileSystemDataset
/// \param[in] options see FileSystemFactoryOptions for more information.
static Result<std::shared_ptr<DatasetFactory>> Make(
std::shared_ptr<fs::FileSystem> filesystem, const std::vector<std::string>& paths,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options);
/// \brief Build a FileSystemDatasetFactory from a fs::FileSelector.
///
/// The selector will expand to a vector of FileInfo. The expansion/crawling
/// is performed in this function call. Thus, the finalized Dataset is
/// working with a snapshot of the filesystem.
///
/// If options.partition_base_dir is not provided, it will be overwritten
/// with selector.base_dir.
///
/// \param[in] filesystem passed to FileSystemDataset
/// \param[in] selector used to crawl and search files
/// \param[in] format passed to FileSystemDataset
/// \param[in] options see FileSystemFactoryOptions for more information.
static Result<std::shared_ptr<DatasetFactory>> Make(
std::shared_ptr<fs::FileSystem> filesystem, fs::FileSelector selector,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options);
/// \brief Build a FileSystemDatasetFactory from a URI including filesystem
/// information.
///
/// \param[in] uri passed to FileSystemDataset
/// \param[in] format passed to FileSystemDataset
/// \param[in] options see FileSystemFactoryOptions for more information.
static Result<std::shared_ptr<DatasetFactory>> Make(std::string uri,
std::shared_ptr<FileFormat> format,
FileSystemFactoryOptions options);
/// \brief Build a FileSystemDatasetFactory from an explicit list of
/// file information.
///
/// \param[in] filesystem passed to FileSystemDataset
/// \param[in] files passed to FileSystemDataset
/// \param[in] format passed to FileSystemDataset
/// \param[in] options see FileSystemFactoryOptions for more information.
static Result<std::shared_ptr<DatasetFactory>> Make(
std::shared_ptr<fs::FileSystem> filesystem, const std::vector<fs::FileInfo>& files,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options);
/// \brief Get the schemas of the Fragments and Partitioning
/// (override of DatasetFactory::InspectSchemas).
///
/// \param[in] options controls the inspection, see InspectOptions.
Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
InspectOptions options) override;
/// \brief Create a Dataset with the given options
/// (override of DatasetFactory::Finish).
///
/// \param[in] options see FinishOptions for more information.
Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;
protected:
/// Protected constructor; use one of the Make() overloads instead.
FileSystemDatasetFactory(std::vector<fs::FileInfo> files,
std::shared_ptr<fs::FileSystem> filesystem,
std::shared_ptr<FileFormat> format,
FileSystemFactoryOptions options);
/// NOTE(review): presumably yields the schema of the partition fields derived
/// from options_.partitioning -- confirm against the implementation.
Result<std::shared_ptr<Schema>> PartitionSchema();
/// Factory state; presumably initialized from the constructor arguments of the
/// same types (the constructor is defined out of line -- confirm).
std::vector<fs::FileInfo> files_;
std::shared_ptr<fs::FileSystem> fs_;
std::shared_ptr<FileFormat> format_;
FileSystemFactoryOptions options_;
};
} // namespace dataset
} // namespace arrow
|