File size: 11,236 Bytes
9c6594c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

/// Logic for automatically determining the structure of a multi-file
/// dataset, including any partitioning that can be discovered from the
/// available partitioning schemes.

// This API is EXPERIMENTAL.

#pragma once

#include <memory>
#include <string>
#include <variant>
#include <vector>

#include "arrow/dataset/partition.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/filesystem/type_fwd.h"
#include "arrow/result.h"
#include "arrow/util/macros.h"

namespace arrow {
namespace dataset {

/// \defgroup dataset-discovery Discovery API
///
/// @{

struct InspectOptions {
  /// Sentinel for `fragments` meaning "inspect every fragment".
  static constexpr int kInspectAllFragments{-1};

  /// Upper bound on the number of fragments whose schemas are read while
  /// inferring the unified dataset schema. Reading fewer fragments lowers the
  /// latency of discovery, which matters most with a high number of fragments
  /// and/or high-latency file systems.
  ///
  /// - `1` (the default): only the first fragment (in no particular order) is
  ///   inspected. This is optimal when all fragments share a uniform schema.
  /// - `kInspectAllFragments`: every fragment is inspected, so that potentially
  ///   varying schemas are robustly unified.
  /// - `0`: no fragment is inspected; only the partitioning schema is.
  int fragments{1};

  /// Options governing how field types are unified. The defaults merge strictly:
  /// types must match exactly, except that nulls may be merged with other types.
  Field::MergeOptions field_merge_options = Field::MergeOptions::Defaults();
};

struct FinishOptions {
  /// Finalize the dataset with this schema. If no schema is provided (null),
  /// the schema is inferred by inspection, as configured by `inspect_options`.
  std::shared_ptr<Schema> schema = NULLPTR;

  /// Options passed to `DatasetFactory::Inspect` when `schema` is not
  /// provided and therefore has to be discovered.
  InspectOptions inspect_options{};

  /// Whether an explicitly provided `schema` should be validated against the
  /// fragments' schemas. `inspect_options` controls how many fragments are
  /// checked.
  bool validate_fragments = false;
};

/// \brief DatasetFactory provides a way to inspect/discover a Dataset's expected
/// schema before materializing said Dataset.
///
/// This is an abstract interface: subclasses implement InspectSchemas() and
/// Finish(FinishOptions).
class ARROW_DS_EXPORT DatasetFactory {
 public:
  /// \brief Get the schemas of the Fragments and Partitioning.
  ///
  /// \param[in] options controls how many fragments are inspected, see
  /// InspectOptions::fragments.
  /// \return the individual (not yet unified) schemas, or an error.
  virtual Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
      InspectOptions options) = 0;

  /// \brief Get unified schema for the resulting Dataset.
  ///
  /// \param[in] options controls how many fragments are inspected and how their
  /// field types are merged, see InspectOptions.
  /// \return the unified schema, or an error.
  Result<std::shared_ptr<Schema>> Inspect(InspectOptions options = {});

  /// \brief Create a Dataset
  Result<std::shared_ptr<Dataset>> Finish();
  /// \brief Create a Dataset with the given schema (see FinishOptions::schema)
  Result<std::shared_ptr<Dataset>> Finish(std::shared_ptr<Schema> schema);
  /// \brief Create a Dataset with the given options
  ///
  /// \param[in] options the schema to use (or how to infer it) and whether to
  /// validate fragments against it, see FinishOptions.
  virtual Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) = 0;

  /// \brief Optional root partition for the resulting Dataset.
  const compute::Expression& root_partition() const { return root_partition_; }
  /// \brief Set the root partition for the resulting Dataset.
  ///
  /// This implementation always returns Status::OK().
  Status SetRootPartition(compute::Expression partition) {
    root_partition_ = std::move(partition);
    return Status::OK();
  }

  // Virtual so derived factories can be destroyed through a DatasetFactory
  // pointer (e.g. the shared_ptr<DatasetFactory> returned by Make functions).
  virtual ~DatasetFactory() = default;

 protected:
  // Only subclasses may construct a DatasetFactory.
  DatasetFactory();

  // Value exposed by root_partition() and replaced by SetRootPartition().
  compute::Expression root_partition_;
};

/// @}

/// \brief UnionDatasetFactory is a DatasetFactory that aggregates several child
/// DatasetFactory instances.
///
/// Schema inspection is delegated to each child factory (see InspectSchemas),
/// and Finish produces a single Dataset from the children.
/// \ingroup dataset-implementations
class ARROW_DS_EXPORT UnionDatasetFactory : public DatasetFactory {
 public:
  /// \brief Build a UnionDatasetFactory from a list of child factories.
  ///
  /// \param[in] factories the child DatasetFactory instances to combine.
  /// \return the new factory, or an error (the failure conditions are defined
  /// by the implementation).
  static Result<std::shared_ptr<DatasetFactory>> Make(
      std::vector<std::shared_ptr<DatasetFactory>> factories);

  /// \brief Return the list of child DatasetFactory
  const std::vector<std::shared_ptr<DatasetFactory>>& factories() const {
    return factories_;
  }

  /// \brief Get the schemas of the Datasets.
  ///
  /// The options are not applied globally; they are applied to each child
  /// factory. As a result `options.fragments` is not respected exactly, but the
  /// intent (peek at the first fragments, or inspect all of them) is preserved.
  Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
      InspectOptions options) override;

  /// \brief Create a Dataset.
  Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;

 protected:
  explicit UnionDatasetFactory(std::vector<std::shared_ptr<DatasetFactory>> factories);

  // The child factories, as passed to Make().
  std::vector<std::shared_ptr<DatasetFactory>> factories_;
};

/// \ingroup dataset-filesystem
struct FileSystemFactoryOptions {
  /// Either an explicit Partitioning or a PartitioningFactory to discover one.
  ///
  /// If a factory is provided, it will be used to infer a schema for partition fields
  /// based on file and directory paths, and then to construct a Partitioning. The
  /// default is a Partitioning which will yield no partition information.
  ///
  /// The (explicit or discovered) partitioning will be applied to discovered files
  /// and the resulting partition information embedded in the Dataset.
  PartitioningOrFactory partitioning{Partitioning::Default()};

  /// For the purposes of applying the partitioning, paths will be stripped
  /// of the partition_base_dir. Files not matching the partition_base_dir
  /// prefix will be skipped for partition discovery. The ignored files will still
  /// be part of the Dataset, but will not have partition information.
  ///
  /// Example:
  /// partition_base_dir = "/dataset";
  ///
  /// - "/dataset/US/sales.csv" -> "US/sales.csv" will be given to the partitioning
  ///
  /// - "/home/john/late_sales.csv" -> Will be ignored for partition discovery.
  ///
  /// This is useful for partitionings that parse directory segments by position,
  /// where ordering is important, e.g. DirectoryPartitioning.
  std::string partition_base_dir;

  /// Invalid files (found via the selector or given explicitly) will be excluded
  /// by checking them with the FileFormat::IsSupported method. This incurs IO for
  /// each file, performed serially on a single thread. Disabling this feature
  /// skips that IO, but unsupported files may then be present in the Dataset
  /// (resulting in an error at scan time).
  bool exclude_invalid_files = false;

  /// When discovering from a Selector (and not from an explicit file list), ignore
  /// files and directories matching any of these prefixes. By default, entries
  /// starting with "." or "_" are ignored.
  ///
  /// Example (with selector = "/dataset/**"):
  /// selector_ignore_prefixes = {"_", ".DS_STORE" };
  ///
  /// - "/dataset/data.csv" -> not ignored
  /// - "/dataset/_metadata" -> ignored
  /// - "/dataset/.DS_STORE" -> ignored
  /// - "/dataset/_hidden/dat" -> ignored
  /// - "/dataset/nested/.DS_STORE" -> ignored
  std::vector<std::string> selector_ignore_prefixes = {
      ".",
      "_",
  };
};

/// \brief FileSystemDatasetFactory creates a Dataset from a vector of
/// fs::FileInfo or a fs::FileSelector.
/// \ingroup dataset-filesystem
class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory {
 public:
  /// \brief Build a FileSystemDatasetFactory from an explicit list of
  /// paths.
  ///
  /// \param[in] filesystem passed to FileSystemDataset
  /// \param[in] paths passed to FileSystemDataset
  /// \param[in] format passed to FileSystemDataset
  /// \param[in] options see FileSystemFactoryOptions for more information.
  static Result<std::shared_ptr<DatasetFactory>> Make(
      std::shared_ptr<fs::FileSystem> filesystem, const std::vector<std::string>& paths,
      std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options);

  /// \brief Build a FileSystemDatasetFactory from a fs::FileSelector.
  ///
  /// The selector will expand to a vector of FileInfo. The expansion/crawling
  /// is performed in this function call. Thus, the finalized Dataset is
  /// working with a snapshot of the filesystem.
  ///
  /// If options.partition_base_dir is not provided, it will be overwritten
  /// with selector.base_dir.
  ///
  /// \param[in] filesystem passed to FileSystemDataset
  /// \param[in] selector used to crawl and search files
  /// \param[in] format passed to FileSystemDataset
  /// \param[in] options see FileSystemFactoryOptions for more information.
  static Result<std::shared_ptr<DatasetFactory>> Make(
      std::shared_ptr<fs::FileSystem> filesystem, fs::FileSelector selector,
      std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options);

  /// \brief Build a FileSystemDatasetFactory from a URI that also carries the
  /// filesystem information.
  ///
  /// \param[in] uri identifies both the filesystem and the location of the data
  /// \param[in] format passed to FileSystemDataset
  /// \param[in] options see FileSystemFactoryOptions for more information.
  static Result<std::shared_ptr<DatasetFactory>> Make(std::string uri,
                                                      std::shared_ptr<FileFormat> format,
                                                      FileSystemFactoryOptions options);

  /// \brief Build a FileSystemDatasetFactory from an explicit list of
  /// file information.
  ///
  /// \param[in] filesystem passed to FileSystemDataset
  /// \param[in] files passed to FileSystemDataset
  /// \param[in] format passed to FileSystemDataset
  /// \param[in] options see FileSystemFactoryOptions for more information.
  static Result<std::shared_ptr<DatasetFactory>> Make(
      std::shared_ptr<fs::FileSystem> filesystem, const std::vector<fs::FileInfo>& files,
      std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options);

  /// \brief Get the schemas of the inspected files and of the partitioning.
  ///
  /// \param[in] options controls how many fragments are inspected, see InspectOptions.
  Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
      InspectOptions options) override;

  /// \brief Create a Dataset from the discovered files.
  ///
  /// \param[in] options see FinishOptions.
  Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;

 protected:
  // Construction goes through the static Make() overloads above.
  FileSystemDatasetFactory(std::vector<fs::FileInfo> files,
                           std::shared_ptr<fs::FileSystem> filesystem,
                           std::shared_ptr<FileFormat> format,
                           FileSystemFactoryOptions options);

  // NOTE(review): presumably returns the schema of the partition fields, taken
  // from options_.partitioning (or discovered by its factory) -- confirm in the
  // implementation.
  Result<std::shared_ptr<Schema>> PartitionSchema();

  // Files making up the dataset, as resolved by Make().
  std::vector<fs::FileInfo> files_;
  // Filesystem the files live on.
  std::shared_ptr<fs::FileSystem> fs_;
  // Format used to read the files.
  std::shared_ptr<FileFormat> format_;
  // Options this factory was created with.
  FileSystemFactoryOptions options_;
};

}  // namespace dataset
}  // namespace arrow