|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#pragma once |
|
|
|
#include <algorithm> |
|
#include <cstddef> |
|
#include <cstdint> |
|
#include <memory> |
|
#include <string_view> |
|
#include <vector> |
|
|
|
#include "arrow/buffer.h" |
|
#include "arrow/csv/options.h" |
|
#include "arrow/csv/type_fwd.h" |
|
#include "arrow/status.h" |
|
#include "arrow/util/macros.h" |
|
#include "arrow/util/visibility.h" |
|
|
|
namespace arrow { |
|
|
|
// Forward declaration: MemoryPool is only used by pointer in this header
// (see the BlockParser constructor taking a `MemoryPool* pool`).
class MemoryPool;
|
|
|
namespace csv { |
|
|
|
|
|
|
|
|
|
// Skip rows at the start of a raw CSV byte range.
//
// Declared here, defined out of line. Based on the signature:
//   data      - start of the input bytes
//   size      - number of bytes available at `data`
//   num_rows  - number of rows requested to be skipped
//   out_data  - output: receives a pointer position within the input
// Returns an int32_t row count.
// NOTE(review): presumably the return value is the number of rows actually
// skipped (possibly fewer than `num_rows` if the input is too short) and
// `*out_data` points just past the last skipped row -- confirm against the
// implementation before relying on it.
ARROW_EXPORT
int32_t SkipRows(const uint8_t* data, uint32_t size, int32_t num_rows,
                 const uint8_t** out_data);
|
|
|
// Forward declaration of the parser implementation. BlockParser owns one via
// std::unique_ptr, and detail::DataBatch befriends it so it can fill in the
// batch's protected members.
class BlockParserImpl;
|
|
|
namespace detail { |
|
|
|
// One entry of a values buffer. DataBatch reads these by reinterpret_cast-ing
// raw buffer bytes, so the field order, types and bit widths define an
// in-memory layout shared with the code that writes the buffers; do not
// change them casually.
// NOTE(review): packing bit-fields with different underlying types
// (uint32_t and bool) into a single allocation unit is implementation-defined;
// confirm sizeof(ParsedValueDesc) on each supported compiler if the memory
// footprint matters.
struct ParsedValueDesc {
  // Byte offset into the parsed data; limited to 31 bits by the bit-field.
  uint32_t offset : 31;
  // Quoted flag. DataBatch reads the flag for a value from the entry that
  // follows it, i.e. the entry whose offset marks the end of that value.
  bool quoted : 1;
};
|
|
|
class ARROW_EXPORT DataBatch { |
|
public: |
|
explicit DataBatch(int32_t num_cols) : num_cols_(num_cols) {} |
|
|
|
|
|
int32_t num_rows() const { return num_rows_; } |
|
|
|
int32_t num_cols() const { return num_cols_; } |
|
|
|
uint32_t num_bytes() const { return parsed_size_; } |
|
|
|
int32_t num_skipped_rows() const { return static_cast<int32_t>(skipped_rows_.size()); } |
|
|
|
template <typename Visitor> |
|
Status VisitColumn(int32_t col_index, int64_t first_row, Visitor&& visit) const { |
|
using detail::ParsedValueDesc; |
|
|
|
int32_t batch_row = 0; |
|
for (size_t buf_index = 0; buf_index < values_buffers_.size(); ++buf_index) { |
|
const auto& values_buffer = values_buffers_[buf_index]; |
|
const auto values = reinterpret_cast<const ParsedValueDesc*>(values_buffer->data()); |
|
const auto max_pos = |
|
static_cast<int32_t>(values_buffer->size() / sizeof(ParsedValueDesc)) - 1; |
|
for (int32_t pos = col_index; pos < max_pos; pos += num_cols_, ++batch_row) { |
|
auto start = values[pos].offset; |
|
auto stop = values[pos + 1].offset; |
|
auto quoted = values[pos + 1].quoted; |
|
Status status = visit(parsed_ + start, stop - start, quoted); |
|
if (ARROW_PREDICT_FALSE(!status.ok())) { |
|
return DecorateWithRowNumber(std::move(status), first_row, batch_row); |
|
} |
|
} |
|
} |
|
return Status::OK(); |
|
} |
|
|
|
template <typename Visitor> |
|
Status VisitLastRow(Visitor&& visit) const { |
|
using detail::ParsedValueDesc; |
|
|
|
const auto& values_buffer = values_buffers_.back(); |
|
const auto values = reinterpret_cast<const ParsedValueDesc*>(values_buffer->data()); |
|
const auto start_pos = |
|
static_cast<int32_t>(values_buffer->size() / sizeof(ParsedValueDesc)) - |
|
num_cols_ - 1; |
|
for (int32_t col_index = 0; col_index < num_cols_; ++col_index) { |
|
auto start = values[start_pos + col_index].offset; |
|
auto stop = values[start_pos + col_index + 1].offset; |
|
auto quoted = values[start_pos + col_index + 1].quoted; |
|
ARROW_RETURN_NOT_OK(visit(parsed_ + start, stop - start, quoted)); |
|
} |
|
return Status::OK(); |
|
} |
|
|
|
protected: |
|
Status DecorateWithRowNumber(Status&& status, int64_t first_row, |
|
int32_t batch_row) const { |
|
if (first_row >= 0) { |
|
|
|
|
|
const auto skips_before = |
|
std::upper_bound(skipped_rows_.begin(), skipped_rows_.end(), batch_row) - |
|
skipped_rows_.begin(); |
|
status = status.WithMessage("Row #", batch_row + skips_before + first_row, ": ", |
|
status.message()); |
|
} |
|
|
|
ARROW_RETURN_IF_(true, std::move(status), ARROW_STRINGIFY(status)); |
|
return std::move(status); |
|
} |
|
|
|
|
|
int32_t num_rows_ = 0; |
|
|
|
int32_t num_cols_ = 0; |
|
|
|
|
|
|
|
std::vector<std::shared_ptr<Buffer>> values_buffers_; |
|
std::shared_ptr<Buffer> parsed_buffer_; |
|
const uint8_t* parsed_ = NULLPTR; |
|
int32_t parsed_size_ = 0; |
|
|
|
|
|
std::vector<int32_t> skipped_rows_; |
|
|
|
friend class ::arrow::csv::BlockParserImpl; |
|
}; |
|
|
|
} |
|
|
|
// Default for BlockParser's `max_num_rows` constructor argument.
// Declared `inline` (C++17) so every translation unit including this header
// refers to a single entity rather than each getting its own internal copy.
inline constexpr int32_t kMaxParserNumRows = 100000;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ARROW_EXPORT BlockParser { |
|
public: |
|
explicit BlockParser(ParseOptions options, int32_t num_cols = -1, |
|
int64_t first_row = -1, int32_t max_num_rows = kMaxParserNumRows); |
|
explicit BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols = -1, |
|
int64_t first_row = -1, int32_t max_num_rows = kMaxParserNumRows); |
|
~BlockParser(); |
|
|
|
|
|
|
|
|
|
|
|
Status Parse(std::string_view data, uint32_t* out_size); |
|
|
|
|
|
|
|
|
|
Status Parse(const std::vector<std::string_view>& data, uint32_t* out_size); |
|
|
|
|
|
|
|
|
|
|
|
Status ParseFinal(std::string_view data, uint32_t* out_size); |
|
|
|
|
|
|
|
|
|
Status ParseFinal(const std::vector<std::string_view>& data, uint32_t* out_size); |
|
|
|
|
|
int32_t num_rows() const { return parsed_batch().num_rows(); } |
|
|
|
int32_t num_cols() const { return parsed_batch().num_cols(); } |
|
|
|
uint32_t num_bytes() const { return parsed_batch().num_bytes(); } |
|
|
|
|
|
int32_t total_num_rows() const { |
|
return parsed_batch().num_rows() + parsed_batch().num_skipped_rows(); |
|
} |
|
|
|
|
|
int64_t first_row_num() const; |
|
|
|
|
|
|
|
|
|
|
|
template <typename Visitor> |
|
Status VisitColumn(int32_t col_index, Visitor&& visit) const { |
|
return parsed_batch().VisitColumn(col_index, first_row_num(), |
|
std::forward<Visitor>(visit)); |
|
} |
|
|
|
template <typename Visitor> |
|
Status VisitLastRow(Visitor&& visit) const { |
|
return parsed_batch().VisitLastRow(std::forward<Visitor>(visit)); |
|
} |
|
|
|
protected: |
|
std::unique_ptr<BlockParserImpl> impl_; |
|
|
|
const detail::DataBatch& parsed_batch() const; |
|
}; |
|
|
|
} |
|
} |
|
|