|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#pragma once |
|
|
|
#include <memory> |
|
|
|
#include "arrow/io/interfaces.h" |
|
#include "arrow/result.h" |
|
#include "arrow/status.h" |
|
#include "arrow/util/checked_cast.h" |
|
#include "arrow/util/macros.h" |
|
#include "arrow/util/visibility.h" |
|
|
|
namespace arrow { |
|
namespace io { |
|
namespace internal { |
|
|
|
template <class LockType> |
|
class SharedLockGuard { |
|
public: |
|
explicit SharedLockGuard(LockType* lock) : lock_(lock) { lock_->LockShared(); } |
|
|
|
~SharedLockGuard() { lock_->UnlockShared(); } |
|
|
|
protected: |
|
LockType* lock_; |
|
}; |
|
|
|
template <class LockType> |
|
class ExclusiveLockGuard { |
|
public: |
|
explicit ExclusiveLockGuard(LockType* lock) : lock_(lock) { lock_->LockExclusive(); } |
|
|
|
~ExclusiveLockGuard() { lock_->UnlockExclusive(); } |
|
|
|
protected: |
|
LockType* lock_; |
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ARROW_EXPORT SharedExclusiveChecker { |
|
public: |
|
SharedExclusiveChecker(); |
|
void LockShared(); |
|
void UnlockShared(); |
|
void LockExclusive(); |
|
void UnlockExclusive(); |
|
|
|
SharedLockGuard<SharedExclusiveChecker> shared_guard() { |
|
return SharedLockGuard<SharedExclusiveChecker>(this); |
|
} |
|
|
|
ExclusiveLockGuard<SharedExclusiveChecker> exclusive_guard() { |
|
return ExclusiveLockGuard<SharedExclusiveChecker>(this); |
|
} |
|
|
|
protected: |
|
struct Impl; |
|
std::shared_ptr<Impl> impl_; |
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
template <class Derived> |
|
class InputStreamConcurrencyWrapper : public InputStream { |
|
public: |
|
Status Close() final { |
|
auto guard = lock_.exclusive_guard(); |
|
return derived()->DoClose(); |
|
} |
|
|
|
Status Abort() final { |
|
auto guard = lock_.exclusive_guard(); |
|
return derived()->DoAbort(); |
|
} |
|
|
|
Result<int64_t> Tell() const final { |
|
auto guard = lock_.exclusive_guard(); |
|
return derived()->DoTell(); |
|
} |
|
|
|
Result<int64_t> Read(int64_t nbytes, void* out) final { |
|
auto guard = lock_.exclusive_guard(); |
|
return derived()->DoRead(nbytes, out); |
|
} |
|
|
|
Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) final { |
|
auto guard = lock_.exclusive_guard(); |
|
return derived()->DoRead(nbytes); |
|
} |
|
|
|
Result<std::string_view> Peek(int64_t nbytes) final { |
|
auto guard = lock_.exclusive_guard(); |
|
return derived()->DoPeek(nbytes); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
protected: |
|
|
|
|
|
virtual Status DoAbort() { return derived()->DoClose(); } |
|
|
|
virtual Result<std::string_view> DoPeek(int64_t ARROW_ARG_UNUSED(nbytes)) { |
|
return Status::NotImplemented("Peek not implemented"); |
|
} |
|
|
|
Derived* derived() { return ::arrow::internal::checked_cast<Derived*>(this); } |
|
|
|
const Derived* derived() const { |
|
return ::arrow::internal::checked_cast<const Derived*>(this); |
|
} |
|
|
|
mutable SharedExclusiveChecker lock_; |
|
}; |
|
|
|
template <class Derived> |
|
class RandomAccessFileConcurrencyWrapper : public RandomAccessFile { |
|
public: |
|
Status Close() final { |
|
auto guard = lock_.exclusive_guard(); |
|
return derived()->DoClose(); |
|
} |
|
|
|
Status Abort() final { |
|
auto guard = lock_.exclusive_guard(); |
|
return derived()->DoAbort(); |
|
} |
|
|
|
Result<int64_t> Tell() const final { |
|
auto guard = lock_.exclusive_guard(); |
|
return derived()->DoTell(); |
|
} |
|
|
|
Result<int64_t> Read(int64_t nbytes, void* out) final { |
|
auto guard = lock_.exclusive_guard(); |
|
return derived()->DoRead(nbytes, out); |
|
} |
|
|
|
Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) final { |
|
auto guard = lock_.exclusive_guard(); |
|
return derived()->DoRead(nbytes); |
|
} |
|
|
|
Result<std::string_view> Peek(int64_t nbytes) final { |
|
auto guard = lock_.exclusive_guard(); |
|
return derived()->DoPeek(nbytes); |
|
} |
|
|
|
Status Seek(int64_t position) final { |
|
auto guard = lock_.exclusive_guard(); |
|
return derived()->DoSeek(position); |
|
} |
|
|
|
Result<int64_t> GetSize() final { |
|
auto guard = lock_.shared_guard(); |
|
return derived()->DoGetSize(); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) final { |
|
auto guard = lock_.shared_guard(); |
|
return derived()->DoReadAt(position, nbytes, out); |
|
} |
|
|
|
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) final { |
|
auto guard = lock_.shared_guard(); |
|
return derived()->DoReadAt(position, nbytes); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
protected: |
|
|
|
|
|
virtual Status DoAbort() { return derived()->DoClose(); } |
|
|
|
virtual Result<std::string_view> DoPeek(int64_t ARROW_ARG_UNUSED(nbytes)) { |
|
return Status::NotImplemented("Peek not implemented"); |
|
} |
|
|
|
Derived* derived() { return ::arrow::internal::checked_cast<Derived*>(this); } |
|
|
|
const Derived* derived() const { |
|
return ::arrow::internal::checked_cast<const Derived*>(this); |
|
} |
|
|
|
mutable SharedExclusiveChecker lock_; |
|
}; |
|
|
|
} |
|
} |
|
} |
|
|