|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#pragma once |
|
|
|
#include <cstdint> |
|
#include <memory> |
|
#include <utility> |
|
|
|
#include "arrow/io/interfaces.h" |
|
#include "arrow/util/visibility.h" |
|
|
|
namespace arrow { |
|
|
|
class Buffer; |
|
class Status; |
|
|
|
namespace io { |
|
|
|
class ARROW_EXPORT LatencyGenerator { |
|
public: |
|
virtual ~LatencyGenerator(); |
|
|
|
void Sleep(); |
|
|
|
virtual double NextLatency() = 0; |
|
|
|
static std::shared_ptr<LatencyGenerator> Make(double average_latency); |
|
static std::shared_ptr<LatencyGenerator> Make(double average_latency, int32_t seed); |
|
}; |
|
|
|
|
|
|
|
template <class StreamType> |
|
class SlowInputStreamBase : public StreamType { |
|
public: |
|
SlowInputStreamBase(std::shared_ptr<StreamType> stream, |
|
std::shared_ptr<LatencyGenerator> latencies) |
|
: stream_(std::move(stream)), latencies_(std::move(latencies)) {} |
|
|
|
SlowInputStreamBase(std::shared_ptr<StreamType> stream, double average_latency) |
|
: stream_(std::move(stream)), latencies_(LatencyGenerator::Make(average_latency)) {} |
|
|
|
SlowInputStreamBase(std::shared_ptr<StreamType> stream, double average_latency, |
|
int32_t seed) |
|
: stream_(std::move(stream)), |
|
latencies_(LatencyGenerator::Make(average_latency, seed)) {} |
|
|
|
protected: |
|
std::shared_ptr<StreamType> stream_; |
|
std::shared_ptr<LatencyGenerator> latencies_; |
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ARROW_EXPORT SlowInputStream : public SlowInputStreamBase<InputStream> { |
|
public: |
|
~SlowInputStream() override; |
|
|
|
using SlowInputStreamBase<InputStream>::SlowInputStreamBase; |
|
|
|
Status Close() override; |
|
Status Abort() override; |
|
bool closed() const override; |
|
|
|
Result<int64_t> Read(int64_t nbytes, void* out) override; |
|
Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override; |
|
Result<std::string_view> Peek(int64_t nbytes) override; |
|
|
|
Result<int64_t> Tell() const override; |
|
}; |
|
|
|
|
|
|
|
|
|
class ARROW_EXPORT SlowRandomAccessFile : public SlowInputStreamBase<RandomAccessFile> { |
|
public: |
|
~SlowRandomAccessFile() override; |
|
|
|
using SlowInputStreamBase<RandomAccessFile>::SlowInputStreamBase; |
|
|
|
Status Close() override; |
|
Status Abort() override; |
|
bool closed() const override; |
|
|
|
Result<int64_t> Read(int64_t nbytes, void* out) override; |
|
Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override; |
|
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override; |
|
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override; |
|
Result<std::string_view> Peek(int64_t nbytes) override; |
|
|
|
Result<int64_t> GetSize() override; |
|
Status Seek(int64_t position) override; |
|
Result<int64_t> Tell() const override; |
|
}; |
|
|
|
} |
|
} |
|
|