// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// This API is EXPERIMENTAL.

#pragma once

#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "arrow/acero/options.h"
#include "arrow/compute/expression.h"
#include "arrow/compute/type_fwd.h"
#include "arrow/dataset/dataset.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/io/interfaces.h"
#include "arrow/type_fwd.h"
#include "arrow/util/async_generator_fwd.h"
#include "arrow/util/iterator.h"
#include "arrow/util/type_fwd.h"

namespace arrow {

using RecordBatchGenerator = std::function<Future<std::shared_ptr<RecordBatch>>()>;

namespace dataset {

/// \defgroup dataset-scanning Scanning API
///
/// @{

constexpr int64_t kDefaultBatchSize = 1 << 17;  // 128Ki rows
// Combined with the default fragment readahead (4), this yields up to
// 16 * 4 = 64 batches in flight, i.e. ~8Mi rows
constexpr int32_t kDefaultBatchReadahead = 16;
constexpr int32_t kDefaultFragmentReadahead = 4;
constexpr int32_t kDefaultBytesReadahead = 1 << 25;  // 32MiB

/// Scan-specific options, which can be changed between scans of the same dataset.
struct ARROW_DS_EXPORT ScanOptions {
  /// A row filter (which will be pushed down to partitioning/reading if supported).
  compute::Expression filter = compute::literal(true);
  /// A projection expression (which can add/remove/rename columns).
  compute::Expression projection;

  /// Schema with which batches will be read from fragments. This is also known as the
  /// "reader schema" it will be used (for example) in constructing CSV file readers to
  /// identify column types for parsing. Usually only a subset of its fields (see
  /// MaterializedFields) will be materialized during a scan.
  std::shared_ptr<Schema> dataset_schema;

  /// Schema of projected record batches. This is independent of dataset_schema as its
  /// fields are derived from the projection. For example, let
  ///
  ///   dataset_schema = {"a": int32, "b": int32, "id": utf8}
  ///   projection = project({call("add", {field_ref("a"), field_ref("b")})}, {"a_plus_b"})
  ///
  /// (no filter specified). In this case, the projected_schema would be
  ///
  ///   {"a_plus_b": int32}
  std::shared_ptr<Schema> projected_schema;

  /// Maximum row count for scanned batches.
  int64_t batch_size = kDefaultBatchSize;

  /// How many batches to read ahead within a fragment.
  ///
  /// Set to 0 to disable batch readahead
  ///
  /// Note: May not be supported by all formats
  /// Note: Will be ignored if use_threads is set to false
  int32_t batch_readahead = kDefaultBatchReadahead;

  /// How many files to read ahead
  ///
  /// Set to 0 to disable fragment readahead
  ///
  /// Note: May not be enforced by all scanners
  /// Note: Will be ignored if use_threads is set to false
  int32_t fragment_readahead = kDefaultFragmentReadahead;

  /// A pool from which materialized and scanned arrays will be allocated.
  MemoryPool* pool = arrow::default_memory_pool();

  /// IOContext for any IO tasks
  ///
  /// Note: The IOContext executor will be ignored if use_threads is set to false
  io::IOContext io_context;

  /// If true the scanner will scan in parallel
  ///
  /// Note: If true, this will use threads from both the cpu_executor and the
  /// io_context.executor
  /// Note: This must be true in order for any readahead to happen
  bool use_threads = false;

  /// If true the scanner will add augmented fields to the output schema.
  bool add_augmented_fields = true;

  /// Whether to cache metadata when scanning.
  ///
  /// Fragments may typically cache metadata to speed up repeated accesses.
  /// However, in use cases where a single scan is done, or if memory use
  /// is more critical than CPU time, setting this option to false can
  /// lessen memory use.
  bool cache_metadata = true;

  /// Fragment-specific scan options.
  std::shared_ptr<FragmentScanOptions> fragment_scan_options;

  /// Return a vector of FieldRefs that require materialization.
  ///
  /// This is usually the union of the fields referenced in the projection and the
  /// filter expression. Examples:
  ///
  /// - `SELECT a, b WHERE a < 2 && c > 1` => ["a", "b", "a", "c"]
  /// - `SELECT a + b < 3 WHERE a > 1` => ["a", "b", "a"]
  ///
  /// This is needed for expressions where a field may not be directly
  /// used in the final projection but is still required to evaluate the
  /// expression.
  ///
  /// This is used by Fragment implementations to apply the column
  /// sub-selection optimization.
  std::vector<FieldRef> MaterializedFields() const;

  /// Parameters which control when the plan should pause for a slow consumer
  acero::BackpressureOptions backpressure =
      acero::BackpressureOptions::DefaultBackpressure();
};
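
// A minimal configuration sketch (not part of the original header; `dataset_schema`
// is assumed to be the std::shared_ptr<Schema> of the dataset being scanned):
//
//   auto options = std::make_shared<ScanOptions>();
//   options->dataset_schema = dataset_schema;
//   options->filter = compute::greater(compute::field_ref("a"), compute::literal(1));
//   options->batch_size = 64 * 1024;  // cap batches at 64Ki rows
//   options->use_threads = true;      // required for any readahead to occur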

/// Scan-specific options, which can be changed between scans of the same dataset.
///
/// A dataset consists of one or more individual fragments.  A fragment is anything
/// that is independently scannable, often a file.
///
/// Batches from all fragments will be converted to a single schema. This unified
/// schema is referred to as the "dataset schema" and is the output schema for
/// this node.
///
/// Individual fragments may have schemas that are different from the dataset
/// schema.  This is sometimes referred to as the physical or fragment schema.
/// Conversion from the fragment schema to the dataset schema is a process
/// known as evolution.
struct ARROW_DS_EXPORT ScanV2Options : public acero::ExecNodeOptions {
  explicit ScanV2Options(std::shared_ptr<Dataset> dataset)
      : dataset(std::move(dataset)) {}

  /// \brief The dataset to scan
  std::shared_ptr<Dataset> dataset;
  /// \brief A row filter
  ///
  /// The filter expression should be written against the dataset schema.
  /// The filter must be unbound.
  ///
  /// This is an opportunistic pushdown filter.  Filtering capabilities will
  /// vary between formats.  If a format is not capable of applying the filter
  /// then it will ignore it.
  ///
  /// Each fragment will do its best to filter the data based on the information
  /// (partitioning guarantees, statistics) available to it.  If it is able to
  /// apply some filtering then it will indicate what filtering it was able to
  /// apply by attaching a guarantee to the batch.
  ///
  /// For example, if a filter is x < 50 && y > 40 then a batch may be able to
  /// apply a guarantee x < 50.  Post-scan filtering would then only need to
  /// consider y > 40 (for this specific batch).  The next batch may not be able
  /// to attach any guarantee and both clauses would need to be applied to that batch.
  ///
  /// A single guarantee-aware filtering operation should generally be applied to all
  /// resulting batches.  The scan node is not responsible for this.
  ///
  /// Fields that are referenced by the filter should be included in the `columns` vector.
  /// The scan node will not automatically fetch fields referenced by the filter
  /// expression. \see AddFieldsNeededForFilter
  ///
  /// If the filter references fields that are not included in `columns` this may or may
  /// not be an error, depending on the format.
  compute::Expression filter = compute::literal(true);

  /// \brief The columns to scan
  ///
  /// This is not a simple list of top-level column indices but instead a set of paths
  /// allowing for partial selection of columns
  ///
  /// These paths refer to the dataset schema
  ///
  /// For example, consider the following dataset schema:
  ///   schema({
  ///     field("score", int32()),
  ///     field("marker", struct_({
  ///         field("color", utf8()),
  ///         field("location", struct_({
  ///             field("x", float64()),
  ///             field("y", float64())
  ///         }))
  ///     }))
  ///   })
  ///
  /// If `columns` is {{0}, {1,1,0}} then the output schema is:
  ///   schema({field("score", int32()), field("x", float64())})
  ///
  /// If `columns` is {{1,1,1}, {1,1}} then the output schema is:
  ///   schema({
  ///       field("y", float64()),
  ///       field("location", struct_({
  ///           field("x", float64()),
  ///           field("y", float64())
  ///       }))
  ///   })
  std::vector<FieldPath> columns;

  /// \brief Target number of bytes to read ahead in a fragment
  ///
  /// This limit involves some amount of estimation.  Formats typically only know
  /// batch boundaries in terms of rows (not decoded bytes) and so an estimation
  /// must be done to guess the average row size.  Other formats like CSV and JSON
  /// must make even more generalized guesses.
  ///
  /// This is a best-effort guide.  Some formats may need to read ahead further,
  /// for example, if scanning a parquet file that has batches with 100MiB of data
  /// then the actual readahead will be at least 100MiB
  ///
  /// Set to 0 to disable readahead.  When disabled, the scanner will read the
  /// dataset one batch at a time
  ///
  /// This limit applies across all fragments.  If the limit is 32MiB and the
  /// fragment readahead allows for 20 fragments to be read at once then the
  /// total readahead will still be 32MiB and NOT 20 * 32MiB.
  int32_t target_bytes_readahead = kDefaultBytesReadahead;

  /// \brief Number of fragments to read ahead
  ///
  /// Higher readahead will potentially lead to more efficient I/O but will lead
  /// to the scan operation using more RAM.  The default is fairly conservative
  /// and designed for fast local disks (or slow local spinning disks which cannot
  /// handle much parallelism anyway).  When using a highly parallel remote filesystem
  /// you will likely want to increase these values.
  ///
  /// Set to 0 to disable fragment readahead.  When disabled the dataset will be scanned
  /// one fragment at a time.
  int32_t fragment_readahead = kDefaultFragmentReadahead;
  /// \brief Options specific to the file format
  const FragmentScanOptions* format_options = NULLPTR;

  /// \brief Utility method to get a selection representing all columns in a dataset
  static std::vector<FieldPath> AllColumns(const Schema& dataset_schema);

  /// \brief Utility method to add fields needed for the current filter
  ///
  /// This method adds any fields that are needed by `filter` which are not already
  /// included in the list of columns.  Any new fields added will be added to the end
  /// in no particular order.
  static Status AddFieldsNeededForFilter(ScanV2Options* options);
};
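
// A usage sketch (illustrative, not from the original header; `dataset` is assumed
// to be a std::shared_ptr<Dataset> whose schema matches the example above):
//
//   ScanV2Options options(dataset);
//   options.columns = {FieldPath({0}), FieldPath({1, 1, 0})};
//   options.filter = compute::less(compute::field_ref("score"), compute::literal(50));
//   // Append any fields referenced by the filter but missing from `columns`:
//   ARROW_RETURN_NOT_OK(ScanV2Options::AddFieldsNeededForFilter(&options));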

/// \brief Describes a projection
struct ARROW_DS_EXPORT ProjectionDescr {
  /// \brief The projection expression itself
  /// This expression must be a call to make_struct
  compute::Expression expression;
  /// \brief The output schema of the projection.
  ///
  /// This can be calculated from the input schema and the expression but it
  /// is cached here for convenience.
  std::shared_ptr<Schema> schema;

  /// \brief Create a ProjectionDescr by binding an expression to the dataset schema
  ///
  /// expression must return a struct type
  static Result<ProjectionDescr> FromStructExpression(
      const compute::Expression& expression, const Schema& dataset_schema);

  /// \brief Create a ProjectionDescr from expressions/names for each field
  static Result<ProjectionDescr> FromExpressions(std::vector<compute::Expression> exprs,
                                                 std::vector<std::string> names,
                                                 const Schema& dataset_schema);

  /// \brief Create a default projection referencing fields in the dataset schema
  static Result<ProjectionDescr> FromNames(std::vector<std::string> names,
                                           const Schema& dataset_schema,
                                           bool add_augmented_fields = true);

  /// \brief Make a projection that projects every field in the dataset schema
  static Result<ProjectionDescr> Default(const Schema& dataset_schema,
                                         bool add_augmented_fields = true);
};

/// \brief Utility method to set the projection expression and schema
ARROW_DS_EXPORT void SetProjection(ScanOptions* options, ProjectionDescr projection);
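
// A sketch of building a projection and attaching it (illustrative; `options` is
// assumed to be a std::shared_ptr<ScanOptions> with dataset_schema already set):
//
//   ARROW_ASSIGN_OR_RAISE(
//       auto projection,
//       ProjectionDescr::FromNames({"a", "b"}, *options->dataset_schema));
//   SetProjection(options.get(), std::move(projection));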

/// \brief Combines a record batch with the fragment that the record batch originated
/// from
///
/// Knowing the source fragment can be useful for debugging & understanding loaded
/// data
struct TaggedRecordBatch {
  std::shared_ptr<RecordBatch> record_batch;
  std::shared_ptr<Fragment> fragment;

  friend inline bool operator==(const TaggedRecordBatch& left,
                                const TaggedRecordBatch& right) {
    return left.record_batch == right.record_batch && left.fragment == right.fragment;
  }
};

using TaggedRecordBatchGenerator = std::function<Future<TaggedRecordBatch>()>;
using TaggedRecordBatchIterator = Iterator<TaggedRecordBatch>;

/// \brief Combines a tagged batch with positional information
///
/// This is returned when scanning batches in an unordered fashion.  This information is
/// needed if you ever want to reassemble the batches in order
struct EnumeratedRecordBatch {
  Enumerated<std::shared_ptr<RecordBatch>> record_batch;
  Enumerated<std::shared_ptr<Fragment>> fragment;

  friend inline bool operator==(const EnumeratedRecordBatch& left,
                                const EnumeratedRecordBatch& right) {
    return left.record_batch == right.record_batch && left.fragment == right.fragment;
  }
};

using EnumeratedRecordBatchGenerator = std::function<Future<EnumeratedRecordBatch>()>;
using EnumeratedRecordBatchIterator = Iterator<EnumeratedRecordBatch>;

/// @}

}  // namespace dataset

template <>
struct IterationTraits<dataset::TaggedRecordBatch> {
  static dataset::TaggedRecordBatch End() {
    return dataset::TaggedRecordBatch{NULLPTR, NULLPTR};
  }
  static bool IsEnd(const dataset::TaggedRecordBatch& val) {
    return val.record_batch == NULLPTR;
  }
};

template <>
struct IterationTraits<dataset::EnumeratedRecordBatch> {
  static dataset::EnumeratedRecordBatch End() {
    return dataset::EnumeratedRecordBatch{
        IterationEnd<Enumerated<std::shared_ptr<RecordBatch>>>(),
        IterationEnd<Enumerated<std::shared_ptr<dataset::Fragment>>>()};
  }
  static bool IsEnd(const dataset::EnumeratedRecordBatch& val) {
    return IsIterationEnd(val.fragment);
  }
};
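
// These traits let generic iterator utilities detect the end of a scan stream.
// A sketch of manually draining a TaggedRecordBatchIterator (illustrative;
// `scanner` is assumed to be a std::shared_ptr<dataset::Scanner>):
//
//   ARROW_ASSIGN_OR_RAISE(auto it, scanner->ScanBatches());
//   while (true) {
//     ARROW_ASSIGN_OR_RAISE(dataset::TaggedRecordBatch tagged, it.Next());
//     if (IsIterationEnd(tagged)) break;
//     // ... consume tagged.record_batch, e.g. with a hypothetical ProcessBatch()
//   }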

namespace dataset {

/// \addtogroup dataset-scanning
///
/// @{

/// \brief A scanner glues together several dataset classes to load in data.
/// The dataset contains a collection of fragments and partitioning rules.
///
/// The fragments identify independently loadable units of data (i.e. each fragment has
/// a potentially unique schema and possibly even a unique format, and it should be
/// possible to read fragments in parallel if desired).
///
/// The fragment's format contains the logic necessary to actually create a task to load
/// the fragment into memory.  That task may or may not support parallel execution of
/// its own.
///
/// The scanner is then responsible for creating scan tasks from every fragment in the
/// dataset and (potentially) sequencing the loaded record batches together.
///
/// The scanner should not buffer the entire dataset in memory (unless asked to),
/// instead yielding record batches as soon as they are ready.  Various readahead
/// properties control how much data is allowed to be scanned before pausing to let a
/// slow consumer catch up.
///
/// Today the scanner also handles projection & filtering although that may change in
/// the future.
class ARROW_DS_EXPORT Scanner {
 public:
  virtual ~Scanner() = default;

  /// \brief Apply a visitor to each RecordBatch as it is scanned. If multiple threads
  /// are used (via use_threads), the visitor will be invoked from those threads and is
  /// responsible for any synchronization.
  virtual Status Scan(std::function<Status(TaggedRecordBatch)> visitor) = 0;
  /// \brief Convert a Scanner into a Table.
  ///
  /// Use this convenience utility with care. This will serially materialize the
  /// Scan result in memory before creating the Table.
  virtual Result<std::shared_ptr<Table>> ToTable() = 0;
  /// \brief Scan the dataset into a stream of record batches.  Each batch is tagged
  /// with the fragment it originated from.  The batches will arrive in order.  The
  /// order of fragments is determined by the dataset.
  ///
  /// Note: The scanner will perform some readahead but will avoid materializing too
  /// much in memory (this is governed by the readahead options and use_threads option).
  /// If the readahead queue fills up then I/O will pause until the calling thread catches
  /// up.
  virtual Result<TaggedRecordBatchIterator> ScanBatches() = 0;
  virtual Result<TaggedRecordBatchGenerator> ScanBatchesAsync() = 0;
  virtual Result<TaggedRecordBatchGenerator> ScanBatchesAsync(
      ::arrow::internal::Executor* cpu_thread_pool) = 0;
  /// \brief Scan the dataset into a stream of record batches.  Unlike ScanBatches this
  /// method may allow record batches to be returned out of order.  This allows for more
  /// efficient scanning: some fragments may be accessed more quickly than others (e.g.
  /// may be cached in RAM or just happen to get scheduled earlier by the I/O)
  ///
  /// To make up for the out-of-order iteration each batch is further tagged with
  /// positional information.
  virtual Result<EnumeratedRecordBatchIterator> ScanBatchesUnordered() = 0;
  virtual Result<EnumeratedRecordBatchGenerator> ScanBatchesUnorderedAsync() = 0;
  virtual Result<EnumeratedRecordBatchGenerator> ScanBatchesUnorderedAsync(
      ::arrow::internal::Executor* cpu_thread_pool) = 0;
  /// \brief A convenience to synchronously load the given rows by index.
  ///
  /// Will only consume as many batches as needed from ScanBatches().
  virtual Result<std::shared_ptr<Table>> TakeRows(const Array& indices) = 0;
  /// \brief Get the first N rows.
  virtual Result<std::shared_ptr<Table>> Head(int64_t num_rows) = 0;
  /// \brief Count rows matching a predicate.
  ///
  /// This method will push down the predicate and compute the result based on fragment
  /// metadata if possible.
  virtual Result<int64_t> CountRows() = 0;
  virtual Future<int64_t> CountRowsAsync() = 0;
  /// \brief Convert the Scanner to a RecordBatchReader so it can be
  /// easily used with APIs that expect a reader.
  virtual Result<std::shared_ptr<RecordBatchReader>> ToRecordBatchReader() = 0;

  /// \brief Get the options for this scan.
  const std::shared_ptr<ScanOptions>& options() const { return scan_options_; }
  /// \brief Get the dataset that this scanner will scan
  virtual const std::shared_ptr<Dataset>& dataset() const = 0;

 protected:
  explicit Scanner(std::shared_ptr<ScanOptions> scan_options)
      : scan_options_(std::move(scan_options)) {}

  Result<EnumeratedRecordBatchIterator> AddPositioningToInOrderScan(
      TaggedRecordBatchIterator scan);

  const std::shared_ptr<ScanOptions> scan_options_;
};
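
// A consumption sketch (illustrative; `scanner` is assumed to be a
// std::shared_ptr<Scanner> produced by ScannerBuilder::Finish below):
//
//   ARROW_RETURN_NOT_OK(scanner->Scan([](TaggedRecordBatch tagged) {
//     // Invoked once per batch, possibly from multiple threads when
//     // use_threads is enabled.
//     return HandleBatch(std::move(tagged.record_batch));  // hypothetical handler
//   }));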

/// \brief ScannerBuilder is a factory class to construct a Scanner. It is used
/// to pass information, notably a potential filter expression and a subset of
/// columns to materialize.
class ARROW_DS_EXPORT ScannerBuilder {
 public:
  explicit ScannerBuilder(std::shared_ptr<Dataset> dataset);

  ScannerBuilder(std::shared_ptr<Dataset> dataset,
                 std::shared_ptr<ScanOptions> scan_options);

  ScannerBuilder(std::shared_ptr<Schema> schema, std::shared_ptr<Fragment> fragment,
                 std::shared_ptr<ScanOptions> scan_options);

  /// \brief Make a scanner from a record batch reader.
  ///
  /// The resulting scanner can be scanned only once. This is intended
  /// to support writing data from streaming sources or other sources
  /// that can be iterated only once.
  static std::shared_ptr<ScannerBuilder> FromRecordBatchReader(
      std::shared_ptr<RecordBatchReader> reader);

  /// \brief Set the subset of columns to materialize.
  ///
  /// Columns which are not referenced may not be read from fragments.
  ///
  /// \param[in] columns list of columns to project. Order and duplicates will
  ///            be preserved.
  ///
  /// \return Failure if any column name does not exist in the dataset's
  ///         Schema.
  Status Project(std::vector<std::string> columns);

  /// \brief Set expressions which will be evaluated to produce the materialized
  /// columns.
  ///
  /// Columns which are not referenced may not be read from fragments.
  ///
  /// \param[in] exprs expressions to evaluate to produce columns.
  /// \param[in] names list of names for the resulting columns.
  ///
  /// \return Failure if any referenced column does not exist in the dataset's
  ///         Schema.
  Status Project(std::vector<compute::Expression> exprs, std::vector<std::string> names);

  /// \brief Set the filter expression to return only rows matching the filter.
  ///
  /// The predicate will be passed down to Sources and corresponding
  /// Fragments to exploit predicate pushdown if possible using
  /// partition information or Fragment internal metadata, e.g. Parquet statistics.
  /// Columns which are not referenced may not be read from fragments.
  ///
  /// \param[in] filter expression to filter rows with.
  ///
  /// \return Failure if any referenced column does not exist in the dataset's
  ///         Schema.
  Status Filter(const compute::Expression& filter);

  /// \brief Indicate if the Scanner should make use of the available
  ///        ThreadPool found in ScanOptions.
  Status UseThreads(bool use_threads = true);

  /// \brief Indicate if metadata should be cached when scanning
  ///
  /// Fragments may typically cache metadata to speed up repeated accesses.
  /// However, in use cases where a single scan is done, or if memory use
  /// is more critical than CPU time, setting this option to false can
  /// lessen memory use.
  Status CacheMetadata(bool cache_metadata = true);

  /// \brief Set the maximum number of rows per RecordBatch.
  ///
  /// \param[in] batch_size the maximum number of rows.
  /// \returns An error if the batch size is not greater than 0.
  ///
  /// This option provides a control for limiting the memory owned by any RecordBatch.
  Status BatchSize(int64_t batch_size);

  /// \brief Set the number of batches to read ahead within a fragment.
  ///
  /// \param[in] batch_readahead How many batches to read ahead within a fragment
  /// \returns an error if this number is less than 0.
  ///
  /// This option provides a control on the RAM vs I/O tradeoff.
  /// It might not be supported by all file formats, in which case it will
  /// simply be ignored.
  Status BatchReadahead(int32_t batch_readahead);

  /// \brief Set the number of fragments to read ahead
  ///
  /// \param[in] fragment_readahead How many fragments to read ahead
  /// \returns an error if this number is less than 0.
  ///
  /// This option provides a control on the RAM vs I/O tradeoff.
  Status FragmentReadahead(int32_t fragment_readahead);

  /// \brief Set the pool from which materialized and scanned arrays will be allocated.
  Status Pool(MemoryPool* pool);

  /// \brief Set fragment-specific scan options.
  Status FragmentScanOptions(std::shared_ptr<FragmentScanOptions> fragment_scan_options);

  /// \brief Override default backpressure configuration
  Status Backpressure(acero::BackpressureOptions backpressure);

  /// \brief Return the current scan options for the builder.
  Result<std::shared_ptr<ScanOptions>> GetScanOptions();

  /// \brief Return the constructed now-immutable Scanner object
  Result<std::shared_ptr<Scanner>> Finish();

  const std::shared_ptr<Schema>& schema() const;
  const std::shared_ptr<Schema>& projected_schema() const;

 private:
  std::shared_ptr<Dataset> dataset_;
  std::shared_ptr<ScanOptions> scan_options_ = std::make_shared<ScanOptions>();
};
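
// An end-to-end sketch (illustrative; `dataset` is assumed to be a
// std::shared_ptr<Dataset> with columns "a" and "b"):
//
//   ScannerBuilder builder(dataset);
//   ARROW_RETURN_NOT_OK(builder.Project({"a", "b"}));
//   ARROW_RETURN_NOT_OK(builder.Filter(
//       compute::greater(compute::field_ref("a"), compute::literal(1))));
//   ARROW_RETURN_NOT_OK(builder.UseThreads(true));
//   ARROW_ASSIGN_OR_RAISE(auto scanner, builder.Finish());
//   ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());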

/// \brief Construct a source ExecNode which yields batches from a dataset scan.
///
/// Does not construct associated filter or project nodes.
///
/// When require_sequenced_output=true, batches are yielded sequentially, as if
/// the scan were single-threaded.
///
/// When implicit_ordering=true, yielded batches are augmented with fragment/batch
/// indices to enable stable ordering in simple ExecPlans.
class ARROW_DS_EXPORT ScanNodeOptions : public acero::ExecNodeOptions {
 public:
  explicit ScanNodeOptions(std::shared_ptr<Dataset> dataset,
                           std::shared_ptr<ScanOptions> scan_options,
                           bool require_sequenced_output = false,
                           bool implicit_ordering = false)
      : dataset(std::move(dataset)),
        scan_options(std::move(scan_options)),
        require_sequenced_output(require_sequenced_output),
        implicit_ordering(implicit_ordering) {}

  std::shared_ptr<Dataset> dataset;
  std::shared_ptr<ScanOptions> scan_options;
  bool require_sequenced_output;
  bool implicit_ordering;
};
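
// A sketch of using the scan node in an Acero plan (illustrative; this assumes the
// node factory is registered under the name "scan" via internal::InitializeScanner):
//
//   acero::Declaration scan{"scan", ScanNodeOptions{dataset, scan_options}};
//   ARROW_ASSIGN_OR_RAISE(auto table, acero::DeclarationToTable(std::move(scan)));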

/// @}

namespace internal {
ARROW_DS_EXPORT void InitializeScanner(arrow::acero::ExecFactoryRegistry* registry);
ARROW_DS_EXPORT void InitializeScannerV2(arrow::acero::ExecFactoryRegistry* registry);
}  // namespace internal
}  // namespace dataset
}  // namespace arrow