File size: 11,185 Bytes
9c6594c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "arrow/io/caching.h"
#include "arrow/util/type_fwd.h"
#include "parquet/metadata.h"  // IWYU pragma: keep
#include "parquet/platform.h"
#include "parquet/properties.h"

namespace parquet {

class ColumnReader;
class FileMetaData;
class PageIndexReader;
class BloomFilterReader;
class PageReader;
class RowGroupMetaData;

namespace internal {
class RecordReader;
}

class PARQUET_EXPORT RowGroupReader {
 public:
  // Forward declare a virtual class 'Contents' to aid dependency injection and more
  // easily create test fixtures
  // An implementation of the Contents class is defined in the .cc file
  struct Contents {
    virtual ~Contents() {}
    virtual std::unique_ptr<PageReader> GetColumnPageReader(int i) = 0;
    virtual const RowGroupMetaData* metadata() const = 0;
    virtual const ReaderProperties* properties() const = 0;
  };

  explicit RowGroupReader(std::unique_ptr<Contents> contents);

  // Returns the rowgroup metadata
  const RowGroupMetaData* metadata() const;

  // Construct a ColumnReader for the indicated row group-relative
  // column. Ownership is shared with the RowGroupReader.
  std::shared_ptr<ColumnReader> Column(int i);

  // EXPERIMENTAL: Construct a RecordReader for the indicated column of the row group.
  // Ownership is shared with the RowGroupReader.
  std::shared_ptr<internal::RecordReader> RecordReader(int i,
                                                       bool read_dictionary = false);

  // Construct a ColumnReader, trying to enable exposed encoding.
  //
  // For dictionary encoding, currently we only support column chunks that are fully
  // dictionary encoded, i.e., all data pages in the column chunk are dictionary encoded.
  // If a column chunk uses dictionary encoding but then falls back to plain encoding, the
  // encoding will not be exposed.
  //
  // The returned column reader provides an API GetExposedEncoding() for the
  // users to check the exposed encoding and determine how to read the batches.
  //
  // \note API EXPERIMENTAL
  std::shared_ptr<ColumnReader> ColumnWithExposeEncoding(
      int i, ExposedEncoding encoding_to_expose);

  // Construct a RecordReader, trying to enable exposed encoding.
  //
  // For dictionary encoding, currently we only support column chunks that are
  // fully dictionary encoded byte arrays. The caller should verify if the reader can read
  // and expose the dictionary by checking the reader's read_dictionary(). If a column
  // chunk uses dictionary encoding but then falls back to plain encoding, the returned
  // reader will read decoded data without exposing the dictionary.
  //
  // \note API EXPERIMENTAL
  std::shared_ptr<internal::RecordReader> RecordReaderWithExposeEncoding(
      int i, ExposedEncoding encoding_to_expose);

  std::unique_ptr<PageReader> GetColumnPageReader(int i);

 private:
  // Holds a pointer to an instance of Contents implementation
  std::unique_ptr<Contents> contents_;
};

class PARQUET_EXPORT ParquetFileReader {
 public:
  // Declare a virtual class 'Contents' to aid dependency injection and more
  // easily create test fixtures
  // An implementation of the Contents class is defined in the .cc file
  struct PARQUET_EXPORT Contents {
    static std::unique_ptr<Contents> Open(
        std::shared_ptr<::arrow::io::RandomAccessFile> source,
        const ReaderProperties& props = default_reader_properties(),
        std::shared_ptr<FileMetaData> metadata = NULLPTR);

    static ::arrow::Future<std::unique_ptr<Contents>> OpenAsync(
        std::shared_ptr<::arrow::io::RandomAccessFile> source,
        const ReaderProperties& props = default_reader_properties(),
        std::shared_ptr<FileMetaData> metadata = NULLPTR);

    virtual ~Contents() = default;
    // Perform any cleanup associated with the file contents
    virtual void Close() = 0;
    virtual std::shared_ptr<RowGroupReader> GetRowGroup(int i) = 0;
    virtual std::shared_ptr<FileMetaData> metadata() const = 0;
    virtual std::shared_ptr<PageIndexReader> GetPageIndexReader() = 0;
    virtual BloomFilterReader& GetBloomFilterReader() = 0;
  };

  ParquetFileReader();
  ~ParquetFileReader();

  // Create a file reader instance from an Arrow file object. Thread-safety is
  // the responsibility of the file implementation
  static std::unique_ptr<ParquetFileReader> Open(
      std::shared_ptr<::arrow::io::RandomAccessFile> source,
      const ReaderProperties& props = default_reader_properties(),
      std::shared_ptr<FileMetaData> metadata = NULLPTR);

  // API Convenience to open a serialized Parquet file on disk, using Arrow IO
  // interfaces.
  static std::unique_ptr<ParquetFileReader> OpenFile(
      const std::string& path, bool memory_map = false,
      const ReaderProperties& props = default_reader_properties(),
      std::shared_ptr<FileMetaData> metadata = NULLPTR);

  // Asynchronously open a file reader from an Arrow file object.
  // Does not throw - all errors are reported through the Future.
  static ::arrow::Future<std::unique_ptr<ParquetFileReader>> OpenAsync(
      std::shared_ptr<::arrow::io::RandomAccessFile> source,
      const ReaderProperties& props = default_reader_properties(),
      std::shared_ptr<FileMetaData> metadata = NULLPTR);

  void Open(std::unique_ptr<Contents> contents);
  void Close();

  // The RowGroupReader is owned by the FileReader
  std::shared_ptr<RowGroupReader> RowGroup(int i);

  // Returns the file metadata. Only one instance is ever created
  std::shared_ptr<FileMetaData> metadata() const;

  /// Returns the PageIndexReader. Only one instance is ever created.
  ///
  /// If the file does not have the page index, nullptr may be returned.
  /// Because it pays to check existence of page index in the file, it
  /// is possible to return a non null value even if page index does
  /// not exist. It is the caller's responsibility to check the return
  /// value and follow-up calls to PageIndexReader.
  ///
  /// WARNING: The returned PageIndexReader must not outlive the ParquetFileReader.
  /// Initialize GetPageIndexReader() is not thread-safety.
  std::shared_ptr<PageIndexReader> GetPageIndexReader();

  /// Returns the BloomFilterReader. Only one instance is ever created.
  ///
  /// WARNING: The returned BloomFilterReader must not outlive the ParquetFileReader.
  /// Initialize GetBloomFilterReader() is not thread-safety.
  BloomFilterReader& GetBloomFilterReader();

  /// Pre-buffer the specified column indices in all row groups.
  ///
  /// Readers can optionally call this to cache the necessary slices
  /// of the file in-memory before deserialization. Arrow readers can
  /// automatically do this via an option. This is intended to
  /// increase performance when reading from high-latency filesystems
  /// (e.g. Amazon S3).
  ///
  /// After calling this, creating readers for row groups/column
  /// indices that were not buffered may fail. Creating multiple
  /// readers for the a subset of the buffered regions is
  /// acceptable. This may be called again to buffer a different set
  /// of row groups/columns.
  ///
  /// If memory usage is a concern, note that data will remain
  /// buffered in memory until either \a PreBuffer() is called again,
  /// or the reader itself is destructed. Reading - and buffering -
  /// only one row group at a time may be useful.
  ///
  /// This method may throw.
  void PreBuffer(const std::vector<int>& row_groups,
                 const std::vector<int>& column_indices,
                 const ::arrow::io::IOContext& ctx,
                 const ::arrow::io::CacheOptions& options);

  /// Retrieve the list of byte ranges that would need to be read to retrieve
  /// the data for the specified row groups and column indices.
  ///
  /// A reader can optionally call this if they wish to handle their own
  /// caching and management of file reads (or offload them to other readers).
  /// Unlike PreBuffer, this method will not perform any actual caching or
  /// reads, instead just using the file metadata to determine the byte ranges
  /// that would need to be read if you were to consume the entirety of the column
  /// chunks for the provided columns in the specified row groups.
  ///
  /// If row_groups or column_indices are empty, then the result of this will be empty.
  ///
  /// hole_size_limit represents the maximum distance, in bytes, between two
  /// consecutive ranges; beyond this value, ranges will not be combined. The default
  /// value is 1MB.
  ///
  /// range_size_limit is the maximum size in bytes of a combined range; if combining
  /// two consecutive ranges would produce a range larger than this, they are not
  /// combined. The default values is 64MB. This *must* be larger than hole_size_limit.
  ///
  /// This will not take into account page indexes or any other predicate push down
  /// benefits that may be available.
  ::arrow::Result<std::vector<::arrow::io::ReadRange>> GetReadRanges(
      const std::vector<int>& row_groups, const std::vector<int>& column_indices,
      int64_t hole_size_limit = 1024 * 1024, int64_t range_size_limit = 64 * 1024 * 1024);

  /// Wait for the specified row groups and column indices to be pre-buffered.
  ///
  /// After the returned Future completes, reading the specified row
  /// groups/columns will not block.
  ///
  /// PreBuffer must be called first. This method does not throw.
  ::arrow::Future<> WhenBuffered(const std::vector<int>& row_groups,
                                 const std::vector<int>& column_indices) const;

 private:
  // Holds a pointer to an instance of Contents implementation
  std::unique_ptr<Contents> contents_;
};

// Read only Parquet file metadata
std::shared_ptr<FileMetaData> PARQUET_EXPORT
ReadMetaData(const std::shared_ptr<::arrow::io::RandomAccessFile>& source);

/// \brief Scan all values in file. Useful for performance testing
/// \param[in] columns the column numbers to scan. If empty scans all
/// \param[in] column_batch_size number of values to read at a time when scanning column
/// \param[in] reader a ParquetFileReader instance
/// \return number of semantic rows in file
PARQUET_EXPORT
int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size,
                         ParquetFileReader* reader);

}  // namespace parquet