File size: 7,505 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <array>
#include <chrono>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <vector>
#include "parquet/column_writer.h"
#include "parquet/file_writer.h"
namespace parquet {
/// \brief A class for writing Parquet files using an output stream type API.
///
/// The values given must be of the correct type i.e. the type must
/// match the file schema exactly otherwise a ParquetException will be
/// thrown.
///
/// The user must explicitly indicate the end of the row using the
/// EndRow() function or EndRow output manipulator.
///
/// A maximum row group size can be configured, the default size is
/// 512MB. Alternatively the row group size can be set to zero and the
/// user can create new row groups by calling the EndRowGroup()
/// function or using the EndRowGroup output manipulator.
///
/// Required and optional fields are supported:
/// - Required fields are written using operator<<(T)
/// - Optional fields are written using
/// operator<<(std::optional<T>).
///
/// Note that operator<<(T) can be used to write optional fields.
///
/// Similarly, operator<<(std::optional<T>) can be used to
/// write required fields. However if the optional parameter does not
/// have a value (i.e. it is nullopt) then a ParquetException will be
/// raised.
///
/// Currently there is no support for repeated fields.
///
class PARQUET_EXPORT StreamWriter {
public:
template <typename T>
using optional = ::std::optional<T>;
// N.B. Default constructed objects are not usable. This
// constructor is provided so that the object may be move
// assigned afterwards.
StreamWriter() = default;
explicit StreamWriter(std::unique_ptr<ParquetFileWriter> writer);
~StreamWriter() = default;
static void SetDefaultMaxRowGroupSize(int64_t max_size);
void SetMaxRowGroupSize(int64_t max_size);
int current_column() const { return column_index_; }
int64_t current_row() const { return current_row_; }
int num_columns() const;
// Moving is possible.
StreamWriter(StreamWriter&&) = default;
StreamWriter& operator=(StreamWriter&&) = default;
// Copying is not allowed.
StreamWriter(const StreamWriter&) = delete;
StreamWriter& operator=(const StreamWriter&) = delete;
/// \brief Output operators for required fields.
/// These can also be used for optional fields when a value must be set.
StreamWriter& operator<<(bool v);
StreamWriter& operator<<(int8_t v);
StreamWriter& operator<<(uint8_t v);
StreamWriter& operator<<(int16_t v);
StreamWriter& operator<<(uint16_t v);
StreamWriter& operator<<(int32_t v);
StreamWriter& operator<<(uint32_t v);
StreamWriter& operator<<(int64_t v);
StreamWriter& operator<<(uint64_t v);
StreamWriter& operator<<(const std::chrono::milliseconds& v);
StreamWriter& operator<<(const std::chrono::microseconds& v);
StreamWriter& operator<<(float v);
StreamWriter& operator<<(double v);
StreamWriter& operator<<(char v);
/// \brief Helper class to write fixed length strings.
/// This is useful as the standard string view (such as
/// std::string_view) is for variable length data.
struct PARQUET_EXPORT FixedStringView {
FixedStringView() = default;
explicit FixedStringView(const char* data_ptr);
FixedStringView(const char* data_ptr, std::size_t data_len);
const char* data{NULLPTR};
std::size_t size{0};
};
/// \brief Output operators for fixed length strings.
template <int N>
StreamWriter& operator<<(const char (&v)[N]) {
return WriteFixedLength(v, N);
}
template <std::size_t N>
StreamWriter& operator<<(const std::array<char, N>& v) {
return WriteFixedLength(v.data(), N);
}
StreamWriter& operator<<(FixedStringView v);
/// \brief Output operators for variable length strings.
StreamWriter& operator<<(const char* v);
StreamWriter& operator<<(const std::string& v);
StreamWriter& operator<<(::std::string_view v);
/// \brief Output operator for optional fields.
template <typename T>
StreamWriter& operator<<(const optional<T>& v) {
if (v) {
return operator<<(*v);
}
SkipOptionalColumn();
return *this;
}
/// \brief Skip the next N columns of optional data. If there are
/// less than N columns remaining then the excess columns are
/// ignored.
/// \throws ParquetException if there is an attempt to skip any
/// required column.
/// \return Number of columns actually skipped.
int64_t SkipColumns(int num_columns_to_skip);
/// \brief Terminate the current row and advance to next one.
/// \throws ParquetException if all columns in the row were not
/// written or skipped.
void EndRow();
/// \brief Terminate the current row group and create new one.
void EndRowGroup();
protected:
template <typename WriterType, typename T>
StreamWriter& Write(const T v) {
auto writer = static_cast<WriterType*>(row_group_writer_->column(column_index_++));
writer->WriteBatch(kBatchSizeOne, &kDefLevelOne, &kRepLevelZero, &v);
if (max_row_group_size_ > 0) {
row_group_size_ += writer->estimated_buffered_value_bytes();
}
return *this;
}
StreamWriter& WriteVariableLength(const char* data_ptr, std::size_t data_len);
StreamWriter& WriteFixedLength(const char* data_ptr, std::size_t data_len);
void CheckColumn(Type::type physical_type, ConvertedType::type converted_type,
int length = -1);
/// \brief Skip the next column which must be optional.
/// \throws ParquetException if the next column does not exist or is
/// not optional.
void SkipOptionalColumn();
void WriteNullValue(ColumnWriter* writer);
private:
using node_ptr_type = std::shared_ptr<schema::PrimitiveNode>;
struct null_deleter {
void operator()(void*) {}
};
int32_t column_index_{0};
int64_t current_row_{0};
int64_t row_group_size_{0};
int64_t max_row_group_size_{default_row_group_size_};
std::unique_ptr<ParquetFileWriter> file_writer_;
std::unique_ptr<RowGroupWriter, null_deleter> row_group_writer_;
std::vector<node_ptr_type> nodes_;
static constexpr int16_t kDefLevelZero = 0;
static constexpr int16_t kDefLevelOne = 1;
static constexpr int16_t kRepLevelZero = 0;
static constexpr int64_t kBatchSizeOne = 1;
static int64_t default_row_group_size_;
};
struct PARQUET_EXPORT EndRowType {};
constexpr EndRowType EndRow = {};
struct PARQUET_EXPORT EndRowGroupType {};
constexpr EndRowGroupType EndRowGroup = {};
PARQUET_EXPORT
StreamWriter& operator<<(StreamWriter&, EndRowType);
PARQUET_EXPORT
StreamWriter& operator<<(StreamWriter&, EndRowGroupType);
} // namespace parquet
|