|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#pragma once |
|
|
|
#include <cstdint> |
|
#include <cstring> |
|
#include <memory> |
|
|
|
#include "arrow/type_fwd.h" |
|
#include "arrow/util/compression.h" |
|
#include "parquet/exception.h" |
|
#include "parquet/platform.h" |
|
#include "parquet/types.h" |
|
|
|
// Forward declarations of Arrow types named in this header, so the header
// does not need to pull in their full definitions.
namespace arrow {

namespace util {
class CodecOptions;
class RleEncoder;
}  // namespace util

namespace bit_util {
class BitWriter;
}  // namespace bit_util

class Array;

}  // namespace arrow
|
|
|
namespace parquet { |
|
|
|
// Forward declarations of parquet types that this header uses only through
// pointers, references, or smart pointers.

// Metadata and page-index builders.
class ColumnChunkMetaDataBuilder;
class ColumnIndexBuilder;
class OffsetIndexBuilder;

// Schema, page, and configuration types.
class ColumnDescriptor;
class DataPage;
class DictionaryPage;
class WriterProperties;

// Encryption and Arrow-integration helpers.
class Encryptor;
struct ArrowWriteContext;
|
|
|
class PARQUET_EXPORT LevelEncoder { |
|
public: |
|
LevelEncoder(); |
|
~LevelEncoder(); |
|
|
|
static int MaxBufferSize(Encoding::type encoding, int16_t max_level, |
|
int num_buffered_values); |
|
|
|
|
|
void Init(Encoding::type encoding, int16_t max_level, int num_buffered_values, |
|
uint8_t* data, int data_size); |
|
|
|
|
|
int Encode(int batch_size, const int16_t* levels); |
|
|
|
int32_t len() { |
|
if (encoding_ != Encoding::RLE) { |
|
throw ParquetException("Only implemented for RLE encoding"); |
|
} |
|
return rle_length_; |
|
} |
|
|
|
private: |
|
int bit_width_; |
|
int rle_length_; |
|
Encoding::type encoding_; |
|
std::unique_ptr<::arrow::util::RleEncoder> rle_encoder_; |
|
std::unique_ptr<::arrow::bit_util::BitWriter> bit_packed_encoder_; |
|
}; |
|
|
|
class PARQUET_EXPORT PageWriter { |
|
public: |
|
virtual ~PageWriter() {} |
|
|
|
static std::unique_ptr<PageWriter> Open( |
|
std::shared_ptr<ArrowOutputStream> sink, Compression::type codec, |
|
ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal = -1, |
|
int16_t column_chunk_ordinal = -1, |
|
::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), |
|
bool buffered_row_group = false, |
|
std::shared_ptr<Encryptor> header_encryptor = NULLPTR, |
|
std::shared_ptr<Encryptor> data_encryptor = NULLPTR, |
|
bool page_write_checksum_enabled = false, |
|
|
|
ColumnIndexBuilder* column_index_builder = NULLPTR, |
|
|
|
OffsetIndexBuilder* offset_index_builder = NULLPTR, |
|
const CodecOptions& codec_options = CodecOptions{}); |
|
|
|
|
|
|
|
|
|
virtual void Close(bool has_dictionary, bool fallback) = 0; |
|
|
|
|
|
virtual int64_t WriteDataPage(const DataPage& page) = 0; |
|
|
|
|
|
virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0; |
|
|
|
|
|
|
|
virtual int64_t total_compressed_bytes_written() const = 0; |
|
|
|
virtual bool has_compressor() = 0; |
|
|
|
virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0; |
|
}; |
|
|
|
/// Abstract, type-erased writer for a single column chunk.
///
/// Typed values are written through TypedColumnWriter<DType>, which derives
/// from this class. Arrow arrays are written through WriteArrow().
/// NOTE: the order of the virtual declarations below defines the vtable
/// layout of this exported class, so do not reorder them.
class PARQUET_EXPORT ColumnWriter {
 public:
  virtual ~ColumnWriter() = default;

  /// Factory for a concrete column writer.
  ///
  /// The PageWriter is passed as std::unique_ptr by value, so ownership
  /// transfers to the new writer. The metadata builder and `properties` are
  /// raw pointers (assumed non-owning; they must outlive the writer).
  /// NOTE(review): presumably dispatches on the column's physical type to a
  /// TypedColumnWriter specialization; confirm in the implementation.
  static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*,
                                            std::unique_ptr<PageWriter>,
                                            const WriterProperties* properties);

  /// Finish the column chunk.
  /// NOTE(review): presumably flushes buffered values and returns the total
  /// size written in bytes; confirm.
  virtual int64_t Close() = 0;

  /// Physical type of the column being written.
  virtual Type::type type() const = 0;

  /// Schema descriptor of the column being written.
  virtual const ColumnDescriptor* descr() const = 0;

  /// Number of rows written so far.
  virtual int64_t rows_written() const = 0;

  /// Compressed size accounting (see also the two accessors below).
  /// NOTE(review): the precise difference between total_compressed_bytes(),
  /// total_bytes_written() and total_compressed_bytes_written() (e.g. whether
  /// buffered/unflushed pages are included) is defined by the implementation;
  /// confirm before relying on it.
  virtual int64_t total_compressed_bytes() const = 0;

  /// Total bytes written so far (see the note on total_compressed_bytes()).
  virtual int64_t total_bytes_written() const = 0;

  /// Total compressed bytes written so far (see the note on
  /// total_compressed_bytes()).
  virtual int64_t total_compressed_bytes_written() const = 0;

  /// Estimated size of values buffered but not yet written out.
  virtual int64_t estimated_buffered_value_bytes() const = 0;

  /// Writer properties this column writer was created with.
  virtual const WriterProperties* properties() = 0;

  /// Attach key/value metadata to the column chunk being written.
  /// NOTE(review): merge-vs-replace semantics with earlier calls are defined
  /// by the implementation; confirm.
  virtual void AddKeyValueMetadata(
      const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata) = 0;

  /// Discard any key/value metadata added via AddKeyValueMetadata().
  virtual void ResetKeyValueMetadata() = 0;

  /// Write an Arrow leaf array together with precomputed levels.
  ///
  /// @param def_levels          definition levels
  /// @param rep_levels          repetition levels
  /// @param num_levels          number of entries in the level arrays
  /// @param leaf_array          the Arrow leaf array holding the values
  /// @param ctx                 Arrow write context
  /// @param leaf_field_nullable whether the leaf field is nullable
  /// @return an ::arrow::Status describing success or failure
  virtual ::arrow::Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
                                     int64_t num_levels, const ::arrow::Array& leaf_array,
                                     ArrowWriteContext* ctx,
                                     bool leaf_field_nullable) = 0;
};
|
|
|
|
|
/// Column writer for values of physical type `DType`, passed as
/// `DType::c_type`.
/// NOTE: as with ColumnWriter, the declaration order of the virtual members
/// affects vtable layout, so do not reorder them.
template <typename DType>
class TypedColumnWriter : public ColumnWriter {
 public:
  using T = typename DType::c_type;

  /// Write a batch of definition levels, repetition levels and values.
  ///
  /// @param num_values  number of level entries in the batch
  ///                    NOTE(review): assumed to count levels rather than
  ///                    non-null values; confirm against the implementation.
  /// @param def_levels  definition levels (presumably may be null when the
  ///                    column's max definition level is 0; confirm)
  /// @param rep_levels  repetition levels (presumably may be null when the
  ///                    column's max repetition level is 0; confirm)
  /// @param values      values of type T
  /// @return an int64_t count. NOTE(review): presumably the number of entries
  ///         consumed from `values`; confirm.
  virtual int64_t WriteBatch(int64_t num_values, const int16_t* def_levels,
                             const int16_t* rep_levels, const T* values) = 0;

  /// Variant of WriteBatch() that also takes a validity bitmap `valid_bits`
  /// whose first relevant bit is at `valid_bits_offset`.
  /// NOTE(review): presumably `values` is "spaced", meaning null slots occupy
  /// positions in the array as indicated by `valid_bits`; confirm against the
  /// implementation.
  virtual void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
                                const int16_t* rep_levels, const uint8_t* valid_bits,
                                int64_t valid_bits_offset, const T* values) = 0;
};

// Convenience aliases for the TypedColumnWriter instantiations.
using BoolWriter = TypedColumnWriter<BooleanType>;
using Int32Writer = TypedColumnWriter<Int32Type>;
using Int64Writer = TypedColumnWriter<Int64Type>;
using Int96Writer = TypedColumnWriter<Int96Type>;
using FloatWriter = TypedColumnWriter<FloatType>;
using DoubleWriter = TypedColumnWriter<DoubleType>;
using ByteArrayWriter = TypedColumnWriter<ByteArrayType>;
using FixedLenByteArrayWriter = TypedColumnWriter<FLBAType>;
|
|
|
namespace internal { |
|
|
|
|
|
|
|
|
|
constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588); |
|
|
|
template <int64_t UnitPerDay, int64_t NanosecondsPerUnit> |
|
inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) { |
|
int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays; |
|
(*impala_timestamp).value[2] = (uint32_t)julian_days; |
|
|
|
int64_t last_day_units = time % UnitPerDay; |
|
auto last_day_nanos = last_day_units * NanosecondsPerUnit; |
|
|
|
|
|
std::memcpy(impala_timestamp, &last_day_nanos, sizeof(int64_t)); |
|
} |
|
|
|
constexpr int64_t kSecondsInNanos = INT64_C(1000000000); |
|
|
|
inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) { |
|
ArrowTimestampToImpalaTimestamp<kSecondsPerDay, kSecondsInNanos>(seconds, |
|
impala_timestamp); |
|
} |
|
|
|
constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000); |
|
|
|
inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds, |
|
Int96* impala_timestamp) { |
|
ArrowTimestampToImpalaTimestamp<kMillisecondsPerDay, kMillisecondsInNanos>( |
|
milliseconds, impala_timestamp); |
|
} |
|
|
|
constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000); |
|
|
|
inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds, |
|
Int96* impala_timestamp) { |
|
ArrowTimestampToImpalaTimestamp<kMicrosecondsPerDay, kMicrosecondsInNanos>( |
|
microseconds, impala_timestamp); |
|
} |
|
|
|
constexpr int64_t kNanosecondsInNanos = INT64_C(1); |
|
|
|
inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds, |
|
Int96* impala_timestamp) { |
|
ArrowTimestampToImpalaTimestamp<kNanosecondsPerDay, kNanosecondsInNanos>( |
|
nanoseconds, impala_timestamp); |
|
} |
|
|
|
} |
|
} |
|
|