|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#pragma once |
|
|
|
#include <atomic> |
|
#include <cmath> |
|
#include <functional> |
|
#include <memory> |
|
#include <optional> |
|
#include <type_traits> |
|
#include <utility> |
|
#include <vector> |
|
|
|
#include "arrow/result.h" |
|
#include "arrow/status.h" |
|
#include "arrow/type_fwd.h" |
|
#include "arrow/type_traits.h" |
|
#include "arrow/util/config.h" |
|
#include "arrow/util/functional.h" |
|
#include "arrow/util/macros.h" |
|
#include "arrow/util/tracing.h" |
|
#include "arrow/util/type_fwd.h" |
|
#include "arrow/util/visibility.h" |
|
|
|
namespace arrow { |
|
|
|
template <typename> |
|
struct EnsureFuture; |
|
|
|
namespace detail { |
|
|
|
template <typename> |
|
struct is_future : std::false_type {}; |
|
|
|
template <typename T> |
|
struct is_future<Future<T>> : std::true_type {}; |
|
|
|
/// \brief Compute the type produced by invoking a callable
///
/// `Signature` is spelled as a function type `Fn(Args...)`, where `Fn` is the
/// (possibly reference-qualified) callable type and `Args...` are the
/// (possibly reference-qualified) argument types.
///
/// The primary template is intentionally left undefined; only signatures
/// for which the call expression is well-formed have a `type` member, which
/// makes this trait usable in SFINAE contexts.
template <typename Signature, typename Enable = void>
struct result_of;

/// \brief Specialization selected when `Fn` can be called with `A...`
///
/// The second template argument uses the standard `std::void_t` detection
/// idiom: it collapses to `void` (matching the primary template's default)
/// exactly when the call expression below is valid.
template <typename Fn, typename... A>
struct result_of<Fn(A...),
                 std::void_t<decltype(std::declval<Fn>()(std::declval<A>()...))>> {
  /// The exact type (including reference qualification) of the call expression
  using type = decltype(std::declval<Fn>()(std::declval<A>()...));
};

/// \brief Convenience alias for `typename result_of<Signature>::type`
template <typename Signature>
using result_of_t = typename result_of<Signature>::type;
|
|
|
|
|
template <typename T> |
|
struct SyncType { |
|
using type = Result<T>; |
|
}; |
|
|
|
template <> |
|
struct SyncType<internal::Empty> { |
|
using type = Status; |
|
}; |
|
|
|
template <typename Fn> |
|
using first_arg_is_status = |
|
std::is_same<typename std::decay<internal::call_traits::argument_type<0, Fn>>::type, |
|
Status>; |
|
|
|
template <typename Fn, typename Then, typename Else, |
|
typename Count = internal::call_traits::argument_count<Fn>> |
|
using if_has_no_args = typename std::conditional<Count::value == 0, Then, Else>::type; |
|
|
|
|
|
template <typename Source, typename Dest, bool SourceEmpty = Source::is_empty, |
|
bool DestEmpty = Dest::is_empty> |
|
struct MarkNextFinished {}; |
|
|
|
|
|
template <typename Source, typename Dest> |
|
struct MarkNextFinished<Source, Dest, true, true> { |
|
void operator()(const Status& status) && { next.MarkFinished(status); } |
|
Dest next; |
|
}; |
|
|
|
|
|
|
|
template <typename Source, typename Dest> |
|
struct MarkNextFinished<Source, Dest, false, true> { |
|
void operator()(const Result<typename Source::ValueType>& res) && { |
|
next.MarkFinished(internal::Empty::ToResult(res.status())); |
|
} |
|
Dest next; |
|
}; |
|
|
|
|
|
template <typename Source, typename Dest> |
|
struct MarkNextFinished<Source, Dest, false, false> { |
|
void operator()(const Result<typename Source::ValueType>& res) && { |
|
next.MarkFinished(res); |
|
} |
|
Dest next; |
|
}; |
|
|
|
|
|
struct ContinueFuture { |
|
template <typename Return> |
|
struct ForReturnImpl; |
|
|
|
template <typename Return> |
|
using ForReturn = typename ForReturnImpl<Return>::type; |
|
|
|
template <typename Signature> |
|
using ForSignature = ForReturn<result_of_t<Signature>>; |
|
|
|
|
|
template <typename ContinueFunc, typename... Args, |
|
typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>, |
|
typename NextFuture = ForReturn<ContinueResult>> |
|
typename std::enable_if<std::is_void<ContinueResult>::value>::type operator()( |
|
NextFuture next, ContinueFunc&& f, Args&&... a) const { |
|
std::forward<ContinueFunc>(f)(std::forward<Args>(a)...); |
|
next.MarkFinished(); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
template <typename ContinueFunc, typename... Args, |
|
typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>, |
|
typename NextFuture = ForReturn<ContinueResult>> |
|
typename std::enable_if< |
|
!std::is_void<ContinueResult>::value && !is_future<ContinueResult>::value && |
|
(!NextFuture::is_empty || std::is_same<ContinueResult, Status>::value)>::type |
|
operator()(NextFuture next, ContinueFunc&& f, Args&&... a) const { |
|
next.MarkFinished(std::forward<ContinueFunc>(f)(std::forward<Args>(a)...)); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
template <typename ContinueFunc, typename... Args, |
|
typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>, |
|
typename NextFuture = ForReturn<ContinueResult>> |
|
typename std::enable_if<!std::is_void<ContinueResult>::value && |
|
!is_future<ContinueResult>::value && NextFuture::is_empty && |
|
!std::is_same<ContinueResult, Status>::value>::type |
|
operator()(NextFuture next, ContinueFunc&& f, Args&&... a) const { |
|
next.MarkFinished(std::forward<ContinueFunc>(f)(std::forward<Args>(a)...).status()); |
|
} |
|
|
|
|
|
|
|
|
|
template <typename ContinueFunc, typename... Args, |
|
typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>, |
|
typename NextFuture = ForReturn<ContinueResult>> |
|
typename std::enable_if<is_future<ContinueResult>::value>::type operator()( |
|
NextFuture next, ContinueFunc&& f, Args&&... a) const { |
|
ContinueResult signal_to_complete_next = |
|
std::forward<ContinueFunc>(f)(std::forward<Args>(a)...); |
|
MarkNextFinished<ContinueResult, NextFuture> callback{std::move(next)}; |
|
signal_to_complete_next.AddCallback(std::move(callback)); |
|
} |
|
|
|
|
|
template <typename ContinueFunc, typename NextFuture, typename... Args> |
|
void IgnoringArgsIf(std::true_type, NextFuture&& next, ContinueFunc&& f, |
|
Args&&...) const { |
|
operator()(std::forward<NextFuture>(next), std::forward<ContinueFunc>(f)); |
|
} |
|
template <typename ContinueFunc, typename NextFuture, typename... Args> |
|
void IgnoringArgsIf(std::false_type, NextFuture&& next, ContinueFunc&& f, |
|
Args&&... a) const { |
|
operator()(std::forward<NextFuture>(next), std::forward<ContinueFunc>(f), |
|
std::forward<Args>(a)...); |
|
} |
|
}; |
|
|
|
|
|
|
|
template <> |
|
struct ContinueFuture::ForReturnImpl<void> { |
|
using type = Future<>; |
|
}; |
|
|
|
template <> |
|
struct ContinueFuture::ForReturnImpl<Status> { |
|
using type = Future<>; |
|
}; |
|
|
|
template <typename R> |
|
struct ContinueFuture::ForReturnImpl { |
|
using type = Future<R>; |
|
}; |
|
|
|
template <typename T> |
|
struct ContinueFuture::ForReturnImpl<Result<T>> { |
|
using type = Future<T>; |
|
}; |
|
|
|
template <typename T> |
|
struct ContinueFuture::ForReturnImpl<Future<T>> { |
|
using type = Future<T>; |
|
}; |
|
|
|
} |
|
|
|
|
|
/// \brief The lifecycle state of a Future
///
/// A future starts out PENDING and ends up either SUCCESS or FAILURE.
enum class FutureState : int8_t {
  PENDING,
  SUCCESS,
  FAILURE,
};

/// \brief Tell whether a state denotes a finished future
///
/// \param state the state to inspect
/// \return true for every state other than FutureState::PENDING
inline bool IsFutureFinished(FutureState state) {
  const bool still_pending = (state == FutureState::PENDING);
  return !still_pending;
}
|
|
|
|
|
/// \brief Policy describing whether a callback should be scheduled as a
/// separate task
///
/// NOTE(review): the precise meaning of each policy is decided by the code
/// that consumes CallbackOptions, which is not visible in this header. The
/// descriptions below are inferred from the enumerator names; confirm them
/// against the implementation.
enum class ShouldSchedule {
  /// Presumably: never schedule, i.e. run the callback inline
  Never = 0,
  /// Presumably: schedule only when the future is still unfinished at the
  /// time the callback is added
  IfUnfinished = 1,
  /// Presumably: always schedule the callback as a new task
  Always = 2,
  /// Presumably: schedule only when the callback would otherwise run on a
  /// different executor than the one given in CallbackOptions
  IfDifferentExecutor = 3,
};
|
|
|
|
|
struct CallbackOptions { |
|
|
|
ShouldSchedule should_schedule = ShouldSchedule::Never; |
|
|
|
|
|
internal::Executor* executor = NULLPTR; |
|
|
|
static CallbackOptions Defaults() { return {}; } |
|
}; |
|
|
|
|
|
/// \brief Type-erased shared state behind Future<T>
///
/// Future<T> objects hold a std::shared_ptr<FutureImpl>. The result is stored
/// type-erased (see Storage) and viewed with its concrete type through
/// CastResult<T>(). Out-of-line members are defined in the corresponding
/// source file.
class ARROW_EXPORT FutureImpl : public std::enable_shared_from_this<FutureImpl> {
 public:
  FutureImpl();
  virtual ~FutureImpl() = default;

  /// \brief Atomically read the current state
  FutureState state() { return state_.load(); }

  /// \brief Factory for a new FutureImpl
  /// NOTE(review): presumably created in the PENDING state -- defined elsewhere
  static std::unique_ptr<FutureImpl> Make();
  /// \brief Factory for a FutureImpl created directly in the given state
  static std::unique_ptr<FutureImpl> MakeFinished(FutureState state);

#ifdef ARROW_WITH_OPENTELEMETRY
  /// \brief Remember a (non-owned) tracing span pointer
  void SetSpan(util::tracing::Span* span) { span_ = span; }
#endif

  // Operations backing the Future API (defined out of line)
  void MarkFinished();
  void MarkFailed();
  void Wait();
  bool Wait(double seconds);

  /// \brief View the type-erased result as a Result<ValueType>
  ///
  /// No check is performed: ValueType must be the type the result was
  /// actually stored with. Returns null while no result is stored.
  template <typename ValueType>
  Result<ValueType>* CastResult() const {
    void* erased = result_.get();
    return static_cast<Result<ValueType>*>(erased);
  }

  /// A one-shot callable receiving the FutureImpl it was attached to
  using Callback = internal::FnOnce<void(const FutureImpl& impl)>;
  void AddCallback(Callback callback, CallbackOptions opts);
  bool TryAddCallback(const std::function<Callback()>& callback_factory,
                      CallbackOptions opts);

  /// The current state, PENDING upon construction
  std::atomic<FutureState> state_{FutureState::PENDING};

  /// Type-erased owning storage: an opaque pointer paired with the function
  /// that knows how to destroy the pointee
  using Storage = std::unique_ptr<void, void (*)(void*)>;
  Storage result_{NULLPTR, NULLPTR};

  /// A registered callback together with the options it was added with
  struct CallbackRecord {
    Callback callback;
    CallbackOptions options;
  };
  std::vector<CallbackRecord> callbacks_;
#ifdef ARROW_WITH_OPENTELEMETRY
  /// Non-owned tracing span, null unless SetSpan() was called
  util::tracing::Span* span_ = NULLPTR;
#endif
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
template <typename T> |
|
class [[nodiscard]] Future { |
|
public: |
|
using ValueType = T; |
|
using SyncType = typename detail::SyncType<T>::type; |
|
static constexpr bool is_empty = std::is_same<T, internal::Empty>::value; |
|
|
|
|
|
|
|
Future() = default; |
|
|
|
#ifdef ARROW_WITH_OPENTELEMETRY |
|
void SetSpan(util::tracing::Span* span) { impl_->SetSpan(span); } |
|
#endif |
|
|
|
|
|
|
|
bool is_valid() const { return impl_ != NULLPTR; } |
|
|
|
|
|
|
|
|
|
|
|
FutureState state() const { |
|
CheckValid(); |
|
return impl_->state(); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
bool is_finished() const { |
|
CheckValid(); |
|
return IsFutureFinished(impl_->state()); |
|
} |
|
|
|
|
|
const Result<ValueType>& result() const& { |
|
Wait(); |
|
return *GetResult(); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Result<ValueType>&& MoveResult() { |
|
Wait(); |
|
return std::move(*GetResult()); |
|
} |
|
|
|
|
|
const Status& status() const { return result().status(); } |
|
|
|
|
|
|
|
explicit operator Future<>() const { |
|
Future<> status_future; |
|
status_future.impl_ = impl_; |
|
return status_future; |
|
} |
|
|
|
|
|
void Wait() const { |
|
CheckValid(); |
|
impl_->Wait(); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
bool Wait(double seconds) const { |
|
CheckValid(); |
|
return impl_->Wait(seconds); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
void MarkFinished(Result<ValueType> res) { DoMarkFinished(std::move(res)); } |
|
|
|
|
|
template <typename E = ValueType, typename = typename std::enable_if< |
|
std::is_same<E, internal::Empty>::value>::type> |
|
void MarkFinished(Status s = Status::OK()) { |
|
return DoMarkFinished(E::ToResult(std::move(s))); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static Future Make() { |
|
Future fut; |
|
fut.impl_ = FutureImpl::Make(); |
|
return fut; |
|
} |
|
|
|
|
|
static Future<ValueType> MakeFinished(Result<ValueType> res) { |
|
Future<ValueType> fut; |
|
fut.InitializeFromResult(std::move(res)); |
|
return fut; |
|
} |
|
|
|
|
|
template <typename E = ValueType, typename = typename std::enable_if< |
|
std::is_same<E, internal::Empty>::value>::type> |
|
static Future<> MakeFinished(Status s = Status::OK()) { |
|
return MakeFinished(E::ToResult(std::move(s))); |
|
} |
|
|
|
struct WrapResultOnComplete { |
|
template <typename OnComplete> |
|
struct Callback { |
|
void operator()(const FutureImpl& impl) && { |
|
std::move(on_complete)(*impl.CastResult<ValueType>()); |
|
} |
|
OnComplete on_complete; |
|
}; |
|
}; |
|
|
|
struct WrapStatusyOnComplete { |
|
template <typename OnComplete> |
|
struct Callback { |
|
static_assert(std::is_same<internal::Empty, ValueType>::value, |
|
"Only callbacks for Future<> should accept Status and not Result"); |
|
|
|
void operator()(const FutureImpl& impl) && { |
|
std::move(on_complete)(impl.CastResult<ValueType>()->status()); |
|
} |
|
OnComplete on_complete; |
|
}; |
|
}; |
|
|
|
template <typename OnComplete> |
|
using WrapOnComplete = typename std::conditional< |
|
detail::first_arg_is_status<OnComplete>::value, WrapStatusyOnComplete, |
|
WrapResultOnComplete>::type::template Callback<OnComplete>; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
template <typename OnComplete, typename Callback = WrapOnComplete<OnComplete>> |
|
void AddCallback(OnComplete on_complete, |
|
CallbackOptions opts = CallbackOptions::Defaults()) const { |
|
|
|
|
|
|
|
impl_->AddCallback(Callback{std::move(on_complete)}, opts); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
template <typename CallbackFactory, |
|
typename OnComplete = detail::result_of_t<CallbackFactory()>, |
|
typename Callback = WrapOnComplete<OnComplete>> |
|
bool TryAddCallback(CallbackFactory callback_factory, |
|
CallbackOptions opts = CallbackOptions::Defaults()) const { |
|
return impl_->TryAddCallback([&]() { return Callback{callback_factory()}; }, opts); |
|
} |
|
|
|
template <typename OnSuccess, typename OnFailure> |
|
struct ThenOnComplete { |
|
static constexpr bool has_no_args = |
|
internal::call_traits::argument_count<OnSuccess>::value == 0; |
|
|
|
using ContinuedFuture = detail::ContinueFuture::ForSignature< |
|
detail::if_has_no_args<OnSuccess, OnSuccess && (), OnSuccess && (const T&)>>; |
|
|
|
static_assert( |
|
std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>, |
|
ContinuedFuture>::value, |
|
"OnSuccess and OnFailure must continue with the same future type"); |
|
|
|
struct DummyOnSuccess { |
|
void operator()(const T&); |
|
}; |
|
using OnSuccessArg = typename std::decay<internal::call_traits::argument_type< |
|
0, detail::if_has_no_args<OnSuccess, DummyOnSuccess, OnSuccess>>>::type; |
|
|
|
static_assert( |
|
!std::is_same<OnSuccessArg, typename EnsureResult<OnSuccessArg>::type>::value, |
|
"OnSuccess' argument should not be a Result"); |
|
|
|
void operator()(const Result<T>& result) && { |
|
detail::ContinueFuture continue_future; |
|
if (ARROW_PREDICT_TRUE(result.ok())) { |
|
|
|
ARROW_UNUSED(OnFailure(std::move(on_failure))); |
|
continue_future.IgnoringArgsIf( |
|
detail::if_has_no_args<OnSuccess, std::true_type, std::false_type>{}, |
|
std::move(next), std::move(on_success), result.ValueOrDie()); |
|
} else { |
|
ARROW_UNUSED(OnSuccess(std::move(on_success))); |
|
continue_future(std::move(next), std::move(on_failure), result.status()); |
|
} |
|
} |
|
|
|
OnSuccess on_success; |
|
OnFailure on_failure; |
|
ContinuedFuture next; |
|
}; |
|
|
|
template <typename OnSuccess> |
|
struct PassthruOnFailure { |
|
using ContinuedFuture = detail::ContinueFuture::ForSignature< |
|
detail::if_has_no_args<OnSuccess, OnSuccess && (), OnSuccess && (const T&)>>; |
|
|
|
Result<typename ContinuedFuture::ValueType> operator()(const Status& s) { return s; } |
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
template <typename OnSuccess, typename OnFailure = PassthruOnFailure<OnSuccess>, |
|
typename OnComplete = ThenOnComplete<OnSuccess, OnFailure>, |
|
typename ContinuedFuture = typename OnComplete::ContinuedFuture> |
|
ContinuedFuture Then(OnSuccess on_success, OnFailure on_failure = {}, |
|
CallbackOptions options = CallbackOptions::Defaults()) const { |
|
auto next = ContinuedFuture::Make(); |
|
AddCallback(OnComplete{std::forward<OnSuccess>(on_success), |
|
std::forward<OnFailure>(on_failure), next}, |
|
options); |
|
return next; |
|
} |
|
|
|
|
|
Future(ValueType val) : Future() { |
|
impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS); |
|
SetResult(std::move(val)); |
|
} |
|
|
|
|
|
|
|
Future(Result<ValueType> res) : Future() { |
|
if (ARROW_PREDICT_TRUE(res.ok())) { |
|
impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS); |
|
} else { |
|
impl_ = FutureImpl::MakeFinished(FutureState::FAILURE); |
|
} |
|
SetResult(std::move(res)); |
|
} |
|
|
|
|
|
|
|
Future(Status s) |
|
: Future(Result<ValueType>(std::move(s))) {} |
|
|
|
protected: |
|
void InitializeFromResult(Result<ValueType> res) { |
|
if (ARROW_PREDICT_TRUE(res.ok())) { |
|
impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS); |
|
} else { |
|
impl_ = FutureImpl::MakeFinished(FutureState::FAILURE); |
|
} |
|
SetResult(std::move(res)); |
|
} |
|
|
|
void Initialize() { impl_ = FutureImpl::Make(); } |
|
|
|
Result<ValueType>* GetResult() const { return impl_->CastResult<ValueType>(); } |
|
|
|
void SetResult(Result<ValueType> res) { |
|
impl_->result_ = {new Result<ValueType>(std::move(res)), |
|
[](void* p) { delete static_cast<Result<ValueType>*>(p); }}; |
|
} |
|
|
|
void DoMarkFinished(Result<ValueType> res) { |
|
SetResult(std::move(res)); |
|
|
|
if (ARROW_PREDICT_TRUE(GetResult()->ok())) { |
|
impl_->MarkFinished(); |
|
} else { |
|
impl_->MarkFailed(); |
|
} |
|
} |
|
|
|
void CheckValid() const { |
|
#ifndef NDEBUG |
|
if (!is_valid()) { |
|
Status::Invalid("Invalid Future (default-initialized?)").Abort(); |
|
} |
|
#endif |
|
} |
|
|
|
explicit Future(std::shared_ptr<FutureImpl> impl) : impl_(std::move(impl)) {} |
|
|
|
std::shared_ptr<FutureImpl> impl_; |
|
|
|
friend struct detail::ContinueFuture; |
|
|
|
template <typename U> |
|
friend class Future; |
|
friend class WeakFuture<T>; |
|
|
|
FRIEND_TEST(FutureRefTest, ChainRemoved); |
|
FRIEND_TEST(FutureRefTest, TailRemoved); |
|
FRIEND_TEST(FutureRefTest, HeadRemoved); |
|
}; |
|
|
|
template <typename T> |
|
typename Future<T>::SyncType FutureToSync(const Future<T>& fut) { |
|
return fut.result(); |
|
} |
|
|
|
template <> |
|
inline typename Future<internal::Empty>::SyncType FutureToSync<internal::Empty>( |
|
const Future<internal::Empty>& fut) { |
|
return fut.status(); |
|
} |
|
|
|
template <> |
|
inline Future<>::Future(Status s) : Future(internal::Empty::ToResult(std::move(s))) {} |
|
|
|
template <typename T> |
|
class WeakFuture { |
|
public: |
|
explicit WeakFuture(const Future<T>& future) : impl_(future.impl_) {} |
|
|
|
Future<T> get() { return Future<T>{impl_.lock()}; } |
|
|
|
private: |
|
std::weak_ptr<FutureImpl> impl_; |
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
template <typename T> |
|
static Future<T> DeferNotOk(Result<Future<T>> maybe_future) { |
|
if (ARROW_PREDICT_FALSE(!maybe_future.ok())) { |
|
return Future<T>::MakeFinished(std::move(maybe_future).status()); |
|
} |
|
return std::move(maybe_future).MoveValueUnsafe(); |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
template <typename T> |
|
Future<std::vector<Result<T>>> All(std::vector<Future<T>> futures) { |
|
struct State { |
|
explicit State(std::vector<Future<T>> f) |
|
: futures(std::move(f)), n_remaining(futures.size()) {} |
|
|
|
std::vector<Future<T>> futures; |
|
std::atomic<size_t> n_remaining; |
|
}; |
|
|
|
if (futures.size() == 0) { |
|
return {std::vector<Result<T>>{}}; |
|
} |
|
|
|
auto state = std::make_shared<State>(std::move(futures)); |
|
|
|
auto out = Future<std::vector<Result<T>>>::Make(); |
|
for (const Future<T>& future : state->futures) { |
|
future.AddCallback([state, out](const Result<T>&) mutable { |
|
if (state->n_remaining.fetch_sub(1) != 1) return; |
|
|
|
std::vector<Result<T>> results(state->futures.size()); |
|
for (size_t i = 0; i < results.size(); ++i) { |
|
results[i] = state->futures[i].result(); |
|
} |
|
out.MarkFinished(std::move(results)); |
|
}); |
|
} |
|
return out; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
ARROW_EXPORT |
|
Future<> AllComplete(const std::vector<Future<>>& futures); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ARROW_EXPORT |
|
Future<> AllFinished(const std::vector<Future<>>& futures); |
|
|
|
|
|
|
|
/// \brief Tag value meaning "keep going"
///
/// A temporary Continue converts to an empty std::optional<T> for any T.
struct Continue {
  template <typename T>
  operator std::optional<T>() && {  // NOLINT explicit
    return std::nullopt;
  }
};
|
|
|
/// \brief Build an engaged std::optional<T> meaning "stop, with this value"
///
/// \param break_value the value to stop with (value-initialized by default)
template <typename T = internal::Empty>
std::optional<T> Break(T break_value = {}) {
  std::optional<T> engaged(std::move(break_value));
  return engaged;
}

/// \brief The type used to steer a loop: an empty optional (see Continue)
/// asks for another iteration, an engaged one (see Break) stops the loop
/// and carries the final value
template <typename T = internal::Empty>
using ControlFlow = std::optional<T>;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
template <typename Iterate, |
|
typename Control = typename detail::result_of_t<Iterate()>::ValueType, |
|
typename BreakValueType = typename Control::value_type> |
|
Future<BreakValueType> Loop(Iterate iterate) { |
|
struct Callback { |
|
bool CheckForTermination(const Result<Control>& control_res) { |
|
if (!control_res.ok()) { |
|
break_fut.MarkFinished(control_res.status()); |
|
return true; |
|
} |
|
if (control_res->has_value()) { |
|
break_fut.MarkFinished(**control_res); |
|
return true; |
|
} |
|
return false; |
|
} |
|
|
|
void operator()(const Result<Control>& maybe_control) && { |
|
if (CheckForTermination(maybe_control)) return; |
|
|
|
auto control_fut = iterate(); |
|
while (true) { |
|
if (control_fut.TryAddCallback([this]() { return *this; })) { |
|
|
|
|
|
return; |
|
} |
|
|
|
|
|
|
|
if (CheckForTermination(control_fut.result())) return; |
|
|
|
control_fut = iterate(); |
|
} |
|
} |
|
|
|
Iterate iterate; |
|
|
|
|
|
|
|
|
|
|
|
|
|
Future<BreakValueType> break_fut; |
|
}; |
|
|
|
auto break_fut = Future<BreakValueType>::Make(); |
|
auto control_fut = iterate(); |
|
control_fut.AddCallback(Callback{std::move(iterate), break_fut}); |
|
|
|
return break_fut; |
|
} |
|
|
|
/// \brief Wrap a Status into an already-finished Future<>
inline Future<> ToFuture(Status status) {
  Future<> finished = Future<>::MakeFinished(std::move(status));
  return finished;
}

/// \brief Wrap a plain value into an already-finished Future<T>
template <typename T>
Future<T> ToFuture(T value) {
  Future<T> finished = Future<T>::MakeFinished(std::move(value));
  return finished;
}

/// \brief Wrap a Result<T> into an already-finished Future<T>
template <typename T>
Future<T> ToFuture(Result<T> maybe_value) {
  Future<T> finished = Future<T>::MakeFinished(std::move(maybe_value));
  return finished;
}

/// \brief Identity overload: a Future<T> is returned as is
template <typename T>
Future<T> ToFuture(Future<T> fut) {
  return fut;
}
|
|
|
/// \brief Compute the Future type that ToFuture() yields for an argument of
/// type `T`
///
/// With the ToFuture() overloads above: Status maps to Future<>, Result<V>
/// and Future<V> both map to Future<V>, and any other `T` maps to Future<T>.
template <typename T>
struct EnsureFuture {
  using type = decltype(ToFuture(std::declval<T>()));
};
|
|
|
} |
|
|