blob: 524f11c14311a6f248f4d93f82dc6cdaed39e50b [file] [log] [blame]
/*
* Copyright (c) 2021-2022 NVIDIA Corporation
*
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://llvm.org/LICENSE.txt
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "../stdexec/__detail/__intrusive_queue.hpp"
#include "../stdexec/execution.hpp"
#include "../stdexec/stop_token.hpp"
#include "env.hpp"
#include <mutex>
namespace exec
{
/////////////////////////////////////////////////////////////////////////////
// async_scope
namespace __scope
{
using namespace stdexec;
struct __impl;
struct async_scope;
struct __task : __immovable
{
const __impl* __scope_;
void (*__notify_waiter)(__task*) noexcept;
__task* __next_ = nullptr;
};
template <class _BaseEnv>
using __env_t =
make_env_t<_BaseEnv, with_t<get_stop_token_t, in_place_stop_token>>;
struct __impl
{
in_place_stop_source __stop_source_{};
mutable std::mutex __lock_{};
mutable std::ptrdiff_t __active_ = 0;
mutable __intrusive_queue<&__task::__next_> __waiters_{};
~__impl()
{
std::unique_lock __guard{__lock_};
STDEXEC_ASSERT(__active_ == 0);
STDEXEC_ASSERT(__waiters_.empty());
}
};
////////////////////////////////////////////////////////////////////////////
// async_scope::when_empty implementation
template <class _ConstrainedId, class _ReceiverId>
struct __when_empty_op
{
using _Constrained = __cvref_t<_ConstrainedId>;
using _Receiver = stdexec::__t<_ReceiverId>;
struct __t : __task
{
using __id = __when_empty_op;
explicit __t(const __impl* __scope, _Constrained&& __sndr,
_Receiver __rcvr) :
__task{{}, __scope, __notify_waiter},
__op_(stdexec::connect((_Constrained&&)__sndr, (_Receiver&&)__rcvr))
{}
private:
static void __notify_waiter(__task* __self) noexcept
{
start(static_cast<__t*>(__self)->__op_);
}
void __start_() noexcept
{
std::unique_lock __guard{this->__scope_->__lock_};
auto& __active = this->__scope_->__active_;
auto& __waiters = this->__scope_->__waiters_;
if (__active != 0)
{
__waiters.push_back(this);
return;
}
__guard.unlock();
start(this->__op_);
}
friend void tag_invoke(start_t, __t& __self) noexcept
{
return __self.__start_();
}
STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
connect_result_t<_Constrained, _Receiver> __op_;
};
};
template <class _ConstrainedId>
struct __when_empty_sender
{
using _Constrained = stdexec::__t<_ConstrainedId>;
struct __t
{
using __id = __when_empty_sender;
using sender_concept = stdexec::sender_t;
template <class _Self, class _Receiver>
using __when_empty_op_t =
stdexec::__t<__when_empty_op<__cvref_id<_Self, _Constrained>,
stdexec::__id<_Receiver>>>;
template <__decays_to<__t> _Self, receiver _Receiver>
requires sender_to<__copy_cvref_t<_Self, _Constrained>, _Receiver>
[[nodiscard]] friend __when_empty_op_t<_Self, _Receiver>
tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
{
return __when_empty_op_t<_Self, _Receiver>{
__self.__scope_, ((_Self&&)__self).__c_, (_Receiver&&)__rcvr};
}
template <__decays_to<__t> _Self, class _Env>
friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
-> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
__env_t<_Env>>
{
return {};
}
friend empty_env tag_invoke(get_env_t, const __t&) noexcept
{
return {};
}
const __impl* __scope_;
STDEXEC_ATTRIBUTE((no_unique_address)) _Constrained __c_;
};
};
template <class _Constrained>
using __when_empty_sender_t =
stdexec::__t<__when_empty_sender<__id<__decay_t<_Constrained>>>>;
////////////////////////////////////////////////////////////////////////////
// async_scope::nest implementation
template <class _ReceiverId>
struct __nest_op_base : __immovable
{
using _Receiver = stdexec::__t<_ReceiverId>;
const __impl* __scope_;
STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_;
};
template <class _ReceiverId>
struct __nest_rcvr
{
using _Receiver = stdexec::__t<_ReceiverId>;
struct __t
{
using __id = __nest_rcvr;
using receiver_concept = stdexec::receiver_t;
__nest_op_base<_ReceiverId>* __op_;
static void __complete(const __impl* __scope) noexcept
{
std::unique_lock __guard{__scope->__lock_};
auto& __active = __scope->__active_;
if (--__active == 0)
{
auto __local = std::move(__scope->__waiters_);
__guard.unlock();
__scope = nullptr;
// do not access __scope
while (!__local.empty())
{
auto* __next = __local.pop_front();
__next->__notify_waiter(__next);
// __scope must be considered deleted
}
}
}
template <__completion_tag _Tag, class... _As>
requires __callable<_Tag, _Receiver, _As...>
friend void tag_invoke(_Tag, __t&& __self, _As&&... __as) noexcept
{
auto __scope = __self.__op_->__scope_;
_Tag{}(std::move(__self.__op_->__rcvr_), (_As&&)__as...);
// do not access __op_
// do not access this
__complete(__scope);
}
friend __env_t<env_of_t<_Receiver>>
tag_invoke(get_env_t, const __t& __self) noexcept
{
return make_env(
get_env(__self.__op_->__rcvr_),
with(get_stop_token,
__self.__op_->__scope_->__stop_source_.get_token()));
}
};
};
template <class _ConstrainedId, class _ReceiverId>
struct __nest_op
{
using _Constrained = stdexec::__t<_ConstrainedId>;
using _Receiver = stdexec::__t<_ReceiverId>;
struct __t : __nest_op_base<_ReceiverId>
{
using __id = __nest_op;
using __nest_rcvr_t = stdexec::__t<__nest_rcvr<_ReceiverId>>;
STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
connect_result_t<_Constrained, __nest_rcvr_t> __op_;
template <__decays_to<_Constrained> _Sender,
__decays_to<_Receiver> _Rcvr>
explicit __t(const __impl* __scope, _Sender&& __c, _Rcvr&& __rcvr) :
__nest_op_base<_ReceiverId>{{}, __scope, (_Rcvr&&)__rcvr},
__op_(stdexec::connect((_Sender&&)__c, __nest_rcvr_t{this}))
{}
private:
void __start_() noexcept
{
STDEXEC_ASSERT(this->__scope_);
std::unique_lock __guard{this->__scope_->__lock_};
auto& __active = this->__scope_->__active_;
++__active;
__guard.unlock();
start(__op_);
}
friend void tag_invoke(start_t, __t& __self) noexcept
{
return __self.__start_();
}
};
};
template <class _ConstrainedId>
struct __nest_sender
{
using _Constrained = stdexec::__t<_ConstrainedId>;
struct __t
{
using __id = __nest_sender;
using sender_concept = stdexec::sender_t;
const __impl* __scope_;
STDEXEC_ATTRIBUTE((no_unique_address)) _Constrained __c_;
template <class _Receiver>
using __nest_operation_t =
stdexec::__t<__nest_op<_ConstrainedId, stdexec::__id<_Receiver>>>;
template <class _Receiver>
using __nest_receiver_t =
stdexec::__t<__nest_rcvr<stdexec::__id<_Receiver>>>;
template <__decays_to<__t> _Self, receiver _Receiver>
requires sender_to<__copy_cvref_t<_Self, _Constrained>,
__nest_receiver_t<_Receiver>>
[[nodiscard]] friend __nest_operation_t<_Receiver>
tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
{
return __nest_operation_t<_Receiver>{
__self.__scope_, ((_Self&&)__self).__c_, (_Receiver&&)__rcvr};
}
template <__decays_to<__t> _Self, class _Env>
friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
-> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
__env_t<_Env>>
{
return {};
}
friend empty_env tag_invoke(get_env_t, const __t&) noexcept
{
return {};
}
};
};
template <class _Constrained>
using __nest_sender_t =
stdexec::__t<__nest_sender<__id<__decay_t<_Constrained>>>>;
////////////////////////////////////////////////////////////////////////////
// async_scope::spawn_future implementation
enum class __future_step
{
__invalid = 0,
__created,
__future,
__no_future,
__deleted
};
template <class _Sender, class _Env>
struct __future_state;
struct __forward_stopped
{
in_place_stop_source* __stop_source_;
void operator()() noexcept
{
__stop_source_->request_stop();
}
};
struct __subscription : __immovable
{
void (*__complete_)(__subscription*) noexcept = nullptr;
void __complete() noexcept
{
__complete_(this);
}
__subscription* __next_ = nullptr;
};
template <class _SenderId, class _EnvId, class _ReceiverId>
struct __future_op
{
using _Sender = stdexec::__t<_SenderId>;
using _Env = stdexec::__t<_EnvId>;
using _Receiver = stdexec::__t<_ReceiverId>;
class __t : __subscription
{
using __forward_consumer = typename stop_token_of_t<
env_of_t<_Receiver>>::template callback_type<__forward_stopped>;
friend void tag_invoke(start_t, __t& __self) noexcept
{
__self.__start_();
}
void __complete_() noexcept
{
try
{
auto __state = std::move(__state_);
STDEXEC_ASSERT(__state != nullptr);
std::unique_lock __guard{__state->__mutex_};
// either the future is still in use or it has passed ownership
// to __state->__no_future_
if (__state->__no_future_.get() != nullptr ||
__state->__step_ != __future_step::__future)
{
// invalid state - there is a code bug in the state machine
std::terminate();
}
else if (get_stop_token(get_env(__rcvr_)).stop_requested())
{
__guard.unlock();
set_stopped((_Receiver&&)__rcvr_);
__guard.lock();
}
else
{
std::visit(
[this, &__guard]<class _Tup>(_Tup& __tup) {
if constexpr (same_as<_Tup, std::monostate>)
{
std::terminate();
}
else
{
std::apply(
[this, &__guard]<class... _As>(auto tag,
_As&... __as) {
__guard.unlock();
tag((_Receiver&&)__rcvr_, (_As&&)__as...);
__guard.lock();
},
__tup);
}
},
__state->__data_);
}
}
catch (...)
{
set_error((_Receiver&&)__rcvr_, std::current_exception());
}
}
void __start_() noexcept
{
try
{
if (!!__state_)
{
std::unique_lock __guard{__state_->__mutex_};
if (__state_->__data_.index() != 0)
{
__guard.unlock();
__complete_();
}
else
{
__state_->__subscribers_.push_back(this);
}
}
}
catch (...)
{
set_error((_Receiver&&)__rcvr_, std::current_exception());
}
}
STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_;
std::unique_ptr<__future_state<_Sender, _Env>> __state_;
STDEXEC_ATTRIBUTE((no_unique_address))
__forward_consumer __forward_consumer_;
public:
using __id = __future_op;
~__t() noexcept
{
if (__state_ != nullptr)
{
auto __raw_state = __state_.get();
std::unique_lock __guard{__raw_state->__mutex_};
if (__raw_state->__data_.index() > 0)
{
// completed given sender
// state is no longer needed
return;
}
__raw_state->__no_future_ = std::move(__state_);
__raw_state->__step_from_to_(__guard, __future_step::__future,
__future_step::__no_future);
}
}
template <class _Receiver2>
explicit __t(_Receiver2&& __rcvr,
std::unique_ptr<__future_state<_Sender, _Env>> __state) :
__subscription{{},
[](__subscription* __self) noexcept -> void {
static_cast<__t*>(__self)->__complete_();
}},
__rcvr_((_Receiver2&&)__rcvr), __state_(std::move(__state)),
__forward_consumer_(get_stop_token(get_env(__rcvr_)),
__forward_stopped{&__state_->__stop_source_})
{}
};
};
#if STDEXEC_NVHPC()
template <class _Fn>
struct __completion_as_tuple2_;
template <class _Tag, class... _Ts>
struct __completion_as_tuple2_<_Tag(_Ts&&...)>
{
using __t = std::tuple<_Tag, _Ts...>;
};
template <class _Fn>
using __completion_as_tuple_t = stdexec::__t<__completion_as_tuple2_<_Fn>>;
#else
template <class _Tag, class... _Ts>
std::tuple<_Tag, _Ts...> __completion_as_tuple_(_Tag (*)(_Ts&&...));
template <class _Fn>
using __completion_as_tuple_t =
decltype(__scope::__completion_as_tuple_((_Fn*)nullptr));
#endif
template <class... _Ts>
using __decay_values_t =
completion_signatures<set_value_t(__decay_t<_Ts>&&...)>;
template <class _Ty>
using __decay_error_t = completion_signatures<set_error_t(__decay_t<_Ty>&&)>;
template <class _Sender, class _Env>
using __future_completions_t = //
make_completion_signatures<
_Sender, __env_t<_Env>,
completion_signatures<set_stopped_t(),
set_error_t(std::exception_ptr&&)>,
__decay_values_t, __decay_error_t>;
template <class _Completions>
using __completions_as_variant = //
__mapply<__transform<__q<__completion_as_tuple_t>,
__mbind_front_q<std::variant, std::monostate>>,
_Completions>;
template <class _Ty>
struct __dynamic_delete
{
__dynamic_delete() : __delete_([](_Ty* __p) { delete __p; }) {}
template <class _Uy>
requires convertible_to<_Uy*, _Ty*>
__dynamic_delete(std::default_delete<_Uy>) :
__delete_([](_Ty* __p) { delete static_cast<_Uy*>(__p); })
{}
template <class _Uy>
requires convertible_to<_Uy*, _Ty*>
__dynamic_delete& operator=(std::default_delete<_Uy> __d)
{
__delete_ = __dynamic_delete{__d}.__delete_;
return *this;
}
void operator()(_Ty* __p)
{
__delete_(__p);
}
void (*__delete_)(_Ty*);
};
template <class _Completions, class _Env>
struct __future_state_base
{
__future_state_base(_Env __env, const __impl* __scope) :
__forward_scope_{std::in_place, __scope->__stop_source_.get_token(),
__forward_stopped{&__stop_source_}},
__env_(
make_env((_Env&&)__env,
with(get_stop_token, __scope->__stop_source_.get_token())))
{}
~__future_state_base()
{
std::unique_lock __guard{__mutex_};
if (__step_ == __future_step::__created)
{
// exception during connect() will end up here
__step_from_to_(__guard, __future_step::__created,
__future_step::__deleted);
}
else if (__step_ != __future_step::__deleted)
{
// completing the given sender before the future is dropped will end
// here
__step_from_to_(__guard, __future_step::__future,
__future_step::__deleted);
}
}
void __step_from_to_(std::unique_lock<std::mutex>& __guard,
__future_step __from, __future_step __to)
{
STDEXEC_ASSERT(__guard.owns_lock());
auto actual = std::exchange(__step_, __to);
STDEXEC_ASSERT(actual == __from);
}
in_place_stop_source __stop_source_;
std::optional<in_place_stop_callback<__forward_stopped>> __forward_scope_;
std::mutex __mutex_;
__future_step __step_ = __future_step::__created;
std::unique_ptr<__future_state_base, __dynamic_delete<__future_state_base>>
__no_future_;
__completions_as_variant<_Completions> __data_;
__intrusive_queue<&__subscription::__next_> __subscribers_;
__env_t<_Env> __env_;
};
template <class _Completions, class _EnvId>
struct __future_rcvr
{
using _Env = stdexec::__t<_EnvId>;
struct __t
{
using __id = __future_rcvr;
using receiver_concept = stdexec::receiver_t;
__future_state_base<_Completions, _Env>* __state_;
const __impl* __scope_;
void __dispatch_result_() noexcept
{
auto& __state = *__state_;
std::unique_lock __guard{__state.__mutex_};
auto __local = std::move(__state.__subscribers_);
__state.__forward_scope_ = std::nullopt;
if (__state.__no_future_.get() != nullptr)
{
// nobody is waiting for the results
// delete this and return
__state.__step_from_to_(__guard, __future_step::__no_future,
__future_step::__deleted);
__guard.unlock();
__state.__no_future_.reset();
return;
}
__guard.unlock();
while (!__local.empty())
{
auto* __sub = __local.pop_front();
__sub->__complete();
}
}
template <__completion_tag _Tag, __movable_value... _As>
friend void tag_invoke(_Tag, __t&& __self, _As&&... __as) noexcept
{
auto& __state = *__self.__state_;
try
{
std::unique_lock __guard{__state.__mutex_};
using _Tuple = __decayed_tuple<_Tag, _As...>;
__state.__data_.template emplace<_Tuple>(_Tag{},
(_As&&)__as...);
__guard.unlock();
__self.__dispatch_result_();
}
catch (...)
{
using _Tuple = std::tuple<set_error_t, std::exception_ptr>;
__state.__data_.template emplace<_Tuple>(
set_error_t{}, std::current_exception());
}
}
friend const __env_t<_Env>& tag_invoke(get_env_t,
const __t& __self) noexcept
{
return __self.__state_->__env_;
}
};
};
template <class _Sender, class _Env>
using __future_receiver_t =
__t<__future_rcvr<__future_completions_t<_Sender, _Env>, __id<_Env>>>;
template <class _Sender, class _Env>
struct __future_state :
__future_state_base<__future_completions_t<_Sender, _Env>, _Env>
{
using _Completions = __future_completions_t<_Sender, _Env>;
__future_state(_Sender __sndr, _Env __env, const __impl* __scope) :
__future_state_base<_Completions, _Env>((_Env&&)__env, __scope),
__op_(
stdexec::connect((_Sender&&)__sndr,
__future_receiver_t<_Sender, _Env>{this, __scope}))
{}
connect_result_t<_Sender, __future_receiver_t<_Sender, _Env>> __op_;
};
template <class _SenderId, class _EnvId>
struct __future
{
using _Sender = stdexec::__t<_SenderId>;
using _Env = stdexec::__t<_EnvId>;
struct __t
{
using __id = __future;
using sender_concept = stdexec::sender_t;
__t(__t&&) = default;
__t& operator=(__t&&) = default;
~__t() noexcept
{
if (__state_ != nullptr)
{
auto __raw_state = __state_.get();
std::unique_lock __guard{__raw_state->__mutex_};
if (__raw_state->__data_.index() != 0)
{
// completed given sender
// state is no longer needed
return;
}
__raw_state->__no_future_ = std::move(__state_);
__raw_state->__step_from_to_(__guard, __future_step::__future,
__future_step::__no_future);
}
}
private:
friend struct async_scope;
template <class _Self>
using __completions_t =
__future_completions_t<__mfront<_Sender, _Self>, _Env>;
template <class _Receiver>
using __future_op_t = stdexec::__t<
__future_op<_SenderId, _EnvId, stdexec::__id<_Receiver>>>;
explicit __t(
std::unique_ptr<__future_state<_Sender, _Env>> __state) noexcept :
__state_(std::move(__state))
{
std::unique_lock __guard{__state_->__mutex_};
__state_->__step_from_to_(__guard, __future_step::__created,
__future_step::__future);
}
template <__decays_to<__t> _Self, receiver _Receiver>
requires receiver_of<_Receiver, __completions_t<_Self>>
friend __future_op_t<_Receiver> tag_invoke(connect_t, _Self&& __self,
_Receiver __rcvr)
{
return __future_op_t<_Receiver>{(_Receiver&&)__rcvr,
std::move(__self.__state_)};
}
template <__decays_to<__t> _Self, class _OtherEnv>
friend auto tag_invoke(get_completion_signatures_t, _Self&&,
_OtherEnv&&) -> __completions_t<_Self>
{
return {};
}
friend empty_env tag_invoke(get_env_t, const __t&) noexcept
{
return {};
}
std::unique_ptr<__future_state<_Sender, _Env>> __state_;
};
};
template <class _Sender, class _Env>
using __future_t = stdexec::__t<
__future<__id<__nest_sender_t<_Sender>>, __id<__decay_t<_Env>>>>;
////////////////////////////////////////////////////////////////////////////
// async_scope::spawn implementation
template <class _Env>
using __spawn_env_t =
__env::__join_t<_Env, __env::__with<in_place_stop_token, get_stop_token_t>,
__env::__with<__inln::__scheduler, get_scheduler_t>>;
template <class _EnvId>
struct __spawn_op_base
{
using _Env = stdexec::__t<_EnvId>;
__spawn_env_t<_Env> __env_;
void (*__delete_)(__spawn_op_base*);
};
template <class _EnvId>
struct __spawn_rcvr
{
using _Env = stdexec::__t<_EnvId>;
struct __t
{
using __id = __spawn_rcvr;
using receiver_concept = stdexec::receiver_t;
__spawn_op_base<_EnvId>* __op_;
template <__one_of<set_value_t, set_stopped_t> _Tag>
friend void tag_invoke(_Tag, __t&& __self) noexcept
{
__self.__op_->__delete_(__self.__op_);
}
// BUGBUG NOT TO SPEC spawn shouldn't accept senders that can fail.
template <same_as<set_error_t> _Tag>
[[noreturn]] friend void tag_invoke(_Tag, __t&&,
std::exception_ptr __eptr) noexcept
{
std::rethrow_exception(std::move(__eptr));
}
friend const __spawn_env_t<_Env>& tag_invoke(get_env_t,
const __t& __self) noexcept
{
return __self.__op_->__env_;
}
};
};
template <class _Env>
using __spawn_receiver_t = stdexec::__t<__spawn_rcvr<__id<_Env>>>;
template <class _SenderId, class _EnvId>
struct __spawn_op
{
using _Env = stdexec::__t<_EnvId>;
using _Sender = stdexec::__t<_SenderId>;
struct __t : __spawn_op_base<_EnvId>
{
template <__decays_to<_Sender> _Sndr>
__t(_Sndr&& __sndr, _Env __env, const __impl* __scope) :
__spawn_op_base<_EnvId>{
__env::__join(
(_Env&&)__env,
__env::__with(__scope->__stop_source_.get_token(),
get_stop_token),
__env::__with(__inln::__scheduler{}, get_scheduler)),
[](__spawn_op_base<_EnvId>* __op) {
delete static_cast<__t*>(__op);
}},
__op_(stdexec::connect((_Sndr&&)__sndr,
__spawn_receiver_t<_Env>{this}))
{}
void __start_() noexcept
{
start(__op_);
}
friend void tag_invoke(start_t, __t& __self) noexcept
{
return __self.__start_();
}
connect_result_t<_Sender, __spawn_receiver_t<_Env>> __op_;
};
};
template <class _Sender, class _Env>
using __spawn_operation_t = stdexec::__t<__spawn_op<__id<_Sender>, __id<_Env>>>;
////////////////////////////////////////////////////////////////////////////
// async_scope
struct async_scope : __immovable
{
async_scope() = default;
template <sender _Constrained>
[[nodiscard]] __when_empty_sender_t<_Constrained>
when_empty(_Constrained&& __c) const
{
return __when_empty_sender_t<_Constrained>{&__impl_,
(_Constrained&&)__c};
}
[[nodiscard]] auto on_empty() const
{
return when_empty(just());
}
template <sender _Constrained>
using nest_result_t = __nest_sender_t<_Constrained>;
template <sender _Constrained>
[[nodiscard]] nest_result_t<_Constrained> nest(_Constrained&& __c)
{
return nest_result_t<_Constrained>{&__impl_, (_Constrained&&)__c};
}
template <__movable_value _Env = empty_env,
sender_in<__spawn_env_t<_Env>> _Sender>
requires sender_to<nest_result_t<_Sender>, __spawn_receiver_t<_Env>>
void spawn(_Sender&& __sndr, _Env __env = {})
{
using __op_t = __spawn_operation_t<nest_result_t<_Sender>, _Env>;
// start is noexcept so we can assume that the operation will complete
// after this, which means we can rely on its self-ownership to ensure
// that it is eventually deleted
stdexec::start(
*new __op_t{nest((_Sender&&)__sndr), (_Env&&)__env, &__impl_});
}
template <__movable_value _Env = empty_env,
sender_in<__env_t<_Env>> _Sender>
__future_t<_Sender, _Env> spawn_future(_Sender&& __sndr, _Env __env = {})
{
using __state_t = __future_state<nest_result_t<_Sender>, _Env>;
auto __state = std::make_unique<__state_t>(nest((_Sender&&)__sndr),
(_Env&&)__env, &__impl_);
stdexec::start(__state->__op_);
return __future_t<_Sender, _Env>{std::move(__state)};
}
in_place_stop_source& get_stop_source() noexcept
{
return __impl_.__stop_source_;
}
in_place_stop_token get_stop_token() const noexcept
{
return __impl_.__stop_source_.get_token();
}
bool request_stop() noexcept
{
return __impl_.__stop_source_.request_stop();
}
private:
__impl __impl_;
};
} // namespace __scope
using __scope::async_scope;
} // namespace exec