|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#pragma once |
|
|
|
#include <atomic> |
|
#include <cstdint> |
|
#include <optional> |
|
#include <thread> |
|
#include <unordered_map> |
|
#include <vector> |
|
|
|
#include "arrow/acero/options.h" |
|
#include "arrow/acero/type_fwd.h" |
|
#include "arrow/buffer.h" |
|
#include "arrow/compute/expression.h" |
|
#include "arrow/compute/util.h" |
|
#include "arrow/memory_pool.h" |
|
#include "arrow/result.h" |
|
#include "arrow/status.h" |
|
#include "arrow/util/bit_util.h" |
|
#include "arrow/util/cpu_info.h" |
|
#include "arrow/util/logging.h" |
|
#include "arrow/util/mutex.h" |
|
#include "arrow/util/thread_pool.h" |
|
#include "arrow/util/type_fwd.h" |
|
|
|
namespace arrow { |
|
|
|
namespace acero { |
|
|
|
/// \brief Validate the inputs handed to an ExecNode factory.
///
/// Only the declaration lives in this header. Judging by the parameter names,
/// this presumably checks `inputs` against `expected_num_inputs` and uses
/// `kind_name` to label any resulting error -- TODO confirm against the
/// definition.
///
/// \param plan the plan the node is being added to
/// \param inputs the candidate input nodes
/// \param expected_num_inputs the number of inputs the node kind requires
/// \param kind_name human-readable name of the node kind
/// \return a Status describing the outcome of the validation
ARROW_ACERO_EXPORT
Status ValidateExecNodeInputs(ExecPlan* plan, const std::vector<ExecNode*>& inputs,
                              int expected_num_inputs, const char* kind_name);
|
|
|
/// \brief Build a Table from a sequence of ExecBatches.
///
/// Only the declaration lives in this header; the batches are presumably
/// interpreted according to `schema` -- TODO confirm against the definition.
///
/// \param schema the schema associated with the batches
/// \param exec_batches the batches to assemble
/// \return the resulting Table, or an error Status wrapped in the Result
ARROW_ACERO_EXPORT
Result<std::shared_ptr<Table>> TableFromExecBatches(
    const std::shared_ptr<Schema>& schema, const std::vector<ExecBatch>& exec_batches);
|
|
|
/// \brief Atomic counter that signals completion exactly once.
///
/// The counter tracks a running count and an expected total. The total is
/// stored as -1 until SetTotal() is called. Completion is reported by a `true`
/// return from Increment(), SetTotal() or Cancel(); an internal flag guarantees
/// that, across all three, at most one call ever returns true.
class ARROW_ACERO_EXPORT AtomicCounter {
 public:
  AtomicCounter() = default;

  /// \brief Number of increments recorded so far.
  int count() const { return count_.load(); }

  /// \brief The expected total, or an empty optional while the stored total is
  /// still the -1 sentinel.
  std::optional<int> total() const {
    int total = total_.load();
    if (total == -1) return {};
    return total;
  }

  /// \brief Record one more unit of work.
  ///
  /// \return true only if this increment brought the count up to the total and
  /// the counter had not already been completed or cancelled
  bool Increment() {
    // Sanity check: the count must not already equal the total when a new
    // increment arrives.
    ARROW_DCHECK_NE(count_.load(), total_.load());
    int count = count_.fetch_add(1) + 1;
    if (count != total_.load()) return false;
    return DoneOnce();
  }

  /// \brief Set the expected total.
  ///
  /// \return true only if the count already equals `total` and the counter had
  /// not already been completed or cancelled
  bool SetTotal(int total) {
    total_.store(total);
    if (count_.load() != total) return false;
    return DoneOnce();
  }

  /// \brief Mark the counter as complete regardless of count and total.
  ///
  /// \return true if the counter had not already been completed
  bool Cancel() { return DoneOnce(); }

  /// \brief Whether the counter has been completed (by reaching its total or by
  /// cancellation).
  ///
  /// Declared const, like count() and total(), so that completion can be
  /// queried through a const reference.
  bool Completed() const { return complete_.load(); }

 private:
  /// Flip the completion flag from false to true. Returns true for the single
  /// caller that performs the flip and false for everyone else, which is what
  /// limits Increment(), SetTotal() and Cancel() to one true return in total.
  bool DoneOnce() {
    bool expected = false;
    return complete_.compare_exchange_strong(expected, true);
  }

  std::atomic<int> count_{0};
  std::atomic<int> total_{-1};  // -1 means "total not set yet"
  std::atomic<bool> complete_{false};
};
|
|
|
class ARROW_ACERO_EXPORT ThreadIndexer { |
|
public: |
|
size_t operator()(); |
|
|
|
static size_t Capacity(); |
|
|
|
private: |
|
static size_t Check(size_t thread_index); |
|
|
|
arrow::util::Mutex mutex_; |
|
std::unordered_map<std::thread::id, size_t> id_to_index_; |
|
}; |
|
|
|
|
|
/// \brief SinkNodeConsumer bound to a caller-provided Table slot.
///
/// The constructor only stores `out` and `pool`; Init/Consume/Finish are
/// defined out of line. The members suggest that consumed batches are gathered
/// in `batches_` under `consume_mutex_` and that a Table is published through
/// `out_` -- TODO confirm against the definition.
struct ARROW_ACERO_EXPORT TableSinkNodeConsumer : public SinkNodeConsumer {
  /// \param out non-owning pointer to the shared_ptr<Table> slot kept by this
  ///            consumer
  /// \param pool memory pool kept by this consumer
  TableSinkNodeConsumer(std::shared_ptr<Table>* out, MemoryPool* pool)
      : out_(out), pool_(pool) {}

  Status Init(const std::shared_ptr<Schema>& schema,
              BackpressureControl* backpressure_control, ExecPlan* plan) override;

  Status Consume(ExecBatch batch) override;

  Future<> Finish() override;

 private:
  std::shared_ptr<Table>* out_;
  MemoryPool* pool_;
  std::shared_ptr<Schema> schema_;
  std::vector<std::shared_ptr<RecordBatch>> batches_;
  arrow::util::Mutex consume_mutex_;
};
|
|
|
/// \brief SinkNodeConsumer that ignores everything it is given.
///
/// Every hook returns success immediately and none of the arguments are read.
class ARROW_ACERO_EXPORT NullSinkNodeConsumer : public SinkNodeConsumer {
 public:
  /// \brief Create a shared, default-constructed instance.
  static std::shared_ptr<NullSinkNodeConsumer> Make() {
    return std::make_shared<NullSinkNodeConsumer>();
  }

  /// \brief No-op initialization; always succeeds.
  Status Init(const std::shared_ptr<Schema>& /*schema*/,
              BackpressureControl* /*backpressure_control*/,
              ExecPlan* /*plan*/) override {
    return Status::OK();
  }

  /// \brief Drop the batch; always succeeds.
  Status Consume(ExecBatch /*exec_batch*/) override { return Status::OK(); }

  /// \brief Nothing to flush; returns a Future built from an OK status.
  Future<> Finish() override { return Status::OK(); }
};
|
|
|
|
|
|
|
class ARROW_ACERO_EXPORT TracedNode { |
|
public: |
|
|
|
|
|
|
|
|
|
explicit TracedNode(ExecNode* node) : node_(node) {} |
|
|
|
|
|
[[nodiscard]] ::arrow::internal::tracing::Scope TraceStartProducing( |
|
std::string extra_details) const; |
|
|
|
|
|
void NoteStartProducing(std::string extra_details) const; |
|
|
|
|
|
|
|
|
|
|
|
|
|
[[nodiscard]] ::arrow::internal::tracing::Scope TraceInputReceived( |
|
const ExecBatch& batch) const; |
|
|
|
|
|
void NoteInputReceived(const ExecBatch& batch) const; |
|
|
|
|
|
|
|
|
|
|
|
|
|
[[nodiscard]] ::arrow::internal::tracing::Scope TraceFinish() const; |
|
|
|
private: |
|
ExecNode* node_; |
|
}; |
|
|
|
} |
|
} |
|
|