|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#pragma once |
|
|
|
#include <array> |
|
#include <chrono> |
|
#include <cstdint> |
|
#include <cstring> |
|
#include <memory> |
|
#include <optional> |
|
#include <string> |
|
#include <vector> |
|
|
|
#include "parquet/column_reader.h" |
|
#include "parquet/file_reader.h" |
|
#include "parquet/stream_writer.h" |
|
|
|
namespace parquet { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PARQUET_EXPORT StreamReader { |
|
public: |
|
template <typename T> |
|
using optional = ::std::optional<T>; |
|
|
|
|
|
|
|
|
|
StreamReader() = default; |
|
|
|
explicit StreamReader(std::unique_ptr<ParquetFileReader> reader); |
|
|
|
~StreamReader() = default; |
|
|
|
bool eof() const { return eof_; } |
|
|
|
int current_column() const { return column_index_; } |
|
|
|
int64_t current_row() const { return current_row_; } |
|
|
|
int num_columns() const; |
|
|
|
int64_t num_rows() const; |
|
|
|
|
|
StreamReader(StreamReader&&) = default; |
|
StreamReader& operator=(StreamReader&&) = default; |
|
|
|
|
|
StreamReader(const StreamReader&) = delete; |
|
StreamReader& operator=(const StreamReader&) = delete; |
|
|
|
StreamReader& operator>>(bool& v); |
|
|
|
StreamReader& operator>>(int8_t& v); |
|
|
|
StreamReader& operator>>(uint8_t& v); |
|
|
|
StreamReader& operator>>(int16_t& v); |
|
|
|
StreamReader& operator>>(uint16_t& v); |
|
|
|
StreamReader& operator>>(int32_t& v); |
|
|
|
StreamReader& operator>>(uint32_t& v); |
|
|
|
StreamReader& operator>>(int64_t& v); |
|
|
|
StreamReader& operator>>(uint64_t& v); |
|
|
|
StreamReader& operator>>(std::chrono::milliseconds& v); |
|
|
|
StreamReader& operator>>(std::chrono::microseconds& v); |
|
|
|
StreamReader& operator>>(float& v); |
|
|
|
StreamReader& operator>>(double& v); |
|
|
|
StreamReader& operator>>(char& v); |
|
|
|
template <int N> |
|
StreamReader& operator>>(char (&v)[N]) { |
|
ReadFixedLength(v, N); |
|
return *this; |
|
} |
|
|
|
template <std::size_t N> |
|
StreamReader& operator>>(std::array<char, N>& v) { |
|
ReadFixedLength(v.data(), static_cast<int>(N)); |
|
return *this; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
StreamReader& operator>>(std::string& v); |
|
|
|
StreamReader& operator>>(::arrow::Decimal128& v); |
|
|
|
|
|
|
|
StreamReader& operator>>(optional<bool>& v); |
|
|
|
StreamReader& operator>>(optional<int8_t>& v); |
|
|
|
StreamReader& operator>>(optional<uint8_t>& v); |
|
|
|
StreamReader& operator>>(optional<int16_t>& v); |
|
|
|
StreamReader& operator>>(optional<uint16_t>& v); |
|
|
|
StreamReader& operator>>(optional<int32_t>& v); |
|
|
|
StreamReader& operator>>(optional<uint32_t>& v); |
|
|
|
StreamReader& operator>>(optional<int64_t>& v); |
|
|
|
StreamReader& operator>>(optional<uint64_t>& v); |
|
|
|
StreamReader& operator>>(optional<float>& v); |
|
|
|
StreamReader& operator>>(optional<double>& v); |
|
|
|
StreamReader& operator>>(optional<std::chrono::milliseconds>& v); |
|
|
|
StreamReader& operator>>(optional<std::chrono::microseconds>& v); |
|
|
|
StreamReader& operator>>(optional<char>& v); |
|
|
|
StreamReader& operator>>(optional<std::string>& v); |
|
|
|
StreamReader& operator>>(optional<::arrow::Decimal128>& v); |
|
|
|
template <std::size_t N> |
|
StreamReader& operator>>(optional<std::array<char, N>>& v) { |
|
CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, N); |
|
FixedLenByteArray flba; |
|
if (ReadOptional(&flba)) { |
|
v = std::array<char, N>{}; |
|
std::memcpy(v->data(), flba.ptr, N); |
|
} else { |
|
v.reset(); |
|
} |
|
return *this; |
|
} |
|
|
|
|
|
|
|
|
|
void EndRow(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int64_t SkipColumns(int64_t num_columns_to_skip); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int64_t SkipRows(int64_t num_rows_to_skip); |
|
|
|
protected: |
|
[[noreturn]] void ThrowReadFailedException( |
|
const std::shared_ptr<schema::PrimitiveNode>& node); |
|
|
|
template <typename ReaderType, typename T> |
|
void Read(T* v) { |
|
const auto& node = nodes_[column_index_]; |
|
auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get()); |
|
int16_t def_level; |
|
int16_t rep_level; |
|
int64_t values_read; |
|
|
|
reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, v, &values_read); |
|
|
|
if (values_read != 1) { |
|
ThrowReadFailedException(node); |
|
} |
|
} |
|
|
|
template <typename ReaderType, typename ReadType, typename T> |
|
void Read(T* v) { |
|
const auto& node = nodes_[column_index_]; |
|
auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get()); |
|
int16_t def_level; |
|
int16_t rep_level; |
|
ReadType tmp; |
|
int64_t values_read; |
|
|
|
reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, &tmp, &values_read); |
|
|
|
if (values_read == 1) { |
|
*v = tmp; |
|
} else { |
|
ThrowReadFailedException(node); |
|
} |
|
} |
|
|
|
template <typename ReaderType, typename ReadType = typename ReaderType::T, typename T> |
|
void ReadOptional(optional<T>* v) { |
|
const auto& node = nodes_[column_index_]; |
|
auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get()); |
|
int16_t def_level; |
|
int16_t rep_level; |
|
ReadType tmp; |
|
int64_t values_read; |
|
|
|
reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, &tmp, &values_read); |
|
|
|
if (values_read == 1) { |
|
*v = T(tmp); |
|
} else if ((values_read == 0) && (def_level == 0)) { |
|
v->reset(); |
|
} else { |
|
ThrowReadFailedException(node); |
|
} |
|
} |
|
|
|
void ReadFixedLength(char* ptr, int len); |
|
|
|
void Read(ByteArray* v); |
|
|
|
void Read(FixedLenByteArray* v); |
|
|
|
bool ReadOptional(ByteArray* v); |
|
|
|
bool ReadOptional(FixedLenByteArray* v); |
|
|
|
void NextRowGroup(); |
|
|
|
void CheckColumn(Type::type physical_type, ConvertedType::type converted_type, |
|
int length = 0); |
|
|
|
void SkipRowsInColumn(ColumnReader* reader, int64_t num_rows_to_skip); |
|
|
|
void SetEof(); |
|
|
|
private: |
|
std::unique_ptr<ParquetFileReader> file_reader_; |
|
std::shared_ptr<FileMetaData> file_metadata_; |
|
std::shared_ptr<RowGroupReader> row_group_reader_; |
|
std::vector<std::shared_ptr<ColumnReader>> column_readers_; |
|
std::vector<std::shared_ptr<schema::PrimitiveNode>> nodes_; |
|
|
|
bool eof_{true}; |
|
int row_group_index_{0}; |
|
int column_index_{0}; |
|
int64_t current_row_{0}; |
|
int64_t row_group_row_offset_{0}; |
|
|
|
static constexpr int64_t kBatchSizeOne = 1; |
|
}; |
|
|
|
/// Stream-style row terminator for StreamReader: streaming an EndRowType value
/// into the reader ends the current row. Implemented out of line; presumably it
/// forwards to StreamReader::EndRow() -- confirm in the implementation.
/// EndRowType is not declared in this header (likely provided via
/// parquet/stream_writer.h -- verify).
PARQUET_EXPORT StreamReader& operator>>(StreamReader& stream, EndRowType end_row_tag);
|
|
|
} |
|
|