|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#pragma once |
|
|
|
#include <stdio.h> |
|
|
|
#include <cstdint> |
|
#include <memory> |
|
#include <ostream> |
|
#include <string> |
|
#include <utility> |
|
#include <vector> |
|
|
|
#include "parquet/column_reader.h" |
|
#include "parquet/exception.h" |
|
#include "parquet/platform.h" |
|
#include "parquet/schema.h" |
|
#include "parquet/types.h" |
|
|
|
namespace parquet { |
|
|
|
// Batch size used by Scanner, TypedScanner and Scanner::Make when the caller
// does not pass an explicit batch_size.
static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128;
|
|
|
class PARQUET_EXPORT Scanner { |
|
public: |
|
explicit Scanner(std::shared_ptr<ColumnReader> reader, |
|
int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, |
|
::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
|
: batch_size_(batch_size), |
|
level_offset_(0), |
|
levels_buffered_(0), |
|
value_buffer_(AllocateBuffer(pool)), |
|
value_offset_(0), |
|
values_buffered_(0), |
|
reader_(std::move(reader)) { |
|
def_levels_.resize( |
|
descr()->max_definition_level() > 0 ? static_cast<size_t>(batch_size_) : 0); |
|
rep_levels_.resize( |
|
descr()->max_repetition_level() > 0 ? static_cast<size_t>(batch_size_) : 0); |
|
} |
|
|
|
virtual ~Scanner() {} |
|
|
|
static std::shared_ptr<Scanner> Make( |
|
std::shared_ptr<ColumnReader> col_reader, |
|
int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, |
|
::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); |
|
|
|
virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) = 0; |
|
|
|
bool HasNext() { return level_offset_ < levels_buffered_ || reader_->HasNext(); } |
|
|
|
const ColumnDescriptor* descr() const { return reader_->descr(); } |
|
|
|
int64_t batch_size() const { return batch_size_; } |
|
|
|
void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; } |
|
|
|
protected: |
|
int64_t batch_size_; |
|
|
|
std::vector<int16_t> def_levels_; |
|
std::vector<int16_t> rep_levels_; |
|
int level_offset_; |
|
int levels_buffered_; |
|
|
|
std::shared_ptr<ResizableBuffer> value_buffer_; |
|
int value_offset_; |
|
int64_t values_buffered_; |
|
std::shared_ptr<ColumnReader> reader_; |
|
}; |
|
|
|
template <typename DType> |
|
class PARQUET_TEMPLATE_CLASS_EXPORT TypedScanner : public Scanner { |
|
public: |
|
typedef typename DType::c_type T; |
|
|
|
explicit TypedScanner(std::shared_ptr<ColumnReader> reader, |
|
int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE, |
|
::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) |
|
: Scanner(std::move(reader), batch_size, pool) { |
|
typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader_.get()); |
|
int value_byte_size = type_traits<DType::type_num>::value_byte_size; |
|
PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size)); |
|
values_ = reinterpret_cast<T*>(value_buffer_->mutable_data()); |
|
} |
|
|
|
virtual ~TypedScanner() {} |
|
|
|
bool NextLevels(int16_t* def_level, int16_t* rep_level) { |
|
if (level_offset_ == levels_buffered_) { |
|
levels_buffered_ = static_cast<int>( |
|
typed_reader_->ReadBatch(static_cast<int>(batch_size_), def_levels_.data(), |
|
rep_levels_.data(), values_, &values_buffered_)); |
|
|
|
value_offset_ = 0; |
|
level_offset_ = 0; |
|
if (!levels_buffered_) { |
|
return false; |
|
} |
|
} |
|
*def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0; |
|
*rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0; |
|
level_offset_++; |
|
return true; |
|
} |
|
|
|
bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) { |
|
if (level_offset_ == levels_buffered_) { |
|
if (!HasNext()) { |
|
|
|
return false; |
|
} |
|
} |
|
|
|
NextLevels(def_level, rep_level); |
|
*is_null = *def_level < descr()->max_definition_level(); |
|
|
|
if (*is_null) { |
|
return true; |
|
} |
|
|
|
if (value_offset_ == values_buffered_) { |
|
throw ParquetException("Value was non-null, but has not been buffered"); |
|
} |
|
*val = values_[value_offset_++]; |
|
return true; |
|
} |
|
|
|
|
|
bool NextValue(T* val, bool* is_null) { |
|
if (level_offset_ == levels_buffered_) { |
|
if (!HasNext()) { |
|
|
|
return false; |
|
} |
|
} |
|
|
|
|
|
int16_t def_level = -1; |
|
int16_t rep_level = -1; |
|
NextLevels(&def_level, &rep_level); |
|
*is_null = def_level < descr()->max_definition_level(); |
|
|
|
if (*is_null) { |
|
return true; |
|
} |
|
|
|
if (value_offset_ == values_buffered_) { |
|
throw ParquetException("Value was non-null, but has not been buffered"); |
|
} |
|
*val = values_[value_offset_++]; |
|
return true; |
|
} |
|
|
|
virtual void PrintNext(std::ostream& out, int width, bool with_levels = false) { |
|
T val{}; |
|
int16_t def_level = -1; |
|
int16_t rep_level = -1; |
|
bool is_null = false; |
|
char buffer[80]; |
|
|
|
if (!Next(&val, &def_level, &rep_level, &is_null)) { |
|
throw ParquetException("No more values buffered"); |
|
} |
|
|
|
if (with_levels) { |
|
out << " D:" << def_level << " R:" << rep_level << " "; |
|
if (!is_null) { |
|
out << "V:"; |
|
} |
|
} |
|
|
|
if (is_null) { |
|
std::string null_fmt = format_fwf<ByteArrayType>(width); |
|
snprintf(buffer, sizeof(buffer), null_fmt.c_str(), "NULL"); |
|
} else { |
|
FormatValue(&val, buffer, sizeof(buffer), width); |
|
} |
|
out << buffer; |
|
} |
|
|
|
private: |
|
|
|
TypedColumnReader<DType>* typed_reader_; |
|
|
|
inline void FormatValue(void* val, char* buffer, int bufsize, int width); |
|
|
|
T* values_; |
|
}; |
|
|
|
template <typename DType> |
|
inline void TypedScanner<DType>::FormatValue(void* val, char* buffer, int bufsize, |
|
int width) { |
|
std::string fmt = format_fwf<DType>(width); |
|
snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val)); |
|
} |
|
|
|
template <> |
|
inline void TypedScanner<Int96Type>::FormatValue(void* val, char* buffer, int bufsize, |
|
int width) { |
|
std::string fmt = format_fwf<Int96Type>(width); |
|
std::string result = Int96ToString(*reinterpret_cast<Int96*>(val)); |
|
snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); |
|
} |
|
|
|
template <> |
|
inline void TypedScanner<ByteArrayType>::FormatValue(void* val, char* buffer, int bufsize, |
|
int width) { |
|
std::string fmt = format_fwf<ByteArrayType>(width); |
|
std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val)); |
|
snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); |
|
} |
|
|
|
template <> |
|
inline void TypedScanner<FLBAType>::FormatValue(void* val, char* buffer, int bufsize, |
|
int width) { |
|
std::string fmt = format_fwf<FLBAType>(width); |
|
std::string result = FixedLenByteArrayToString( |
|
*reinterpret_cast<FixedLenByteArray*>(val), descr()->type_length()); |
|
snprintf(buffer, bufsize, fmt.c_str(), result.c_str()); |
|
} |
|
|
|
// Convenience aliases for each Parquet physical type's scanner.
using BoolScanner = TypedScanner<BooleanType>;
using Int32Scanner = TypedScanner<Int32Type>;
using Int64Scanner = TypedScanner<Int64Type>;
using Int96Scanner = TypedScanner<Int96Type>;
using FloatScanner = TypedScanner<FloatType>;
using DoubleScanner = TypedScanner<DoubleType>;
using ByteArrayScanner = TypedScanner<ByteArrayType>;
using FixedLenByteArrayScanner = TypedScanner<FLBAType>;
|
|
|
template <typename RType> |
|
int64_t ScanAll(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels, |
|
uint8_t* values, int64_t* values_buffered, |
|
parquet::ColumnReader* reader) { |
|
typedef typename RType::T Type; |
|
auto typed_reader = static_cast<RType*>(reader); |
|
auto vals = reinterpret_cast<Type*>(&values[0]); |
|
return typed_reader->ReadBatch(batch_size, def_levels, rep_levels, vals, |
|
values_buffered); |
|
} |
|
|
|
// Reads one batch from `reader` into caller-supplied level and raw value
// buffers; the parameters mirror ScanAll<RType>.
// NOTE(review): presumably dispatches on the reader's physical type to the
// matching ScanAll<> instantiation — the definition is not visible here; confirm.
int64_t PARQUET_EXPORT ScanAllValues(int32_t batch_size, int16_t* def_levels,
                                     int16_t* rep_levels, uint8_t* values,
                                     int64_t* values_buffered,
                                     parquet::ColumnReader* reader);
|
|
|
} |
|
|