stdexec: pull in async_scope
Add the `async_scope` support from stdexec as a replacement for our
custom `scope` class.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I069604c6525b7864bceaa9e3f0f276bac81367d1
diff --git a/include/sdbusplus/async/stdexec/async_scope.hpp b/include/sdbusplus/async/stdexec/async_scope.hpp
new file mode 100644
index 0000000..1f84717
--- /dev/null
+++ b/include/sdbusplus/async/stdexec/async_scope.hpp
@@ -0,0 +1,844 @@
+/*
+ * Copyright (c) 2021-2022 NVIDIA Corporation
+ *
+ * Licensed under the Apache License Version 2.0 with LLVM Exceptions
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://llvm.org/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "../stdexec/__detail/__intrusive_queue.hpp"
+#include "../stdexec/execution.hpp"
+#include "env.hpp"
+
+namespace exec
+{
+/////////////////////////////////////////////////////////////////////////////
+// async_scope
+namespace __scope
+{
+using namespace stdexec;
+
+struct __impl;
+struct async_scope;
+
+struct __task : __immovable
+{
+ const __impl* __scope_;
+ void (*__notify_waiter)(__task*) noexcept;
+ __task* __next_ = nullptr;
+};
+
+template <class _BaseEnv>
+using __env_t =
+ make_env_t<_BaseEnv, with_t<get_stop_token_t, in_place_stop_token>>;
+
+struct __impl
+{
+ in_place_stop_source __stop_source_{};
+ mutable std::mutex __lock_{};
+ mutable std::ptrdiff_t __active_ = 0;
+ mutable __intrusive_queue<&__task::__next_> __waiters_{};
+
+ ~__impl()
+ {
+ std::unique_lock __guard{__lock_};
+ STDEXEC_ASSERT(__active_ == 0);
+ STDEXEC_ASSERT(__waiters_.empty());
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////
+// async_scope::when_empty implementation
+template <class _ReceiverId>
+struct __when_empty_op_base : __task
+{
+ using _Receiver = __t<_ReceiverId>;
+ STDEXEC_NO_UNIQUE_ADDRESS _Receiver __rcvr_;
+};
+
+template <class _ConstrainedId, class _ReceiverId>
+struct __when_empty_op : __task
+{
+ using _Constrained = __t<_ConstrainedId>;
+ using _Receiver = __t<_ReceiverId>;
+
+ explicit __when_empty_op(const __impl* __scope, _Constrained&& __sndr,
+ _Receiver __rcvr) :
+ __task{{}, __scope, __notify_waiter},
+ __op_(connect((_Constrained&&)__sndr, (_Receiver&&)__rcvr))
+ {}
+
+ private:
+ static void __notify_waiter(__task* __self) noexcept
+ {
+ start(static_cast<__when_empty_op*>(__self)->__op_);
+ }
+
+ void __start_() noexcept
+ {
+ std::unique_lock __guard{this->__scope_->__lock_};
+ auto& __active = this->__scope_->__active_;
+ auto& __waiters = this->__scope_->__waiters_;
+ if (__active != 0)
+ {
+ __waiters.push_back(this);
+ return;
+ }
+ __guard.unlock();
+ start(this->__op_);
+ }
+
+ friend void tag_invoke(start_t, __when_empty_op& __self) noexcept
+ {
+ return __self.__start_();
+ }
+
+ STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
+ connect_result_t<_Constrained, _Receiver> __op_;
+};
+
+template <class _ConstrainedId>
+struct __when_empty_sender
+{
+ using _Constrained = __t<_ConstrainedId>;
+ using is_sender = void;
+
+ template <class _Self, class _Receiver>
+ using __when_empty_op_t =
+ __when_empty_op<__x<__copy_cvref_t<_Self, _Constrained>>,
+ __x<_Receiver>>;
+
+ template <__decays_to<__when_empty_sender> _Self, receiver _Receiver>
+ requires sender_to<__copy_cvref_t<_Self, _Constrained>, _Receiver>
+ [[nodiscard]] friend __when_empty_op_t<_Self, _Receiver>
+ tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
+ {
+ return __when_empty_op_t<_Self, _Receiver>{
+ __self.__scope_, ((_Self&&)__self).__c_, (_Receiver&&)__rcvr};
+ }
+
+ template <__decays_to<__when_empty_sender> _Self, class _Env>
+ friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
+ -> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
+ __env_t<_Env>>;
+
+ friend empty_env tag_invoke(get_env_t, const __when_empty_sender&) noexcept
+ {
+ return {};
+ }
+
+ const __impl* __scope_;
+ STDEXEC_NO_UNIQUE_ADDRESS _Constrained __c_;
+};
+
+template <class _Constrained>
+using __when_empty_sender_t = __when_empty_sender<__x<__decay_t<_Constrained>>>;
+
+////////////////////////////////////////////////////////////////////////////
+// async_scope::nest implementation
+template <class _ReceiverId>
+struct __nest_op_base : __immovable
+{
+ using _Receiver = __t<_ReceiverId>;
+ const __impl* __scope_;
+ STDEXEC_NO_UNIQUE_ADDRESS _Receiver __rcvr_;
+};
+
+template <class _ReceiverId>
+struct __nest_rcvr
+{
+ using _Receiver = __t<_ReceiverId>;
+ __nest_op_base<_ReceiverId>* __op_;
+
+ static void __complete(const __impl* __scope) noexcept
+ {
+ std::unique_lock __guard{__scope->__lock_};
+ auto& __active = __scope->__active_;
+ if (--__active == 0)
+ {
+ auto __local = std::move(__scope->__waiters_);
+ __guard.unlock();
+ __scope = nullptr;
+ // do not access __scope
+ while (!__local.empty())
+ {
+ auto* __next = __local.pop_front();
+ __next->__notify_waiter(__next);
+ // __scope must be considered deleted
+ }
+ }
+ }
+
+ template <__completion_tag _Tag, class... _As>
+ requires __callable<_Tag, _Receiver, _As...>
+ friend void tag_invoke(_Tag, __nest_rcvr&& __self, _As&&... __as) noexcept
+ {
+ auto __scope = __self.__op_->__scope_;
+ _Tag{}(std::move(__self.__op_->__rcvr_), (_As&&)__as...);
+ // do not access __op_
+ // do not access this
+ __complete(__scope);
+ }
+
+ friend __env_t<env_of_t<_Receiver>>
+ tag_invoke(get_env_t, const __nest_rcvr& __self) noexcept
+ {
+ return make_env(
+ get_env(__self.__op_->__rcvr_),
+ with(get_stop_token,
+ __self.__op_->__scope_->__stop_source_.get_token()));
+ }
+};
+
+template <class _ConstrainedId, class _ReceiverId>
+struct __nest_op : __nest_op_base<_ReceiverId>
+{
+ using _Constrained = __t<_ConstrainedId>;
+ using _Receiver = __t<_ReceiverId>;
+ STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
+ connect_result_t<_Constrained, __nest_rcvr<_ReceiverId>> __op_;
+
+ template <__decays_to<_Constrained> _Sender, __decays_to<_Receiver> _Rcvr>
+ explicit __nest_op(const __impl* __scope, _Sender&& __c, _Rcvr&& __rcvr) :
+ __nest_op_base<_ReceiverId>{{}, __scope, (_Rcvr&&)__rcvr},
+ __op_(connect((_Sender&&)__c, __nest_rcvr<_ReceiverId>{this}))
+ {}
+
+ private:
+ void __start_() noexcept
+ {
+ STDEXEC_ASSERT(this->__scope_);
+ std::unique_lock __guard{this->__scope_->__lock_};
+ auto& __active = this->__scope_->__active_;
+ ++__active;
+ __guard.unlock();
+ start(__op_);
+ }
+
+ friend void tag_invoke(start_t, __nest_op& __self) noexcept
+ {
+ return __self.__start_();
+ }
+};
+
+template <class _ConstrainedId>
+struct __nest_sender
+{
+ using _Constrained = __t<_ConstrainedId>;
+ using is_sender = void;
+
+ const __impl* __scope_;
+ STDEXEC_NO_UNIQUE_ADDRESS _Constrained __c_;
+
+ template <class _Receiver>
+ using __nest_operation_t = __nest_op<_ConstrainedId, __x<_Receiver>>;
+ template <class _Receiver>
+ using __nest_receiver_t = __nest_rcvr<__x<_Receiver>>;
+
+ template <__decays_to<__nest_sender> _Self, receiver _Receiver>
+ requires sender_to<__copy_cvref_t<_Self, _Constrained>,
+ __nest_receiver_t<_Receiver>>
+ [[nodiscard]] friend __nest_operation_t<_Receiver>
+ tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
+ {
+ return __nest_operation_t<_Receiver>{
+ __self.__scope_, ((_Self&&)__self).__c_, (_Receiver&&)__rcvr};
+ }
+ template <__decays_to<__nest_sender> _Self, class _Env>
+ friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
+ -> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
+ __env_t<_Env>>;
+
+ friend empty_env tag_invoke(get_env_t, const __nest_sender&) noexcept
+ {
+ return {};
+ }
+};
+
+template <class _Constrained>
+using __nest_sender_t = __nest_sender<__x<__decay_t<_Constrained>>>;
+
+////////////////////////////////////////////////////////////////////////////
+// async_scope::spawn_future implementation
+enum class __future_step
+{
+ __invalid = 0,
+ __created,
+ __future,
+ __no_future,
+ __deleted
+};
+
+template <class _Sender, class _Env>
+struct __future_state;
+
+struct __forward_stopped
+{
+ in_place_stop_source* __stop_source_;
+
+ void operator()() noexcept
+ {
+ __stop_source_->request_stop();
+ }
+};
+
+struct __subscription : __immovable
+{
+ void (*__complete_)(__subscription*) noexcept = nullptr;
+
+ void __complete() noexcept
+ {
+ __complete_(this);
+ }
+
+ __subscription* __next_ = nullptr;
+};
+
+template <class _SenderId, class _EnvId, class _ReceiverId>
+class __future_op : __subscription
+{
+ using _Sender = __t<_SenderId>;
+ using _Env = __t<_EnvId>;
+ using _Receiver = __t<_ReceiverId>;
+
+ using __forward_consumer = typename stop_token_of_t<
+ env_of_t<_Receiver>>::template callback_type<__forward_stopped>;
+
+ friend void tag_invoke(start_t, __future_op& __self) noexcept
+ {
+ __self.__start_();
+ }
+
+ void __complete_() noexcept
+ {
+ try
+ {
+ auto __state = std::move(__state_);
+ STDEXEC_ASSERT(__state != nullptr);
+ std::unique_lock __guard{__state->__mutex_};
+ // either the future is still in use or it has passed ownership to
+ // __state->__no_future_
+ if (__state->__no_future_.get() != nullptr ||
+ __state->__step_ != __future_step::__future)
+ {
+ // invalid state - there is a code bug in the state machine
+ std::terminate();
+ }
+ else if (get_stop_token(get_env(__rcvr_)).stop_requested())
+ {
+ __guard.unlock();
+ set_stopped((_Receiver&&)__rcvr_);
+ __guard.lock();
+ }
+ else
+ {
+ std::visit(
+ [this, &__guard]<class _Tup>(_Tup& __tup) {
+ if constexpr (same_as<_Tup, std::monostate>)
+ {
+ std::terminate();
+ }
+ else
+ {
+ std::apply(
+ [this, &__guard]<class... _As>(auto tag,
+ _As&... __as) {
+ __guard.unlock();
+ tag((_Receiver&&)__rcvr_, (_As&&)__as...);
+ __guard.lock();
+ },
+ __tup);
+ }
+ },
+ __state->__data_);
+ }
+ }
+ catch (...)
+ {
+ set_error((_Receiver&&)__rcvr_, std::current_exception());
+ }
+ }
+
+ void __start_() noexcept
+ {
+ try
+ {
+ if (!!__state_)
+ {
+ std::unique_lock __guard{__state_->__mutex_};
+ if (__state_->__data_.index() != 0)
+ {
+ __guard.unlock();
+ __complete_();
+ }
+ else
+ {
+ __state_->__subscribers_.push_back(this);
+ }
+ }
+ }
+ catch (...)
+ {
+ set_error((_Receiver&&)__rcvr_, std::current_exception());
+ }
+ }
+
+ STDEXEC_NO_UNIQUE_ADDRESS _Receiver __rcvr_;
+ std::unique_ptr<__future_state<_Sender, _Env>> __state_;
+ STDEXEC_NO_UNIQUE_ADDRESS __forward_consumer __forward_consumer_;
+
+ public:
+ ~__future_op() noexcept
+ {
+ if (__state_ != nullptr)
+ {
+ auto __raw_state = __state_.get();
+ std::unique_lock __guard{__raw_state->__mutex_};
+ if (__raw_state->__data_.index() > 0)
+ {
+ // completed given sender
+ // state is no longer needed
+ return;
+ }
+ __raw_state->__no_future_ = std::move(__state_);
+ __raw_state->__step_from_to_(__guard, __future_step::__future,
+ __future_step::__no_future);
+ }
+ }
+
+ template <class _Receiver2>
+ explicit __future_op(
+ _Receiver2&& __rcvr,
+ std::unique_ptr<__future_state<_Sender, _Env>> __state) :
+ __subscription{{},
+ [](__subscription* __self) noexcept -> void {
+ static_cast<__future_op*>(__self)->__complete_();
+ }},
+ __rcvr_((_Receiver2&&)__rcvr), __state_(std::move(__state)),
+ __forward_consumer_(get_stop_token(get_env(__rcvr_)),
+ __forward_stopped{&__state_->__stop_source_})
+ {}
+};
+
+#if STDEXEC_NVHPC()
+template <class _Fn>
+struct __completion_as_tuple2_;
+
+template <class _Tag, class... _Ts>
+struct __completion_as_tuple2_<_Tag(_Ts&&...)>
+{
+ using __t = std::tuple<_Tag, _Ts...>;
+};
+template <class _Fn>
+using __completion_as_tuple_t = __t<__completion_as_tuple2_<_Fn>>;
+
+#else
+
+template <class _Tag, class... _Ts>
+std::tuple<_Tag, _Ts...> __completion_as_tuple_(_Tag (*)(_Ts&&...));
+template <class _Fn>
+using __completion_as_tuple_t =
+ decltype(__scope::__completion_as_tuple_((_Fn*)nullptr));
+#endif
+
+template <class... _Ts>
+using __decay_values_t =
+ completion_signatures<set_value_t(__decay_t<_Ts>&&...)>;
+
+template <class _Ty>
+using __decay_error_t = completion_signatures<set_error_t(__decay_t<_Ty>&&)>;
+
+template <class _Sender, class _Env>
+using __future_completions_t = //
+ make_completion_signatures<
+ _Sender, __env_t<_Env>,
+ completion_signatures<set_stopped_t(),
+ set_error_t(std::exception_ptr&&)>,
+ __decay_values_t, __decay_error_t>;
+
+template <class _Completions>
+using __completions_as_variant = //
+ __mapply<__transform<__q<__completion_as_tuple_t>,
+ __mbind_front_q<std::variant, std::monostate>>,
+ _Completions>;
+
+template <class _Ty>
+struct __dynamic_delete
+{
+ __dynamic_delete() : __delete_([](_Ty* __p) { delete __p; }) {}
+
+ template <class _Uy>
+ requires convertible_to<_Uy*, _Ty*>
+ __dynamic_delete(std::default_delete<_Uy>) :
+ __delete_([](_Ty* __p) { delete static_cast<_Uy*>(__p); })
+ {}
+
+ template <class _Uy>
+ requires convertible_to<_Uy*, _Ty*>
+ __dynamic_delete& operator=(std::default_delete<_Uy> __d)
+ {
+ __delete_ = __dynamic_delete{__d}.__delete_;
+ return *this;
+ }
+
+ void operator()(_Ty* __p)
+ {
+ __delete_(__p);
+ }
+
+ void (*__delete_)(_Ty*);
+};
+
+template <class _Completions, class _Env>
+struct __future_state_base
+{
+ __future_state_base(_Env __env, const __impl* __scope) :
+ __forward_scope_{std::in_place, __scope->__stop_source_.get_token(),
+ __forward_stopped{&__stop_source_}},
+ __env_(
+ make_env((_Env&&)__env,
+ with(get_stop_token, __scope->__stop_source_.get_token())))
+ {}
+
+ ~__future_state_base()
+ {
+ std::unique_lock __guard{__mutex_};
+ if (__step_ == __future_step::__created)
+ {
+ // exception during connect() will end up here
+ __step_from_to_(__guard, __future_step::__created,
+ __future_step::__deleted);
+ }
+ else if (__step_ != __future_step::__deleted)
+ {
+ // completing the given sender before the future is dropped will end
+ // here
+ __step_from_to_(__guard, __future_step::__future,
+ __future_step::__deleted);
+ }
+ }
+
+ void __step_from_to_(std::unique_lock<std::mutex>& __guard,
+ __future_step __from, __future_step __to)
+ {
+ STDEXEC_ASSERT(__guard.owns_lock());
+ auto actual = std::exchange(__step_, __to);
+ STDEXEC_ASSERT(actual == __from);
+ }
+
+ in_place_stop_source __stop_source_;
+ std::optional<in_place_stop_callback<__forward_stopped>> __forward_scope_;
+ std::mutex __mutex_;
+ __future_step __step_ = __future_step::__created;
+ std::unique_ptr<__future_state_base, __dynamic_delete<__future_state_base>>
+ __no_future_;
+ __completions_as_variant<_Completions> __data_;
+ __intrusive_queue<&__subscription::__next_> __subscribers_;
+ __env_t<_Env> __env_;
+};
+
+template <class _CompletionsId, class _EnvId>
+struct __future_rcvr
+{
+ using _Completions = __t<_CompletionsId>;
+ using _Env = __t<_EnvId>;
+ __future_state_base<_Completions, _Env>* __state_;
+ const __impl* __scope_;
+
+ void __dispatch_result_() noexcept
+ {
+ auto& __state = *__state_;
+ std::unique_lock __guard{__state.__mutex_};
+ auto __local = std::move(__state.__subscribers_);
+ __state.__forward_scope_ = std::nullopt;
+ if (__state.__no_future_.get() != nullptr)
+ {
+ // nobody is waiting for the results
+ // delete this and return
+ __state.__step_from_to_(__guard, __future_step::__no_future,
+ __future_step::__deleted);
+ __guard.unlock();
+ __state.__no_future_.reset();
+ return;
+ }
+ __guard.unlock();
+ while (!__local.empty())
+ {
+ auto* __sub = __local.pop_front();
+ __sub->__complete();
+ }
+ }
+
+ template <__completion_tag _Tag, __movable_value... _As>
+ friend void tag_invoke(_Tag, __future_rcvr&& __self, _As&&... __as) noexcept
+ {
+ auto& __state = *__self.__state_;
+ try
+ {
+ std::unique_lock __guard{__state.__mutex_};
+ using _Tuple = __decayed_tuple<_Tag, _As...>;
+ __state.__data_.template emplace<_Tuple>(_Tag{}, (_As&&)__as...);
+ __guard.unlock();
+ __self.__dispatch_result_();
+ }
+ catch (...)
+ {
+ using _Tuple = std::tuple<set_error_t, std::exception_ptr>;
+ __state.__data_.template emplace<_Tuple>(set_error_t{},
+ std::current_exception());
+ }
+ }
+
+ friend const __env_t<_Env>& tag_invoke(get_env_t,
+ const __future_rcvr& __self) noexcept
+ {
+ return __self.__state_->__env_;
+ }
+};
+
+template <class _Sender, class _Env>
+using __future_receiver_t =
+ __future_rcvr<__x<__future_completions_t<_Sender, _Env>>, __x<_Env>>;
+
+template <class _Sender, class _Env>
+struct __future_state :
+ __future_state_base<__future_completions_t<_Sender, _Env>, _Env>
+{
+ using _Completions = __future_completions_t<_Sender, _Env>;
+
+ __future_state(_Sender __sndr, _Env __env, const __impl* __scope) :
+ __future_state_base<_Completions, _Env>((_Env&&)__env, __scope),
+ __op_(connect((_Sender&&)__sndr,
+ __future_receiver_t<_Sender, _Env>{this, __scope}))
+ {}
+
+ connect_result_t<_Sender, __future_receiver_t<_Sender, _Env>> __op_;
+};
+
+template <class _SenderId, class _EnvId>
+class __future
+{
+ using _Sender = __t<_SenderId>;
+ using _Env = __t<_EnvId>;
+ friend struct async_scope;
+
+ public:
+ using is_sender = void;
+
+ __future(__future&&) = default;
+ __future& operator=(__future&&) = default;
+
+ ~__future() noexcept
+ {
+ if (__state_ != nullptr)
+ {
+ auto __raw_state = __state_.get();
+ std::unique_lock __guard{__raw_state->__mutex_};
+ if (__raw_state->__data_.index() != 0)
+ {
+ // completed given sender
+ // state is no longer needed
+ return;
+ }
+ __raw_state->__no_future_ = std::move(__state_);
+ __raw_state->__step_from_to_(__guard, __future_step::__future,
+ __future_step::__no_future);
+ }
+ }
+
+ private:
+ template <class _Self>
+ using __completions_t =
+ __future_completions_t<__mfront<_Sender, _Self>, _Env>;
+
+ explicit __future(
+ std::unique_ptr<__future_state<_Sender, _Env>> __state) noexcept :
+ __state_(std::move(__state))
+ {
+ std::unique_lock __guard{__state_->__mutex_};
+ __state_->__step_from_to_(__guard, __future_step::__created,
+ __future_step::__future);
+ }
+
+ template <__decays_to<__future> _Self, receiver _Receiver>
+ requires receiver_of<_Receiver, __completions_t<_Self>>
+ friend __future_op<_SenderId, _EnvId, __x<_Receiver>>
+ tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
+ {
+ return __future_op<_SenderId, _EnvId, __x<_Receiver>>{
+ (_Receiver&&)__rcvr, std::move(__self.__state_)};
+ }
+
+ template <__decays_to<__future> _Self, class _OtherEnv>
+ friend auto tag_invoke(get_completion_signatures_t, _Self&&, _OtherEnv&&)
+ -> __completions_t<_Self>;
+
+ friend empty_env tag_invoke(get_env_t, const __future&) noexcept
+ {
+ return {};
+ }
+
+ std::unique_ptr<__future_state<_Sender, _Env>> __state_;
+};
+
+template <class _Sender, class _Env>
+using __future_t =
+ __future<__x<__nest_sender_t<_Sender>>, __x<__decay_t<_Env>>>;
+
+////////////////////////////////////////////////////////////////////////////
+// async_scope::spawn implementation
+template <class _EnvId>
+struct __spawn_op_base
+{
+ using _Env = __t<_EnvId>;
+ __env_t<_Env> __env_;
+ void (*__delete_)(__spawn_op_base*);
+};
+
+template <class _EnvId>
+struct __spawn_rcvr
+{
+ using _Env = __t<_EnvId>;
+ __spawn_op_base<_EnvId>* __op_;
+ const __impl* __scope_;
+
+ template <__one_of<set_value_t, set_stopped_t> _Tag>
+ friend void tag_invoke(_Tag, __spawn_rcvr&& __self) noexcept
+ {
+ __self.__op_->__delete_(__self.__op_);
+ }
+
+ // BUGBUG NOT TO SPEC spawn shouldn't accept senders that can fail.
+ template <same_as<set_error_t> _Tag>
+ [[noreturn]] friend void tag_invoke(_Tag, __spawn_rcvr&&,
+ const std::exception_ptr&) noexcept
+ {
+ std::terminate();
+ }
+
+ friend const __env_t<_Env>& tag_invoke(get_env_t,
+ const __spawn_rcvr& __self) noexcept
+ {
+ return __self.__op_->__env_;
+ }
+};
+
+template <class _Env>
+using __spawn_receiver_t = __spawn_rcvr<__x<_Env>>;
+
+template <class _SenderId, class _EnvId>
+struct __spawn_op : __spawn_op_base<_EnvId>
+{
+ using _Env = __t<_EnvId>;
+ using _Sender = __t<_SenderId>;
+
+ template <__decays_to<_Sender> _Sndr>
+ __spawn_op(_Sndr&& __sndr, _Env __env, const __impl* __scope) :
+ __spawn_op_base<_EnvId>{
+ make_env((_Env&&)__env,
+ with(get_stop_token, __scope->__stop_source_.get_token())),
+ [](__spawn_op_base<_EnvId>* __op) {
+ delete static_cast<__spawn_op*>(__op);
+ }},
+ __op_(connect((_Sndr&&)__sndr, __spawn_receiver_t<_Env>{this, __scope}))
+ {}
+
+ void __start_() noexcept
+ {
+ start(__op_);
+ }
+
+ friend void tag_invoke(start_t, __spawn_op& __self) noexcept
+ {
+ return __self.__start_();
+ }
+
+ connect_result_t<_Sender, __spawn_receiver_t<_Env>> __op_;
+};
+
+template <class _Sender, class _Env>
+using __spawn_operation_t = __spawn_op<__x<_Sender>, __x<_Env>>;
+
+////////////////////////////////////////////////////////////////////////////
+// async_scope
+struct async_scope : __immovable
+{
+ async_scope() = default;
+
+ template <sender _Constrained>
+ [[nodiscard]] __when_empty_sender_t<_Constrained>
+ when_empty(_Constrained&& __c) const
+ {
+ return __when_empty_sender_t<_Constrained>{&__impl_,
+ (_Constrained&&)__c};
+ }
+
+ [[nodiscard]] auto on_empty() const
+ {
+ return when_empty(just());
+ }
+
+ template <sender _Constrained>
+ using nest_result_t = __nest_sender_t<_Constrained>;
+
+ template <sender _Constrained>
+ [[nodiscard]] nest_result_t<_Constrained> nest(_Constrained&& __c)
+ {
+ return nest_result_t<_Constrained>{&__impl_, (_Constrained&&)__c};
+ }
+
+ template <__movable_value _Env = empty_env,
+ sender_in<__env_t<_Env>> _Sender>
+ requires sender_to<nest_result_t<_Sender>, __spawn_receiver_t<_Env>>
+ void spawn(_Sender&& __sndr, _Env __env = {})
+ {
+ using __op_t = __spawn_operation_t<nest_result_t<_Sender>, _Env>;
+ // start is noexcept so we can assume that the operation will complete
+ // after this, which means we can rely on its self-ownership to ensure
+ // that it is eventually deleted
+ stdexec::start(
+ *new __op_t{nest((_Sender&&)__sndr), (_Env&&)__env, &__impl_});
+ }
+
+ template <__movable_value _Env = empty_env,
+ sender_in<__env_t<_Env>> _Sender>
+ __future_t<_Sender, _Env> spawn_future(_Sender&& __sndr, _Env __env = {})
+ {
+ using __state_t = __future_state<nest_result_t<_Sender>, _Env>;
+ auto __state = std::make_unique<__state_t>(nest((_Sender&&)__sndr),
+ (_Env&&)__env, &__impl_);
+ stdexec::start(__state->__op_);
+ return __future_t<_Sender, _Env>{std::move(__state)};
+ }
+
+ in_place_stop_source& get_stop_source() noexcept
+ {
+ return __impl_.__stop_source_;
+ }
+
+ in_place_stop_token get_stop_token() const noexcept
+ {
+ return __impl_.__stop_source_.get_token();
+ }
+
+ bool request_stop() noexcept
+ {
+ return __impl_.__stop_source_.request_stop();
+ }
+
+ private:
+ __impl __impl_;
+};
+} // namespace __scope
+
+using __scope::async_scope;
+} // namespace exec
diff --git a/include/sdbusplus/async/stdexec/env.hpp b/include/sdbusplus/async/stdexec/env.hpp
new file mode 100644
index 0000000..1ec7a6e
--- /dev/null
+++ b/include/sdbusplus/async/stdexec/env.hpp
@@ -0,0 +1,272 @@
+/*
+ * Copyright (c) 2021-2022 NVIDIA Corporation
+ *
+ * Licensed under the Apache License Version 2.0 with LLVM Exceptions
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://llvm.org/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "../stdexec/execution.hpp"
+
+#ifdef __EDG__
+#pragma diagnostic push
+#pragma diag_suppress 1302
+#endif
+
+namespace exec
+{
+template <class... _TagValue>
+using with_t = stdexec::__with<_TagValue...>;
+
+namespace __detail
+{
+struct __with_t
+{
+ template <class _Tag, class _Value>
+ with_t<_Tag, _Value> operator()(_Tag, _Value&& __val) const
+ {
+ return stdexec::__with_(_Tag(), (_Value&&)__val);
+ }
+
+ template <class _Tag>
+ with_t<_Tag> operator()(_Tag) const
+ {
+ return stdexec::__with_(_Tag());
+ }
+};
+} // namespace __detail
+
+inline constexpr __detail::__with_t with{};
+
+inline constexpr stdexec::__env::__make_env_t make_env{};
+
+template <class... _Ts>
+using make_env_t = stdexec::__make_env_t<_Ts...>;
+
+namespace __read_with_default
+{
+using namespace stdexec;
+
+struct read_with_default_t;
+
+template <class _Tag, class _DefaultId, class _ReceiverId>
+struct __operation : __immovable
+{
+ using _Default = __t<_DefaultId>;
+ using _Receiver = __t<_ReceiverId>;
+
+ STDEXEC_NO_UNIQUE_ADDRESS _Default __default_;
+ _Receiver __rcvr_;
+
+ friend void tag_invoke(start_t, __operation& __self) noexcept
+ {
+ try
+ {
+ if constexpr (__callable<_Tag, env_of_t<_Receiver>>)
+ {
+ const auto& __env = get_env(__self.__rcvr_);
+ set_value(std::move(__self.__rcvr_), _Tag{}(__env));
+ }
+ else
+ {
+ set_value(std::move(__self.__rcvr_),
+ std::move(__self.__default_));
+ }
+ }
+ catch (...)
+ {
+ set_error(std::move(__self.__rcvr_), std::current_exception());
+ }
+ }
+};
+
+template <class _Tag, class _DefaultId>
+struct __sender
+{
+ using _Default = __t<_DefaultId>;
+ using is_sender = void;
+ STDEXEC_NO_UNIQUE_ADDRESS _Default __default_;
+
+ template <class _Env>
+ using __value_t = __minvoke<
+ __with_default<__mbind_back_q<__call_result_t, _Env>, _Default>, _Tag>;
+ template <class _Env>
+ using __default_t = __if_c<__callable<_Tag, _Env>, __ignore, _Default>;
+ template <class _Env>
+ using __completions_t =
+ completion_signatures<set_value_t(__value_t<_Env>),
+ set_error_t(std::exception_ptr)>;
+
+ template <__decays_to<__sender> _Self, class _Receiver>
+ requires receiver_of<_Receiver, __completions_t<env_of_t<_Receiver>>>
+ friend auto tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr) //
+ noexcept(std::is_nothrow_move_constructible_v<_Receiver>)
+ -> __operation<_Tag, __x<__default_t<env_of_t<_Receiver>>>,
+ __x<_Receiver>>
+ {
+ return {{}, ((_Self&&)__self).__default_, (_Receiver&&)__rcvr};
+ }
+
+ friend auto tag_invoke(get_completion_signatures_t, __sender, no_env)
+ -> dependent_completion_signatures<no_env>;
+ template <__none_of<no_env> _Env>
+ friend auto tag_invoke(get_completion_signatures_t, __sender, _Env&&)
+ -> __completions_t<_Env>;
+};
+
+struct __read_with_default_t
+{
+ template <class _Tag, class _Default>
+ constexpr auto operator()(_Tag, _Default&& __default) const
+ -> __sender<_Tag, __x<__decay_t<_Default>>>
+ {
+ return {(_Default&&)__default};
+ }
+};
+} // namespace __read_with_default
+
+inline constexpr __read_with_default::__read_with_default_t read_with_default{};
+
+namespace __write
+{
+using namespace stdexec;
+
+struct __write_t;
+
+template <class _ReceiverId, class _Env>
+struct __operation_base
+{
+ using _Receiver = __t<_ReceiverId>;
+ _Receiver __rcvr_;
+ const _Env __env_;
+};
+
+template <class _ReceiverId, class _Env>
+struct __receiver
+{
+ using _Receiver = stdexec::__t<_ReceiverId>;
+
+ struct __t : receiver_adaptor<__t>
+ {
+ _Receiver&& base() && noexcept
+ {
+ return (_Receiver&&)__op_->__rcvr_;
+ }
+
+ const _Receiver& base() const& noexcept
+ {
+ return __op_->__rcvr_;
+ }
+
+ auto get_env() const noexcept
+ -> __env::__env_join_t<const _Env&, env_of_t<_Receiver>>
+ {
+ return __env::__join_env(__op_->__env_, stdexec::get_env(base()));
+ }
+
+ __operation_base<_ReceiverId, _Env>* __op_;
+ };
+};
+
+template <class _SenderId, class _ReceiverId, class _Env>
+struct __operation : __operation_base<_ReceiverId, _Env>
+{
+ using _Sender = __t<_SenderId>;
+ using __base_t = __operation_base<_ReceiverId, _Env>;
+ using __receiver_t = __t<__receiver<_ReceiverId, _Env>>;
+ connect_result_t<_Sender, __receiver_t> __state_;
+
+ __operation(_Sender&& __sndr, auto&& __rcvr, auto&& __env) :
+ __base_t{(decltype(__rcvr))__rcvr, (decltype(__env))__env},
+ __state_{connect((_Sender&&)__sndr, __receiver_t{{}, this})}
+ {}
+
+ friend void tag_invoke(start_t, __operation& __self) noexcept
+ {
+ start(__self.__state_);
+ }
+};
+
+template <class _SenderId, class _Env>
+struct __sender
+{
+ using _Sender = stdexec::__t<_SenderId>;
+ using is_sender = void;
+
+ template <class _Receiver>
+ using __receiver_t = stdexec::__t<__receiver<__id<_Receiver>, _Env>>;
+ template <class _Self, class _Receiver>
+ using __operation_t = __operation<__id<__copy_cvref_t<_Self, _Sender>>,
+ __id<_Receiver>, _Env>;
+
+ struct __t
+ {
+ using __id = __sender;
+ _Sender __sndr_;
+ _Env __env_;
+
+ template <__decays_to<__t> _Self, receiver _Receiver>
+ requires sender_to<__copy_cvref_t<_Self, _Sender>,
+ __receiver_t<_Receiver>>
+ friend auto tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
+ -> __operation_t<_Self, _Receiver>
+ {
+ return {((_Self&&)__self).__sndr_, (_Receiver&&)__rcvr,
+ ((_Self&&)__self).__env_};
+ }
+
+ friend auto tag_invoke(stdexec::get_env_t, const __t& __self) //
+ noexcept(
+ stdexec::__nothrow_callable<stdexec::get_env_t, const _Sender&>)
+ -> stdexec::env_of_t<const _Sender&>
+ {
+ return stdexec::get_env(__self.__sndr_);
+ }
+
+ template <__decays_to<__t> _Self, class _BaseEnv>
+ friend auto tag_invoke(get_completion_signatures_t, _Self&&, _BaseEnv&&)
+ -> stdexec::__completion_signatures_of_t<
+ __copy_cvref_t<_Self, _Sender>,
+ __env::__env_join_t<_Env, _BaseEnv>>;
+ };
+};
+
+struct __write_t
+{
+ template <class _Sender, class... _Funs>
+ using __sender_t =
+ __t<__sender<__id<__decay_t<_Sender>>,
+ __env::__env_join_t<__env::__env_fn<_Funs>...>>>;
+
+ template <__is_not_instance_of<__env::__env_fn> _Sender, class... _Funs>
+ requires sender<_Sender>
+ auto operator()(_Sender&& __sndr, __env::__env_fn<_Funs>... __withs) const
+ -> __sender_t<_Sender, _Funs...>
+ {
+ return {(_Sender&&)__sndr, __env::__join_env(std::move(__withs)...)};
+ }
+
+ template <class... _Funs>
+ auto operator()(__env::__env_fn<_Funs>... __withs) const
+ -> __binder_back<__write_t, __env::__env_fn<_Funs>...>
+ {
+ return {{}, {}, {std::move(__withs)...}};
+ }
+};
+} // namespace __write
+
+inline constexpr __write::__write_t write{};
+} // namespace exec
+
+#ifdef __EDG__
+#pragma diagnostic pop
+#endif
diff --git a/include/sdbusplus/async/stdexec/import b/include/sdbusplus/async/stdexec/import
index 7fbd6c2..4ba6a80 100755
--- a/include/sdbusplus/async/stdexec/import
+++ b/include/sdbusplus/async/stdexec/import
@@ -4,7 +4,7 @@
git -C "${execution_dir}" rev-parse HEAD > commit.info
cp -r "${execution_dir}"/include/stdexec/* .
-cp "${execution_dir}"/include/exec/{any_sender_of,at_coroutine_exit,inline_scheduler,scope,task}.hpp .
+cp "${execution_dir}"/include/exec/{any_sender_of,async_scope,at_coroutine_exit,env,inline_scheduler,scope,task}.hpp .
(find . -name "*.hpp" -print0 || true) | while IFS= read -r -d '' f
do