stdexec: update to latest commit
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: Ica6de68697329c2aad3a030a7ffb45bdb6a30f11
diff --git a/include/sdbusplus/async/stdexec/__detail/__basic_sender.hpp b/include/sdbusplus/async/stdexec/__detail/__basic_sender.hpp
index 70a02ec..c71a879 100644
--- a/include/sdbusplus/async/stdexec/__detail/__basic_sender.hpp
+++ b/include/sdbusplus/async/stdexec/__detail/__basic_sender.hpp
@@ -163,16 +163,15 @@
__connect_fn<_Sexpr, _Receiver>> &&
__mvalid<__state_type_t, tag_of_t<_Sexpr>, _Sexpr, _Receiver>;
-// Note: This is UB. UBSAN allows it for now.
-template <class _Parent, class _Child>
-_Parent* __parent_from_child(_Child* __child,
- _Child _Parent::*__mbr_ptr) noexcept
-{
- alignas(_Parent) char __buf[sizeof(_Parent)];
- _Parent* __parent = (_Parent*)&__buf;
- const std::ptrdiff_t __offset = (char*)&(__parent->*__mbr_ptr) - __buf;
- return (_Parent*)((char*)__child - __offset);
-}
+// // Note: This is UB. UBSAN allows it for now.
+// template <class _Parent, class _Child>
+// _Parent* __parent_from_child(_Child* __child, _Child _Parent::*__mbr_ptr)
+// noexcept {
+// alignas(_Parent) char __buf[sizeof(_Parent)];
+// _Parent* __parent = (_Parent*) &__buf;
+// const std::ptrdiff_t __offset = (char*) &(__parent->*__mbr_ptr) - __buf;
+// return (_Parent*) ((char*) __child - __offset);
+// }
inline constexpr auto __get_attrs = //
[](__ignore, const auto&... __child) noexcept -> decltype(auto) {
@@ -243,17 +242,15 @@
// created with this receiver.
__parent_op_t* __op_;
- template <class _ChildSexpr, class _ChildReceiver>
- static __t __from_op_state(
- __op_state<_ChildSexpr, _ChildReceiver>* __child) noexcept
- {
- using __parent_op_t = __op_state<_Sexpr, _Receiver>;
- std::ptrdiff_t __offset =
- __parent_op_t::template __get_child_op_offset<__v<_Idx>>();
- __parent_op_t* __parent =
- (__parent_op_t*)((char*)__child - __offset);
- return __t{__parent};
- }
+ // template <class _ChildSexpr, class _ChildReceiver>
+ // static __t __from_op_state(__op_state<_ChildSexpr, _ChildReceiver>*
+ // __child) noexcept {
+ // using __parent_op_t = __op_state<_Sexpr, _Receiver>;
+ // std::ptrdiff_t __offset = __parent_op_t::template
+ // __get_child_op_offset<__v<_Idx>>();
+ // __parent_op_t* __parent = (__parent_op_t*) ((char*) __child -
+ // __offset); return __t{__parent};
+ // }
template <__completion_tag _Tag, class... _Args>
STDEXEC_ATTRIBUTE((always_inline))
@@ -292,12 +289,7 @@
__state_(__sexpr_impl<__tag_t>::get_state((_Sexpr&&)__sndr, __rcvr_))
{}
- _Receiver& __rcvr() noexcept
- {
- return __rcvr_;
- }
-
- const _Receiver& __rcvr() const noexcept
+ _Receiver& __rcvr() & noexcept
{
return __rcvr_;
}
@@ -341,16 +333,6 @@
__op_base_t* __base = (__op_base_t*)((char*)__derived - __offset);
return __base->__rcvr();
}
-
- decltype(auto) __receiver() const noexcept
- {
- using __derived_t = decltype(__op_base_t::__state_);
- const __derived_t* __derived = static_cast<const __derived_t*>(this);
- constexpr std::size_t __offset = offsetof(__op_base_t, __state_);
- const __op_base_t* __base =
- (const __op_base_t*)((const char*)__derived - __offset);
- return __base->__rcvr();
- }
};
STDEXEC_PRAGMA_POP()
@@ -401,29 +383,22 @@
using __data_t = typename __desc_t::__data;
using __children_t = typename __desc_t::__children;
using __state_t = typename __op_state::__state_t;
- using __connect_t = __connect_fn<_Sexpr, _Receiver>;
-
- static auto __connect(__op_state* __self, _Sexpr&& __sexpr)
- -> __result_of<__sexpr_apply, _Sexpr, __connect_t>
- {
- return __sexpr_apply((_Sexpr&&)__sexpr, __connect_t{__self});
- }
-
using __inner_ops_t =
- decltype(__op_state::__connect(nullptr, __declval<_Sexpr>()));
+ __result_of<__sexpr_apply, _Sexpr, __connect_fn<_Sexpr, _Receiver>>;
+
__inner_ops_t __inner_ops_;
- template <std::size_t _Idx>
- static std::ptrdiff_t __get_child_op_offset() noexcept
- {
- __op_state* __self = (__op_state*)&__self;
- return (std::ptrdiff_t)(
- (char*)&__tup::__get<_Idx>(__self->__inner_ops_) - (char*)__self);
- }
+ // template <std::size_t _Idx>
+ // static std::ptrdiff_t __get_child_op_offset() noexcept {
+ // __op_state* __self = (__op_state*) &__self;
+ // return (std::ptrdiff_t)((char*)
+ // &__tup::__get<_Idx>(__self->__inner_ops_) - (char*) __self);
+ // }
__op_state(_Sexpr&& __sexpr, _Receiver __rcvr) :
__op_state::__op_base{(_Sexpr&&)__sexpr, (_Receiver&&)__rcvr},
- __inner_ops_(__op_state::__connect(this, (_Sexpr&&)__sexpr))
+ __inner_ops_(__sexpr_apply((_Sexpr&&)__sexpr,
+ __connect_fn<_Sexpr, _Receiver>{this}))
{}
template <same_as<start_t> _Tag2>
diff --git a/include/sdbusplus/async/stdexec/__detail/__config.hpp b/include/sdbusplus/async/stdexec/__detail/__config.hpp
index d63c894..537de40 100644
--- a/include/sdbusplus/async/stdexec/__detail/__config.hpp
+++ b/include/sdbusplus/async/stdexec/__detail/__config.hpp
@@ -219,7 +219,12 @@
#define STDEXEC_PRAGMA_IGNORE_EDG(...) \
_Pragma(STDEXEC_STRINGIZE(diag_suppress __VA_ARGS__))
#elif STDEXEC_CLANG() || STDEXEC_GCC()
-#define STDEXEC_PRAGMA_PUSH() _Pragma("GCC diagnostic push")
+#define STDEXEC_PRAGMA_PUSH() \
+ _Pragma("GCC diagnostic push") STDEXEC_PRAGMA_IGNORE_GNU("-Wpragmas") \
+ STDEXEC_PRAGMA_IGNORE_GNU("-Wunknown-pragmas") \
+ STDEXEC_PRAGMA_IGNORE_GNU("-Wunknown-warning-option") \
+ STDEXEC_PRAGMA_IGNORE_GNU("-Wunknown-attributes") \
+ STDEXEC_PRAGMA_IGNORE_GNU("-Wattributes")
#define STDEXEC_PRAGMA_POP() _Pragma("GCC diagnostic pop")
#define STDEXEC_PRAGMA_IGNORE_GNU(...) \
_Pragma(STDEXEC_STRINGIZE(GCC diagnostic ignored __VA_ARGS__))
@@ -241,6 +246,12 @@
#define STDEXEC_HAS_BUILTIN(...) 0
#endif
+#if !STDEXEC_MSVC() && defined(__has_feature)
+#define STDEXEC_HAS_FEATURE __has_feature
+#else
+#define STDEXEC_HAS_FEATURE(...) 0
+#endif
+
#if STDEXEC_HAS_BUILTIN(__is_trivially_copyable) || STDEXEC_MSVC()
#define STDEXEC_IS_TRIVIALLY_COPYABLE(...) __is_trivially_copyable(__VA_ARGS__)
#else
@@ -307,6 +318,12 @@
#define STDEXEC_TERMINATE() std::terminate()
#endif
+#if STDEXEC_HAS_FEATURE(thread_sanitizer) || defined(__SANITIZE_THREAD__)
+#define STDEXEC_TSAN(...) STDEXEC_HEAD_OR_TAIL(1, __VA_ARGS__)
+#else
+#define STDEXEC_TSAN(...) STDEXEC_HEAD_OR_NULL(0, __VA_ARGS__)
+#endif
+
// Before clang-16, clang did not like libstdc++'s ranges implementation
#if __has_include(<ranges>) && \
(defined(__cpp_lib_ranges) && __cpp_lib_ranges >= 201911L) && \
diff --git a/include/sdbusplus/async/stdexec/__detail/__env.hpp b/include/sdbusplus/async/stdexec/__detail/__env.hpp
index 0b2e7f1..229de70 100644
--- a/include/sdbusplus/async/stdexec/__detail/__env.hpp
+++ b/include/sdbusplus/async/stdexec/__detail/__env.hpp
@@ -509,8 +509,11 @@
return tag_invoke(as_awaitable, (_Ty&&)__value, *this);
}
- friend auto tag_invoke(get_env_t, const __env_promise&) noexcept
- -> const _Env&;
+ template <same_as<get_env_t> _Tag>
+ friend auto tag_invoke(_Tag, const __env_promise&) noexcept -> const _Env&
+ {
+ std::terminate();
+ }
};
// For making an environment from key/value pairs and optionally
diff --git a/include/sdbusplus/async/stdexec/__detail/__execution_fwd.hpp b/include/sdbusplus/async/stdexec/__detail/__execution_fwd.hpp
index affffe9..70393e9 100644
--- a/include/sdbusplus/async/stdexec/__detail/__execution_fwd.hpp
+++ b/include/sdbusplus/async/stdexec/__detail/__execution_fwd.hpp
@@ -208,6 +208,26 @@
extern const bulk_t bulk;
//////////////////////////////////////////////////////////////////////////////////////////////////
+namespace __split
+{
+struct split_t;
+struct __split_t;
+} // namespace __split
+
+using __split::split_t;
+extern const split_t split;
+
+//////////////////////////////////////////////////////////////////////////////////////////////////
+namespace __ensure_started
+{
+struct ensure_started_t;
+struct __ensure_started_t;
+} // namespace __ensure_started
+
+using __ensure_started::ensure_started_t;
+extern const ensure_started_t ensure_started;
+
+//////////////////////////////////////////////////////////////////////////////////////////////////
namespace __on_v2
{
struct on_t;
diff --git a/include/sdbusplus/async/stdexec/__detail/__intrusive_ptr.hpp b/include/sdbusplus/async/stdexec/__detail/__intrusive_ptr.hpp
index feaaccc..74505f9 100644
--- a/include/sdbusplus/async/stdexec/__detail/__intrusive_ptr.hpp
+++ b/include/sdbusplus/async/stdexec/__detail/__intrusive_ptr.hpp
@@ -22,6 +22,10 @@
#include <memory>
#include <new>
+#if STDEXEC_TSAN()
+#include <sanitizer/tsan_interface.h>
+#endif
+
namespace stdexec
{
namespace __ptr
@@ -30,7 +34,22 @@
struct __make_intrusive_t;
template <class _Ty>
-struct __enable_intrusive_from_this;
+class __intrusive_ptr;
+
+template <class _Ty>
+struct __enable_intrusive_from_this
+{
+ __intrusive_ptr<_Ty> __intrusive_from_this() noexcept;
+ __intrusive_ptr<const _Ty> __intrusive_from_this() const noexcept;
+
+ private:
+ friend _Ty;
+ void __inc_ref() noexcept;
+ void __dec_ref() noexcept;
+};
+
+STDEXEC_PRAGMA_PUSH()
+STDEXEC_PRAGMA_IGNORE_GNU("-Wtsan")
template <class _Ty>
struct __control_block
@@ -58,8 +77,27 @@
{
return *(_Ty*)__value_;
}
+
+ void __inc_ref_() noexcept
+ {
+ __refcount_.fetch_add(1, std::memory_order_relaxed);
+ }
+
+ void __dec_ref_() noexcept
+ {
+ if (1u == __refcount_.fetch_sub(1, std::memory_order_release))
+ {
+ std::atomic_thread_fence(std::memory_order_acquire);
+ // TSan does not support std::atomic_thread_fence, so we
+ // need to use the TSan-specific __tsan_acquire instead:
+ STDEXEC_TSAN(__tsan_acquire(&__refcount_));
+ delete this;
+ }
+ }
};
+STDEXEC_PRAGMA_POP()
+
template <class _Ty>
class __intrusive_ptr
{
@@ -73,25 +111,25 @@
__data_(__data)
{}
- void __addref_() noexcept
+ void __inc_ref_() noexcept
{
if (__data_)
{
- __data_->__refcount_.fetch_add(1, std::memory_order_relaxed);
+ __data_->__inc_ref_();
}
}
- void __release_() noexcept
+ void __dec_ref_() noexcept
{
- if (__data_ &&
- 1u == __data_->__refcount_.fetch_sub(1, std::memory_order_release))
+ if (__data_)
{
- std::atomic_thread_fence(std::memory_order_acquire);
- delete __data_;
+ __data_->__dec_ref_();
}
}
public:
+ using element_type = _Ty;
+
__intrusive_ptr() = default;
__intrusive_ptr(__intrusive_ptr&& __that) noexcept :
@@ -101,9 +139,14 @@
__intrusive_ptr(const __intrusive_ptr& __that) noexcept :
__data_(__that.__data_)
{
- __addref_();
+ __inc_ref_();
}
+ __intrusive_ptr(__enable_intrusive_from_this<_Ty>* __that) noexcept :
+ __intrusive_ptr(__that ? __that->__intrusive_from_this()
+ : __intrusive_ptr())
+ {}
+
__intrusive_ptr& operator=(__intrusive_ptr&& __that) noexcept
{
[[maybe_unused]] __intrusive_ptr __old{
@@ -116,9 +159,16 @@
return operator=(__intrusive_ptr(__that));
}
+ __intrusive_ptr&
+ operator=(__enable_intrusive_from_this<_Ty>* __that) noexcept
+ {
+ return operator=(__that ? __that->__intrusive_from_this()
+ : __intrusive_ptr());
+ }
+
~__intrusive_ptr()
{
- __release_();
+ __dec_ref_();
}
void reset() noexcept
@@ -165,26 +215,36 @@
};
template <class _Ty>
-struct __enable_intrusive_from_this
+__intrusive_ptr<_Ty>
+ __enable_intrusive_from_this<_Ty>::__intrusive_from_this() noexcept
{
- __intrusive_ptr<_Ty> __intrusive_from_this() noexcept
- {
- static_assert(0 == offsetof(__control_block<_Ty>, __value_));
- _Ty* __this = static_cast<_Ty*>(this);
- __intrusive_ptr<_Ty> __p{(__control_block<_Ty>*)__this};
- __p.__addref_();
- return __p;
- }
+ auto* __data = (__control_block<_Ty>*)static_cast<_Ty*>(this);
+ __data->__inc_ref_();
+ return __intrusive_ptr<_Ty>{__data};
+}
- __intrusive_ptr<const _Ty> __intrusive_from_this() const noexcept
- {
- static_assert(0 == offsetof(__control_block<_Ty>, __value_));
- const _Ty* __this = static_cast<const _Ty*>(this);
- __intrusive_ptr<const _Ty> __p{(__control_block<_Ty>*)__this};
- __p.__addref_();
- return __p;
- }
-};
+template <class _Ty>
+__intrusive_ptr<const _Ty>
+ __enable_intrusive_from_this<_Ty>::__intrusive_from_this() const noexcept
+{
+ auto* __data = (__control_block<_Ty>*)static_cast<const _Ty*>(this);
+ __data->__inc_ref_();
+ return __intrusive_ptr<const _Ty>{__data};
+}
+
+template <class _Ty>
+void __enable_intrusive_from_this<_Ty>::__inc_ref() noexcept
+{
+ auto* __data = (__control_block<_Ty>*)static_cast<_Ty*>(this);
+ __data->__inc_ref_();
+}
+
+template <class _Ty>
+void __enable_intrusive_from_this<_Ty>::__dec_ref() noexcept
+{
+ auto* __data = (__control_block<_Ty>*)static_cast<_Ty*>(this);
+ __data->__dec_ref_();
+}
template <class _Ty>
struct __make_intrusive_t
diff --git a/include/sdbusplus/async/stdexec/__detail/__meta.hpp b/include/sdbusplus/async/stdexec/__detail/__meta.hpp
index 14c5bf2..8ddc0de 100644
--- a/include/sdbusplus/async/stdexec/__detail/__meta.hpp
+++ b/include/sdbusplus/async/stdexec/__detail/__meta.hpp
@@ -401,11 +401,11 @@
requires(sizeof...(_False) <= 1)
using __if_c = __minvoke<__if_::__<_Pred>, _True, _False...>;
-template <class _Pred, class _True, class... _False>
-using __minvoke_if = __minvoke<__if<_Pred, _True, _False...>>;
+template <class _Pred, class _True, class _False, class... _Args>
+using __minvoke_if = __minvoke<__if<_Pred, _True, _False>, _Args...>;
-template <bool _Pred, class _True, class... _False>
-using __minvoke_if_c = __minvoke<__if_c<_Pred, _True, _False...>>;
+template <bool _Pred, class _True, class _False, class... _Args>
+using __minvoke_if_c = __minvoke<__if_c<_Pred, _True, _False>, _Args...>;
template <class _Tp>
struct __mconst
@@ -744,23 +744,11 @@
using __f = __minvoke<_Continuation, _Ts...>;
};
-// For hiding a template type parameter from ADL
-template <class _Ty>
-struct _Xp
-{
- using __t = struct _Up
- {
- using __t = _Ty;
- };
-};
-template <class _Ty>
-using __x = __t<_Xp<_Ty>>;
-
template <class _Ty>
concept __has_id = requires { typename _Ty::__id; };
template <class _Ty>
-struct _Yp
+struct _Id
{
using __t = _Ty;
@@ -783,7 +771,7 @@
struct __id_<false>
{
template <class _Ty>
- using __f = _Yp<_Ty>;
+ using __f = _Id<_Ty>;
};
template <class _Ty>
using __id = __minvoke<__id_<__has_id<_Ty>>, _Ty>;
diff --git a/include/sdbusplus/async/stdexec/__detail/__utility.hpp b/include/sdbusplus/async/stdexec/__detail/__utility.hpp
new file mode 100644
index 0000000..4d6a1c1
--- /dev/null
+++ b/include/sdbusplus/async/stdexec/__detail/__utility.hpp
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2023 NVIDIA Corporation
+ *
+ * Licensed under the Apache License Version 2.0 with LLVM Exceptions
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://llvm.org/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "__config.hpp"
+#include "__type_traits.hpp"
+
+#include <type_traits>
+
+namespace stdexec
+{
+namespace __detail
+{
+template <class _Cpcvref>
+inline constexpr auto __forward_like =
+ []<class _Uy>(_Uy&& __uy) noexcept -> auto&& {
+ return static_cast<
+ typename _Cpcvref::template __f<std::remove_reference_t<_Uy>>>(__uy);
+};
+}
+
+template <class _Ty>
+inline constexpr const auto& __forward_like =
+ __detail::__forward_like<__copy_cvref_fn<_Ty&&>>;
+} // namespace stdexec
diff --git a/include/sdbusplus/async/stdexec/async_scope.hpp b/include/sdbusplus/async/stdexec/async_scope.hpp
index 4109299..8c2483f 100644
--- a/include/sdbusplus/async/stdexec/async_scope.hpp
+++ b/include/sdbusplus/async/stdexec/async_scope.hpp
@@ -58,100 +58,104 @@
////////////////////////////////////////////////////////////////////////////
// async_scope::when_empty implementation
-template <class _ReceiverId>
-struct __when_empty_op_base : __task
-{
- using _Receiver = __t<_ReceiverId>;
- STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_;
-};
-
template <class _ConstrainedId, class _ReceiverId>
-struct __when_empty_op : __task
+struct __when_empty_op
{
- using _Constrained = __t<_ConstrainedId>;
- using _Receiver = __t<_ReceiverId>;
+ using _Constrained = __cvref_t<_ConstrainedId>;
+ using _Receiver = stdexec::__t<_ReceiverId>;
- explicit __when_empty_op(const __impl* __scope, _Constrained&& __sndr,
- _Receiver __rcvr) :
- __task{{}, __scope, __notify_waiter},
- __op_(stdexec::connect((_Constrained&&)__sndr, (_Receiver&&)__rcvr))
- {}
-
- private:
- static void __notify_waiter(__task* __self) noexcept
+ struct __t : __task
{
- start(static_cast<__when_empty_op*>(__self)->__op_);
- }
+ using __id = __when_empty_op;
- void __start_() noexcept
- {
- std::unique_lock __guard{this->__scope_->__lock_};
- auto& __active = this->__scope_->__active_;
- auto& __waiters = this->__scope_->__waiters_;
- if (__active != 0)
+ explicit __t(const __impl* __scope, _Constrained&& __sndr,
+ _Receiver __rcvr) :
+ __task{{}, __scope, __notify_waiter},
+ __op_(stdexec::connect((_Constrained&&)__sndr, (_Receiver&&)__rcvr))
+ {}
+
+ private:
+ static void __notify_waiter(__task* __self) noexcept
{
- __waiters.push_back(this);
- return;
+ start(static_cast<__t*>(__self)->__op_);
}
- __guard.unlock();
- start(this->__op_);
- }
- friend void tag_invoke(start_t, __when_empty_op& __self) noexcept
- {
- return __self.__start_();
- }
+ void __start_() noexcept
+ {
+ std::unique_lock __guard{this->__scope_->__lock_};
+ auto& __active = this->__scope_->__active_;
+ auto& __waiters = this->__scope_->__waiters_;
+ if (__active != 0)
+ {
+ __waiters.push_back(this);
+ return;
+ }
+ __guard.unlock();
+ start(this->__op_);
+ }
- STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
- connect_result_t<_Constrained, _Receiver> __op_;
+ friend void tag_invoke(start_t, __t& __self) noexcept
+ {
+ return __self.__start_();
+ }
+
+ STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
+ connect_result_t<_Constrained, _Receiver> __op_;
+ };
};
template <class _ConstrainedId>
struct __when_empty_sender
{
- using _Constrained = __t<_ConstrainedId>;
- using sender_concept = stdexec::sender_t;
+ using _Constrained = stdexec::__t<_ConstrainedId>;
- template <class _Self, class _Receiver>
- using __when_empty_op_t =
- __when_empty_op<__x<__copy_cvref_t<_Self, _Constrained>>,
- __x<_Receiver>>;
-
- template <__decays_to<__when_empty_sender> _Self, receiver _Receiver>
- requires sender_to<__copy_cvref_t<_Self, _Constrained>, _Receiver>
- [[nodiscard]] friend __when_empty_op_t<_Self, _Receiver>
- tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
+ struct __t
{
- return __when_empty_op_t<_Self, _Receiver>{
- __self.__scope_, ((_Self&&)__self).__c_, (_Receiver&&)__rcvr};
- }
+ using __id = __when_empty_sender;
+ using sender_concept = stdexec::sender_t;
- template <__decays_to<__when_empty_sender> _Self, class _Env>
- friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
- -> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
- __env_t<_Env>>
- {
- return {};
- }
+ template <class _Self, class _Receiver>
+ using __when_empty_op_t =
+ stdexec::__t<__when_empty_op<__cvref_id<_Self, _Constrained>,
+ stdexec::__id<_Receiver>>>;
- friend empty_env tag_invoke(get_env_t, const __when_empty_sender&) noexcept
- {
- return {};
- }
+ template <__decays_to<__t> _Self, receiver _Receiver>
+ requires sender_to<__copy_cvref_t<_Self, _Constrained>, _Receiver>
+ [[nodiscard]] friend __when_empty_op_t<_Self, _Receiver>
+ tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
+ {
+ return __when_empty_op_t<_Self, _Receiver>{
+ __self.__scope_, ((_Self&&)__self).__c_, (_Receiver&&)__rcvr};
+ }
- const __impl* __scope_;
- STDEXEC_ATTRIBUTE((no_unique_address)) _Constrained __c_;
+ template <__decays_to<__t> _Self, class _Env>
+ friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
+ -> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
+ __env_t<_Env>>
+ {
+ return {};
+ }
+
+ friend empty_env tag_invoke(get_env_t, const __t&) noexcept
+ {
+ return {};
+ }
+
+ const __impl* __scope_;
+ STDEXEC_ATTRIBUTE((no_unique_address)) _Constrained __c_;
+ };
};
template <class _Constrained>
-using __when_empty_sender_t = __when_empty_sender<__x<__decay_t<_Constrained>>>;
+using __when_empty_sender_t =
+ stdexec::__t<__when_empty_sender<__id<__decay_t<_Constrained>>>>;
////////////////////////////////////////////////////////////////////////////
// async_scope::nest implementation
template <class _ReceiverId>
struct __nest_op_base : __immovable
{
- using _Receiver = __t<_ReceiverId>;
+ using _Receiver = stdexec::__t<_ReceiverId>;
const __impl* __scope_;
STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_;
};
@@ -159,121 +163,140 @@
template <class _ReceiverId>
struct __nest_rcvr
{
- using receiver_concept = stdexec::receiver_t;
- using _Receiver = __t<_ReceiverId>;
- __nest_op_base<_ReceiverId>* __op_;
+ using _Receiver = stdexec::__t<_ReceiverId>;
- static void __complete(const __impl* __scope) noexcept
+ struct __t
{
- std::unique_lock __guard{__scope->__lock_};
- auto& __active = __scope->__active_;
- if (--__active == 0)
+ using __id = __nest_rcvr;
+ using receiver_concept = stdexec::receiver_t;
+ __nest_op_base<_ReceiverId>* __op_;
+
+ static void __complete(const __impl* __scope) noexcept
{
- auto __local = std::move(__scope->__waiters_);
- __guard.unlock();
- __scope = nullptr;
- // do not access __scope
- while (!__local.empty())
+ std::unique_lock __guard{__scope->__lock_};
+ auto& __active = __scope->__active_;
+ if (--__active == 0)
{
- auto* __next = __local.pop_front();
- __next->__notify_waiter(__next);
- // __scope must be considered deleted
+ auto __local = std::move(__scope->__waiters_);
+ __guard.unlock();
+ __scope = nullptr;
+ // do not access __scope
+ while (!__local.empty())
+ {
+ auto* __next = __local.pop_front();
+ __next->__notify_waiter(__next);
+ // __scope must be considered deleted
+ }
}
}
- }
- template <__completion_tag _Tag, class... _As>
- requires __callable<_Tag, _Receiver, _As...>
- friend void tag_invoke(_Tag, __nest_rcvr&& __self, _As&&... __as) noexcept
- {
- auto __scope = __self.__op_->__scope_;
- _Tag{}(std::move(__self.__op_->__rcvr_), (_As&&)__as...);
- // do not access __op_
- // do not access this
- __complete(__scope);
- }
+ template <__completion_tag _Tag, class... _As>
+ requires __callable<_Tag, _Receiver, _As...>
+ friend void tag_invoke(_Tag, __t&& __self, _As&&... __as) noexcept
+ {
+ auto __scope = __self.__op_->__scope_;
+ _Tag{}(std::move(__self.__op_->__rcvr_), (_As&&)__as...);
+ // do not access __op_
+ // do not access this
+ __complete(__scope);
+ }
- friend __env_t<env_of_t<_Receiver>>
- tag_invoke(get_env_t, const __nest_rcvr& __self) noexcept
- {
- return make_env(
- get_env(__self.__op_->__rcvr_),
- with(get_stop_token,
- __self.__op_->__scope_->__stop_source_.get_token()));
- }
+ friend __env_t<env_of_t<_Receiver>>
+ tag_invoke(get_env_t, const __t& __self) noexcept
+ {
+ return make_env(
+ get_env(__self.__op_->__rcvr_),
+ with(get_stop_token,
+ __self.__op_->__scope_->__stop_source_.get_token()));
+ }
+ };
};
template <class _ConstrainedId, class _ReceiverId>
-struct __nest_op : __nest_op_base<_ReceiverId>
+struct __nest_op
{
- using _Constrained = __t<_ConstrainedId>;
- using _Receiver = __t<_ReceiverId>;
- STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
- connect_result_t<_Constrained, __nest_rcvr<_ReceiverId>> __op_;
+ using _Constrained = stdexec::__t<_ConstrainedId>;
+ using _Receiver = stdexec::__t<_ReceiverId>;
- template <__decays_to<_Constrained> _Sender, __decays_to<_Receiver> _Rcvr>
- explicit __nest_op(const __impl* __scope, _Sender&& __c, _Rcvr&& __rcvr) :
- __nest_op_base<_ReceiverId>{{}, __scope, (_Rcvr&&)__rcvr},
- __op_(stdexec::connect((_Sender&&)__c, __nest_rcvr<_ReceiverId>{this}))
- {}
-
- private:
- void __start_() noexcept
+ struct __t : __nest_op_base<_ReceiverId>
{
- STDEXEC_ASSERT(this->__scope_);
- std::unique_lock __guard{this->__scope_->__lock_};
- auto& __active = this->__scope_->__active_;
- ++__active;
- __guard.unlock();
- start(__op_);
- }
+ using __id = __nest_op;
+ using __nest_rcvr_t = stdexec::__t<__nest_rcvr<_ReceiverId>>;
+ STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
+ connect_result_t<_Constrained, __nest_rcvr_t> __op_;
- friend void tag_invoke(start_t, __nest_op& __self) noexcept
- {
- return __self.__start_();
- }
+ template <__decays_to<_Constrained> _Sender,
+ __decays_to<_Receiver> _Rcvr>
+ explicit __t(const __impl* __scope, _Sender&& __c, _Rcvr&& __rcvr) :
+ __nest_op_base<_ReceiverId>{{}, __scope, (_Rcvr&&)__rcvr},
+ __op_(stdexec::connect((_Sender&&)__c, __nest_rcvr_t{this}))
+ {}
+
+ private:
+ void __start_() noexcept
+ {
+ STDEXEC_ASSERT(this->__scope_);
+ std::unique_lock __guard{this->__scope_->__lock_};
+ auto& __active = this->__scope_->__active_;
+ ++__active;
+ __guard.unlock();
+ start(__op_);
+ }
+
+ friend void tag_invoke(start_t, __t& __self) noexcept
+ {
+ return __self.__start_();
+ }
+ };
};
template <class _ConstrainedId>
struct __nest_sender
{
- using _Constrained = __t<_ConstrainedId>;
- using sender_concept = stdexec::sender_t;
-
- const __impl* __scope_;
- STDEXEC_ATTRIBUTE((no_unique_address)) _Constrained __c_;
-
- template <class _Receiver>
- using __nest_operation_t = __nest_op<_ConstrainedId, __x<_Receiver>>;
- template <class _Receiver>
- using __nest_receiver_t = __nest_rcvr<__x<_Receiver>>;
-
- template <__decays_to<__nest_sender> _Self, receiver _Receiver>
- requires sender_to<__copy_cvref_t<_Self, _Constrained>,
- __nest_receiver_t<_Receiver>>
- [[nodiscard]] friend __nest_operation_t<_Receiver>
- tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
+ using _Constrained = stdexec::__t<_ConstrainedId>;
+ struct __t
{
- return __nest_operation_t<_Receiver>{
- __self.__scope_, ((_Self&&)__self).__c_, (_Receiver&&)__rcvr};
- }
+ using __id = __nest_sender;
+ using sender_concept = stdexec::sender_t;
- template <__decays_to<__nest_sender> _Self, class _Env>
- friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
- -> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
- __env_t<_Env>>
- {
- return {};
- }
+ const __impl* __scope_;
+ STDEXEC_ATTRIBUTE((no_unique_address)) _Constrained __c_;
- friend empty_env tag_invoke(get_env_t, const __nest_sender&) noexcept
- {
- return {};
- }
+ template <class _Receiver>
+ using __nest_operation_t =
+ stdexec::__t<__nest_op<_ConstrainedId, stdexec::__id<_Receiver>>>;
+ template <class _Receiver>
+ using __nest_receiver_t =
+ stdexec::__t<__nest_rcvr<stdexec::__id<_Receiver>>>;
+
+ template <__decays_to<__t> _Self, receiver _Receiver>
+ requires sender_to<__copy_cvref_t<_Self, _Constrained>,
+ __nest_receiver_t<_Receiver>>
+ [[nodiscard]] friend __nest_operation_t<_Receiver>
+ tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
+ {
+ return __nest_operation_t<_Receiver>{
+ __self.__scope_, ((_Self&&)__self).__c_, (_Receiver&&)__rcvr};
+ }
+
+ template <__decays_to<__t> _Self, class _Env>
+ friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
+ -> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
+ __env_t<_Env>>
+ {
+ return {};
+ }
+
+ friend empty_env tag_invoke(get_env_t, const __t&) noexcept
+ {
+ return {};
+ }
+ };
};
template <class _Constrained>
-using __nest_sender_t = __nest_sender<__x<__decay_t<_Constrained>>>;
+using __nest_sender_t =
+ stdexec::__t<__nest_sender<__id<__decay_t<_Constrained>>>>;
////////////////////////////////////////////////////////////////////////////
// async_scope::spawn_future implementation
@@ -312,130 +335,134 @@
};
template <class _SenderId, class _EnvId, class _ReceiverId>
-class __future_op : __subscription
+struct __future_op
{
- using _Sender = __t<_SenderId>;
- using _Env = __t<_EnvId>;
- using _Receiver = __t<_ReceiverId>;
+ using _Sender = stdexec::__t<_SenderId>;
+ using _Env = stdexec::__t<_EnvId>;
+ using _Receiver = stdexec::__t<_ReceiverId>;
- using __forward_consumer = typename stop_token_of_t<
- env_of_t<_Receiver>>::template callback_type<__forward_stopped>;
-
- friend void tag_invoke(start_t, __future_op& __self) noexcept
+ class __t : __subscription
{
- __self.__start_();
- }
+ using __forward_consumer = typename stop_token_of_t<
+ env_of_t<_Receiver>>::template callback_type<__forward_stopped>;
- void __complete_() noexcept
- {
- try
+ friend void tag_invoke(start_t, __t& __self) noexcept
{
- auto __state = std::move(__state_);
- STDEXEC_ASSERT(__state != nullptr);
- std::unique_lock __guard{__state->__mutex_};
- // either the future is still in use or it has passed ownership to
- // __state->__no_future_
- if (__state->__no_future_.get() != nullptr ||
- __state->__step_ != __future_step::__future)
- {
- // invalid state - there is a code bug in the state machine
- std::terminate();
- }
- else if (get_stop_token(get_env(__rcvr_)).stop_requested())
- {
- __guard.unlock();
- set_stopped((_Receiver&&)__rcvr_);
- __guard.lock();
- }
- else
- {
- std::visit(
- [this, &__guard]<class _Tup>(_Tup& __tup) {
- if constexpr (same_as<_Tup, std::monostate>)
- {
- std::terminate();
- }
- else
- {
- std::apply(
- [this, &__guard]<class... _As>(auto tag,
- _As&... __as) {
- __guard.unlock();
- tag((_Receiver&&)__rcvr_, (_As&&)__as...);
- __guard.lock();
- },
- __tup);
- }
- },
- __state->__data_);
- }
+ __self.__start_();
}
- catch (...)
- {
- set_error((_Receiver&&)__rcvr_, std::current_exception());
- }
- }
- void __start_() noexcept
- {
- try
+ void __complete_() noexcept
{
- if (!!__state_)
+ try
{
- std::unique_lock __guard{__state_->__mutex_};
- if (__state_->__data_.index() != 0)
+ auto __state = std::move(__state_);
+ STDEXEC_ASSERT(__state != nullptr);
+ std::unique_lock __guard{__state->__mutex_};
+ // either the future is still in use or it has passed ownership
+ // to __state->__no_future_
+ if (__state->__no_future_.get() != nullptr ||
+ __state->__step_ != __future_step::__future)
+ {
+ // invalid state - there is a code bug in the state machine
+ std::terminate();
+ }
+ else if (get_stop_token(get_env(__rcvr_)).stop_requested())
{
__guard.unlock();
- __complete_();
+ set_stopped((_Receiver&&)__rcvr_);
+ __guard.lock();
}
else
{
- __state_->__subscribers_.push_back(this);
+ std::visit(
+ [this, &__guard]<class _Tup>(_Tup& __tup) {
+ if constexpr (same_as<_Tup, std::monostate>)
+ {
+ std::terminate();
+ }
+ else
+ {
+ std::apply(
+ [this, &__guard]<class... _As>(auto tag,
+ _As&... __as) {
+ __guard.unlock();
+ tag((_Receiver&&)__rcvr_, (_As&&)__as...);
+ __guard.lock();
+ },
+ __tup);
+ }
+ },
+ __state->__data_);
}
}
- }
- catch (...)
- {
- set_error((_Receiver&&)__rcvr_, std::current_exception());
- }
- }
-
- STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_;
- std::unique_ptr<__future_state<_Sender, _Env>> __state_;
- STDEXEC_ATTRIBUTE((no_unique_address))
- __forward_consumer __forward_consumer_;
-
- public:
- ~__future_op() noexcept
- {
- if (__state_ != nullptr)
- {
- auto __raw_state = __state_.get();
- std::unique_lock __guard{__raw_state->__mutex_};
- if (__raw_state->__data_.index() > 0)
+ catch (...)
{
- // completed given sender
- // state is no longer needed
- return;
+ set_error((_Receiver&&)__rcvr_, std::current_exception());
}
- __raw_state->__no_future_ = std::move(__state_);
- __raw_state->__step_from_to_(__guard, __future_step::__future,
- __future_step::__no_future);
}
- }
- template <class _Receiver2>
- explicit __future_op(
- _Receiver2&& __rcvr,
- std::unique_ptr<__future_state<_Sender, _Env>> __state) :
- __subscription{{},
- [](__subscription* __self) noexcept -> void {
- static_cast<__future_op*>(__self)->__complete_();
- }},
- __rcvr_((_Receiver2&&)__rcvr), __state_(std::move(__state)),
- __forward_consumer_(get_stop_token(get_env(__rcvr_)),
- __forward_stopped{&__state_->__stop_source_})
- {}
+ void __start_() noexcept
+ {
+ try
+ {
+ if (!!__state_)
+ {
+ std::unique_lock __guard{__state_->__mutex_};
+ if (__state_->__data_.index() != 0)
+ {
+ __guard.unlock();
+ __complete_();
+ }
+ else
+ {
+ __state_->__subscribers_.push_back(this);
+ }
+ }
+ }
+ catch (...)
+ {
+ set_error((_Receiver&&)__rcvr_, std::current_exception());
+ }
+ }
+
+ STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_;
+ std::unique_ptr<__future_state<_Sender, _Env>> __state_;
+ STDEXEC_ATTRIBUTE((no_unique_address))
+ __forward_consumer __forward_consumer_;
+
+ public:
+ using __id = __future_op;
+
+ ~__t() noexcept
+ {
+ if (__state_ != nullptr)
+ {
+ auto __raw_state = __state_.get();
+ std::unique_lock __guard{__raw_state->__mutex_};
+ if (__raw_state->__data_.index() > 0)
+ {
+ // completed given sender
+ // state is no longer needed
+ return;
+ }
+ __raw_state->__no_future_ = std::move(__state_);
+ __raw_state->__step_from_to_(__guard, __future_step::__future,
+ __future_step::__no_future);
+ }
+ }
+
+ template <class _Receiver2>
+ explicit __t(_Receiver2&& __rcvr,
+ std::unique_ptr<__future_state<_Sender, _Env>> __state) :
+ __subscription{{},
+ [](__subscription* __self) noexcept -> void {
+ static_cast<__t*>(__self)->__complete_();
+ }},
+ __rcvr_((_Receiver2&&)__rcvr), __state_(std::move(__state)),
+ __forward_consumer_(get_stop_token(get_env(__rcvr_)),
+ __forward_stopped{&__state_->__stop_source_})
+ {}
+ };
};
#if STDEXEC_NVHPC()
@@ -448,7 +475,7 @@
using __t = std::tuple<_Tag, _Ts...>;
};
template <class _Fn>
-using __completion_as_tuple_t = __t<__completion_as_tuple2_<_Fn>>;
+using __completion_as_tuple_t = stdexec::__t<__completion_as_tuple2_<_Fn>>;
#else
@@ -555,69 +582,74 @@
__env_t<_Env> __env_;
};
-template <class _CompletionsId, class _EnvId>
+template <class _Completions, class _EnvId>
struct __future_rcvr
{
- using receiver_concept = stdexec::receiver_t;
- using _Completions = __t<_CompletionsId>;
- using _Env = __t<_EnvId>;
- __future_state_base<_Completions, _Env>* __state_;
- const __impl* __scope_;
+ using _Env = stdexec::__t<_EnvId>;
- void __dispatch_result_() noexcept
+ struct __t
{
- auto& __state = *__state_;
- std::unique_lock __guard{__state.__mutex_};
- auto __local = std::move(__state.__subscribers_);
- __state.__forward_scope_ = std::nullopt;
- if (__state.__no_future_.get() != nullptr)
- {
- // nobody is waiting for the results
- // delete this and return
- __state.__step_from_to_(__guard, __future_step::__no_future,
- __future_step::__deleted);
- __guard.unlock();
- __state.__no_future_.reset();
- return;
- }
- __guard.unlock();
- while (!__local.empty())
- {
- auto* __sub = __local.pop_front();
- __sub->__complete();
- }
- }
+ using __id = __future_rcvr;
+ using receiver_concept = stdexec::receiver_t;
+ __future_state_base<_Completions, _Env>* __state_;
+ const __impl* __scope_;
- template <__completion_tag _Tag, __movable_value... _As>
- friend void tag_invoke(_Tag, __future_rcvr&& __self, _As&&... __as) noexcept
- {
- auto& __state = *__self.__state_;
- try
+ void __dispatch_result_() noexcept
{
+ auto& __state = *__state_;
std::unique_lock __guard{__state.__mutex_};
- using _Tuple = __decayed_tuple<_Tag, _As...>;
- __state.__data_.template emplace<_Tuple>(_Tag{}, (_As&&)__as...);
+ auto __local = std::move(__state.__subscribers_);
+ __state.__forward_scope_ = std::nullopt;
+ if (__state.__no_future_.get() != nullptr)
+ {
+ // nobody is waiting for the results
+ // delete this and return
+ __state.__step_from_to_(__guard, __future_step::__no_future,
+ __future_step::__deleted);
+ __guard.unlock();
+ __state.__no_future_.reset();
+ return;
+ }
__guard.unlock();
- __self.__dispatch_result_();
+ while (!__local.empty())
+ {
+ auto* __sub = __local.pop_front();
+ __sub->__complete();
+ }
}
- catch (...)
- {
- using _Tuple = std::tuple<set_error_t, std::exception_ptr>;
- __state.__data_.template emplace<_Tuple>(set_error_t{},
- std::current_exception());
- }
- }
- friend const __env_t<_Env>& tag_invoke(get_env_t,
- const __future_rcvr& __self) noexcept
- {
- return __self.__state_->__env_;
- }
+ template <__completion_tag _Tag, __movable_value... _As>
+ friend void tag_invoke(_Tag, __t&& __self, _As&&... __as) noexcept
+ {
+ auto& __state = *__self.__state_;
+ try
+ {
+ std::unique_lock __guard{__state.__mutex_};
+ using _Tuple = __decayed_tuple<_Tag, _As...>;
+ __state.__data_.template emplace<_Tuple>(_Tag{},
+ (_As&&)__as...);
+ __guard.unlock();
+ __self.__dispatch_result_();
+ }
+ catch (...)
+ {
+ using _Tuple = std::tuple<set_error_t, std::exception_ptr>;
+ __state.__data_.template emplace<_Tuple>(
+ set_error_t{}, std::current_exception());
+ }
+ }
+
+ friend const __env_t<_Env>& tag_invoke(get_env_t,
+ const __t& __self) noexcept
+ {
+ return __self.__state_->__env_;
+ }
+ };
};
template <class _Sender, class _Env>
using __future_receiver_t =
- __future_rcvr<__x<__future_completions_t<_Sender, _Env>>, __x<_Env>>;
+ __t<__future_rcvr<__future_completions_t<_Sender, _Env>, __id<_Env>>>;
template <class _Sender, class _Env>
struct __future_state :
@@ -636,77 +668,84 @@
};
template <class _SenderId, class _EnvId>
-class __future
+struct __future
{
- using _Sender = __t<_SenderId>;
- using _Env = __t<_EnvId>;
- friend struct async_scope;
+ using _Sender = stdexec::__t<_SenderId>;
+ using _Env = stdexec::__t<_EnvId>;
- public:
- using sender_concept = stdexec::sender_t;
-
- __future(__future&&) = default;
- __future& operator=(__future&&) = default;
-
- ~__future() noexcept
+ struct __t
{
- if (__state_ != nullptr)
+ using __id = __future;
+ using sender_concept = stdexec::sender_t;
+
+ __t(__t&&) = default;
+ __t& operator=(__t&&) = default;
+
+ ~__t() noexcept
{
- auto __raw_state = __state_.get();
- std::unique_lock __guard{__raw_state->__mutex_};
- if (__raw_state->__data_.index() != 0)
+ if (__state_ != nullptr)
{
- // completed given sender
- // state is no longer needed
- return;
+ auto __raw_state = __state_.get();
+ std::unique_lock __guard{__raw_state->__mutex_};
+ if (__raw_state->__data_.index() != 0)
+ {
+ // completed given sender
+ // state is no longer needed
+ return;
+ }
+ __raw_state->__no_future_ = std::move(__state_);
+ __raw_state->__step_from_to_(__guard, __future_step::__future,
+ __future_step::__no_future);
}
- __raw_state->__no_future_ = std::move(__state_);
- __raw_state->__step_from_to_(__guard, __future_step::__future,
- __future_step::__no_future);
}
- }
- private:
- template <class _Self>
- using __completions_t =
- __future_completions_t<__mfront<_Sender, _Self>, _Env>;
+ private:
+ friend struct async_scope;
+ template <class _Self>
+ using __completions_t =
+ __future_completions_t<__mfront<_Sender, _Self>, _Env>;
- explicit __future(
- std::unique_ptr<__future_state<_Sender, _Env>> __state) noexcept :
- __state_(std::move(__state))
- {
- std::unique_lock __guard{__state_->__mutex_};
- __state_->__step_from_to_(__guard, __future_step::__created,
- __future_step::__future);
- }
+ template <class _Receiver>
+ using __future_op_t = stdexec::__t<
+ __future_op<_SenderId, _EnvId, stdexec::__id<_Receiver>>>;
- template <__decays_to<__future> _Self, receiver _Receiver>
- requires receiver_of<_Receiver, __completions_t<_Self>>
- friend __future_op<_SenderId, _EnvId, __x<_Receiver>>
- tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
- {
- return __future_op<_SenderId, _EnvId, __x<_Receiver>>{
- (_Receiver&&)__rcvr, std::move(__self.__state_)};
- }
+ explicit __t(
+ std::unique_ptr<__future_state<_Sender, _Env>> __state) noexcept :
+ __state_(std::move(__state))
+ {
+ std::unique_lock __guard{__state_->__mutex_};
+ __state_->__step_from_to_(__guard, __future_step::__created,
+ __future_step::__future);
+ }
- template <__decays_to<__future> _Self, class _OtherEnv>
- friend auto tag_invoke(get_completion_signatures_t, _Self&&, _OtherEnv&&)
- -> __completions_t<_Self>
- {
- return {};
- }
+ template <__decays_to<__t> _Self, receiver _Receiver>
+ requires receiver_of<_Receiver, __completions_t<_Self>>
+ friend __future_op_t<_Receiver> tag_invoke(connect_t, _Self&& __self,
+ _Receiver __rcvr)
+ {
+ return __future_op_t<_Receiver>{(_Receiver&&)__rcvr,
+ std::move(__self.__state_)};
+ }
- friend empty_env tag_invoke(get_env_t, const __future&) noexcept
- {
- return {};
- }
+ template <__decays_to<__t> _Self, class _OtherEnv>
+ friend auto tag_invoke(get_completion_signatures_t, _Self&&,
+ _OtherEnv&&) -> __completions_t<_Self>
+ {
+ return {};
+ }
- std::unique_ptr<__future_state<_Sender, _Env>> __state_;
+ friend empty_env tag_invoke(get_env_t, const __t&) noexcept
+ {
+ return {};
+ }
+
+ std::unique_ptr<__future_state<_Sender, _Env>> __state_;
+ };
};
template <class _Sender, class _Env>
-using __future_t =
- __future<__x<__nest_sender_t<_Sender>>, __x<__decay_t<_Env>>>;
+using __future_t = stdexec::__t<
+ __future<__id<__nest_sender_t<_Sender>>, __id<__decay_t<_Env>>>>;
////////////////////////////////////////////////////////////////////////////
// async_scope::spawn implementation
@@ -719,7 +758,7 @@
template <class _EnvId>
struct __spawn_op_base
{
- using _Env = __t<_EnvId>;
+ using _Env = stdexec::__t<_EnvId>;
__spawn_env_t<_Env> __env_;
void (*__delete_)(__spawn_op_base*);
};
@@ -727,70 +766,77 @@
template <class _EnvId>
struct __spawn_rcvr
{
- using receiver_concept = stdexec::receiver_t;
- using _Env = __t<_EnvId>;
- __spawn_op_base<_EnvId>* __op_;
- const __impl* __scope_;
+ using _Env = stdexec::__t<_EnvId>;
- template <__one_of<set_value_t, set_stopped_t> _Tag>
- friend void tag_invoke(_Tag, __spawn_rcvr&& __self) noexcept
+ struct __t
{
- __self.__op_->__delete_(__self.__op_);
- }
+ using __id = __spawn_rcvr;
+ using receiver_concept = stdexec::receiver_t;
+ __spawn_op_base<_EnvId>* __op_;
- // BUGBUG NOT TO SPEC spawn shouldn't accept senders that can fail.
- template <same_as<set_error_t> _Tag>
- [[noreturn]] friend void tag_invoke(_Tag, __spawn_rcvr&&,
- const std::exception_ptr&) noexcept
- {
- std::terminate();
- }
+ template <__one_of<set_value_t, set_stopped_t> _Tag>
+ friend void tag_invoke(_Tag, __t&& __self) noexcept
+ {
+ __self.__op_->__delete_(__self.__op_);
+ }
- friend const __spawn_env_t<_Env>&
- tag_invoke(get_env_t, const __spawn_rcvr& __self) noexcept
- {
- return __self.__op_->__env_;
- }
+ // BUGBUG NOT TO SPEC spawn shouldn't accept senders that can fail.
+ template <same_as<set_error_t> _Tag>
+ [[noreturn]] friend void tag_invoke(_Tag, __t&&,
+ const std::exception_ptr&) noexcept
+ {
+ std::terminate();
+ }
+
+ friend const __spawn_env_t<_Env>& tag_invoke(get_env_t,
+ const __t& __self) noexcept
+ {
+ return __self.__op_->__env_;
+ }
+ };
};
template <class _Env>
-using __spawn_receiver_t = __spawn_rcvr<__x<_Env>>;
+using __spawn_receiver_t = stdexec::__t<__spawn_rcvr<__id<_Env>>>;
template <class _SenderId, class _EnvId>
-struct __spawn_op : __spawn_op_base<_EnvId>
+struct __spawn_op
{
- using _Env = __t<_EnvId>;
- using _Sender = __t<_SenderId>;
+ using _Env = stdexec::__t<_EnvId>;
+ using _Sender = stdexec::__t<_SenderId>;
- template <__decays_to<_Sender> _Sndr>
- __spawn_op(_Sndr&& __sndr, _Env __env, const __impl* __scope) :
- __spawn_op_base<_EnvId>{
- __join_env(
- (_Env&&)__env,
- __mkprop(__scope->__stop_source_.get_token(), get_stop_token),
- __mkprop(__inln::__scheduler{}, get_scheduler)),
- [](__spawn_op_base<_EnvId>* __op) {
- delete static_cast<__spawn_op*>(__op);
- }},
- __op_(stdexec::connect((_Sndr&&)__sndr,
- __spawn_receiver_t<_Env>{this, __scope}))
- {}
-
- void __start_() noexcept
+ struct __t : __spawn_op_base<_EnvId>
{
- start(__op_);
- }
+ template <__decays_to<_Sender> _Sndr>
+ __t(_Sndr&& __sndr, _Env __env, const __impl* __scope) :
+ __spawn_op_base<_EnvId>{
+ __join_env((_Env&&)__env,
+ __mkprop(__scope->__stop_source_.get_token(),
+ get_stop_token),
+ __mkprop(__inln::__scheduler{}, get_scheduler)),
+ [](__spawn_op_base<_EnvId>* __op) {
+ delete static_cast<__t*>(__op);
+ }},
+ __op_(stdexec::connect((_Sndr&&)__sndr,
+ __spawn_receiver_t<_Env>{this}))
+ {}
- friend void tag_invoke(start_t, __spawn_op& __self) noexcept
- {
- return __self.__start_();
- }
+ void __start_() noexcept
+ {
+ start(__op_);
+ }
- connect_result_t<_Sender, __spawn_receiver_t<_Env>> __op_;
+ friend void tag_invoke(start_t, __t& __self) noexcept
+ {
+ return __self.__start_();
+ }
+
+ connect_result_t<_Sender, __spawn_receiver_t<_Env>> __op_;
+ };
};
template <class _Sender, class _Env>
-using __spawn_operation_t = __spawn_op<__x<_Sender>, __x<_Env>>;
+using __spawn_operation_t = stdexec::__t<__spawn_op<__id<_Sender>, __id<_Env>>>;
////////////////////////////////////////////////////////////////////////////
// async_scope
diff --git a/include/sdbusplus/async/stdexec/commit.info b/include/sdbusplus/async/stdexec/commit.info
index 4ec6e44..6788e94 100644
--- a/include/sdbusplus/async/stdexec/commit.info
+++ b/include/sdbusplus/async/stdexec/commit.info
@@ -1 +1 @@
-e05f158445ef957dc53731607d417ec6a7d834d7
+98f1f7a18dba497859b7705ce60d31e0ba3d204d
diff --git a/include/sdbusplus/async/stdexec/env.hpp b/include/sdbusplus/async/stdexec/env.hpp
index 32532d6..c77176f 100644
--- a/include/sdbusplus/async/stdexec/env.hpp
+++ b/include/sdbusplus/async/stdexec/env.hpp
@@ -58,40 +58,49 @@
struct read_with_default_t;
template <class _Tag, class _DefaultId, class _ReceiverId>
-struct __operation : __immovable
+struct __operation
{
- using _Default = __t<_DefaultId>;
- using _Receiver = __t<_ReceiverId>;
+ using _Default = stdexec::__t<_DefaultId>;
+ using _Receiver = stdexec::__t<_ReceiverId>;
- STDEXEC_ATTRIBUTE((no_unique_address)) _Default __default_;
- _Receiver __rcvr_;
-
- friend void tag_invoke(start_t, __operation& __self) noexcept
+ struct __t : __immovable
{
- try
+ using __id = __operation;
+
+ STDEXEC_ATTRIBUTE((no_unique_address)) _Default __default_;
+ _Receiver __rcvr_;
+
+ friend void tag_invoke(start_t, __t& __self) noexcept
{
- if constexpr (__callable<_Tag, env_of_t<_Receiver>>)
+ try
{
- const auto& __env = get_env(__self.__rcvr_);
- set_value(std::move(__self.__rcvr_), _Tag{}(__env));
+ if constexpr (__callable<_Tag, env_of_t<_Receiver>>)
+ {
+ const auto& __env = get_env(__self.__rcvr_);
+ set_value(std::move(__self.__rcvr_), _Tag{}(__env));
+ }
+ else
+ {
+ set_value(std::move(__self.__rcvr_),
+ std::move(__self.__default_));
+ }
}
- else
+ catch (...)
{
- set_value(std::move(__self.__rcvr_),
- std::move(__self.__default_));
+ set_error(std::move(__self.__rcvr_), std::current_exception());
}
}
- catch (...)
- {
- set_error(std::move(__self.__rcvr_), std::current_exception());
- }
- }
+ };
};
-template <class _Tag, class _DefaultId>
+template <class _Tag, class _Default, class _Receiver>
+using __operation_t = __t<__operation<_Tag, __id<_Default>, __id<_Receiver>>>;
+
+template <class _Tag, class _Default>
struct __sender
{
- using _Default = __t<_DefaultId>;
+ using __id = __sender;
+ using __t = __sender;
using sender_concept = stdexec::sender_t;
STDEXEC_ATTRIBUTE((no_unique_address)) _Default __default_;
@@ -109,8 +118,7 @@
requires receiver_of<_Receiver, __completions_t<env_of_t<_Receiver>>>
friend auto tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr) //
noexcept(std::is_nothrow_move_constructible_v<_Receiver>)
- -> __operation<_Tag, __x<__default_t<env_of_t<_Receiver>>>,
- __x<_Receiver>>
+ -> __operation_t<_Tag, __default_t<env_of_t<_Receiver>>, _Receiver>
{
return {{}, ((_Self&&)__self).__default_, (_Receiver&&)__rcvr};
}
@@ -127,7 +135,7 @@
{
template <class _Tag, class _Default>
constexpr auto operator()(_Tag, _Default&& __default) const
- -> __sender<_Tag, __x<__decay_t<_Default>>>
+ -> __sender<_Tag, __decay_t<_Default>>
{
return {(_Default&&)__default};
}
diff --git a/include/sdbusplus/async/stdexec/execution.hpp b/include/sdbusplus/async/stdexec/execution.hpp
index dce1a9f..87abcd9 100644
--- a/include/sdbusplus/async/stdexec/execution.hpp
+++ b/include/sdbusplus/async/stdexec/execution.hpp
@@ -22,6 +22,7 @@
#include "__detail/__intrusive_ptr.hpp"
#include "__detail/__meta.hpp"
#include "__detail/__scope.hpp"
+#include "__detail/__utility.hpp"
#include "concepts.hpp"
#include "coroutine.hpp"
#include "functional.hpp"
@@ -41,8 +42,6 @@
#include <variant>
STDEXEC_PRAGMA_PUSH()
-STDEXEC_PRAGMA_IGNORE_GNU("-Wpragmas")
-STDEXEC_PRAGMA_IGNORE_GNU("-Wunknown-warning-option")
STDEXEC_PRAGMA_IGNORE_GNU("-Wundefined-inline")
STDEXEC_PRAGMA_IGNORE_GNU("-Wsubobject-linkage")
STDEXEC_PRAGMA_IGNORE_GNU("-Wmissing-braces")
@@ -871,9 +870,17 @@
static_assert(sizeof(_Env),
"Incomplete type used with get_completion_signatures");
- if constexpr (__with_tag_invoke<_Sender, _Env>)
+ // Compute the type of the transformed sender:
+ using _TfxSender = __tfx_sender<_Sender, _Env>;
+
+ if constexpr (__merror<_TfxSender>)
{
- using _TfxSender = __tfx_sender<_Sender, _Env>;
+ // Computing the type of the transformed sender returned an error
+ // type. Propagate it.
+ return (_TfxSender(*)()) nullptr;
+ }
+ else if constexpr (__with_tag_invoke<_Sender, _Env>)
+ {
using _Result = tag_invoke_result_t<get_completion_signatures_t,
_TfxSender, _Env>;
return (_Result(*)()) nullptr;
@@ -3019,6 +3026,10 @@
} // namespace __then
using __then::then_t;
+
+/// @brief The then sender adaptor, which invokes a function with the result of
+/// a sender, making the result available to the next receiver.
+/// @hideinitializer
inline constexpr then_t then{};
template <>
@@ -3325,9 +3336,32 @@
{};
////////////////////////////////////////////////////////////////////////////
-// [execution.senders.adaptors.split]
-namespace __split
+// shared components of split and ensure_started
+//
+// The split and ensure_started algorithms are very similar in implementation.
+// The salient differences are:
+//
+// split: the input async operation is always connected. It is only
+// started when one of the split senders is connected and started.
+// split senders are copyable, so there are multiple operation states
+// to be notified on completion. These are stored in an instrusive
+// linked list.
+//
+// ensure_started: the input async operation is always started, so
+// the internal receiver will always be completed. The ensure_started
+// sender is move-only and single-shot, so there will only ever be one
+// operation state to be notified on completion.
+//
+// The shared state should add-ref itself when the input async
+// operation is started and release itself when its completion
+// is notified.
+namespace __shared
{
+template <class _BaseEnv>
+using __env_t = //
+ __make_env_t<_BaseEnv, // BUGBUG NOT TO SPEC
+ __with<get_stop_token_t, in_place_stop_token>>;
+
struct __on_stop_request
{
in_place_stop_source& __stop_source_;
@@ -3341,58 +3375,107 @@
template <class _Receiver>
auto __notify_visitor(_Receiver&& __rcvr) noexcept
{
- return [&](const auto& __tupl) noexcept -> void {
+ return [&]<class _Tuple>(_Tuple&& __tupl) noexcept -> void {
__apply(
- [&](auto __tag, const auto&... __args) noexcept -> void {
- __tag(std::move(__rcvr), __args...);
+ [&](auto __tag, auto&&... __args) noexcept -> void {
+ __tag(std::move(__rcvr), __forward_like<_Tuple>(__args)...);
},
__tupl);
};
}
-struct __split_state_base : __immovable
+enum class __action_kind : bool
{
- using __notify_fn = void(__split_state_base*) noexcept;
-
- __split_state_base* __next_{};
- __notify_fn* __notify_{};
+ __notify,
+ __detach
};
-template <class _Sender, class _Receiver>
-struct __split_state :
- __split_state_base,
- __enable_receiver_from_this<_Sender, _Receiver>
+struct __local_state_base : __immovable
{
- using __shared_state_ptr = __decay_t<__data_of<_Sender>>;
- using __on_stop_cb = //
- typename stop_token_of_t<env_of_t<_Receiver>&>::template callback_type<
- __on_stop_request>;
+ using __action_fn = void(__local_state_base*, __action_kind) noexcept;
- explicit __split_state(_Sender&& __sndr) noexcept :
- __split_state_base{{}, nullptr, __notify}, __on_stop_(),
- __shared_state_(STDEXEC_CALL_EXPLICIT_THIS_MEMFN(
- (_Sender&&)__sndr, apply)(__detail::__get_data()))
- {}
-
- static void __notify(__split_state_base* __self) noexcept
- {
- __split_state* __op = static_cast<__split_state*>(__self);
- __op->__on_stop_.reset();
- std::visit(__split::__notify_visitor(__op->__receiver()),
- __op->__shared_state_->__data_);
- }
-
- std::optional<__on_stop_cb> __on_stop_;
- __shared_state_ptr __shared_state_;
+ __action_fn* __action_{};
+ __local_state_base* __next_{};
};
template <class _CvrefSender, class _Env>
-struct __sh_state;
+struct __shared_state;
-template <class _BaseEnv>
-using __env_t = //
- __make_env_t<_BaseEnv, // BUGBUG NOT TO SPEC
- __with<get_stop_token_t, in_place_stop_token>>;
+// Each operation state of a split sender has one of these,
+// created when a split sender is connected. There are 0 or
+// more of them per input async operation. It is what
+// the split sender's `get_state` fn returns. It holds a
+// reference to the shared state of the input async operation.
+template <class _CvrefSender, class _Receiver>
+struct __local_state :
+ __local_state_base,
+ __enable_receiver_from_this<_CvrefSender, _Receiver>
+{
+ using __data_t = __decay_t<__data_of<_CvrefSender>>;
+ using __shared_state_t = __mapply<__q<__mfront>, __data_t>;
+ using __on_stop_cb_t = //
+ typename stop_token_of_t<env_of_t<_Receiver>&>::template callback_type<
+ __on_stop_request>;
+ using __tag_t = tag_of_t<_CvrefSender>;
+ static_assert(__one_of<__tag_t, __split::__split_t,
+ __ensure_started::__ensure_started_t>);
+
+ explicit __local_state(_CvrefSender&& __sndr) noexcept :
+ __local_state::__local_state_base{{},
+ &__action<tag_of_t<_CvrefSender>>},
+ __shared_state_(
+ STDEXEC_CALL_EXPLICIT_THIS_MEMFN((_CvrefSender&&)__sndr,
+ apply)(__detail::__get_data())
+ .__shared_state)
+ {}
+
+ ~__local_state()
+ {
+ __action_(this, __action_kind::__detach);
+ }
+
+ // This is called when the input async operation completes; or,
+ // if it has already completed when start is called, it is called
+ // from start:
+ template <class _Tag>
+ static void __action(__local_state_base* __self,
+ __action_kind __kind) noexcept
+ {
+ __local_state* const __op = static_cast<__local_state*>(__self);
+ if (__kind == __action_kind::__notify)
+ {
+ __op->__on_stop_.reset();
+
+ // The split algorithm sends by T const&. ensure_started sends by
+ // T&&.
+ if constexpr (same_as<__split::__split_t, _Tag>)
+ {
+ std::visit(__notify_visitor(__op->__receiver()),
+ std::as_const(__op->__shared_state_->__data_));
+ }
+ else
+ {
+ std::visit(__notify_visitor(__op->__receiver()),
+ std::move(__op->__shared_state_->__data_));
+ }
+ }
+ else
+ {
+ // This is a detach operation
+ if constexpr (same_as<__split::__split_t, _Tag>)
+ {
+ // no-op
+ }
+ else
+ {
+ __op->__shared_state_->__detach();
+ }
+ }
+ }
+
+ std::optional<__on_stop_cb_t> __on_stop_{};
+ __intrusive_ptr<__shared_state_t> __shared_state_;
+};
template <class _CvrefSenderId, class _EnvId>
struct __receiver
@@ -3402,281 +3485,18 @@
struct __t
{
- __sh_state<_CvrefSender, _Env>& __sh_state_;
-
using receiver_concept = receiver_t;
using __id = __receiver;
- template <__completion_tag _Tag, class... _As>
- friend void tag_invoke(_Tag __tag, __t&& __self, _As&&... __as) noexcept
- {
- __sh_state<_CvrefSender, _Env>& __state = __self.__sh_state_;
-
- try
- {
- using __tuple_t = __decayed_tuple<_Tag, _As...>;
- __state.__data_.template emplace<__tuple_t>(__tag,
- (_As&&)__as...);
- }
- catch (...)
- {
- using __tuple_t =
- __decayed_tuple<set_error_t, std::exception_ptr>;
- __state.__data_.template emplace<__tuple_t>(
- set_error, std::current_exception());
- }
- __state.__notify();
- }
-
- friend const __env_t<_Env>& tag_invoke(get_env_t,
- const __t& __self) noexcept
- {
- return __self.__sh_state_.__env_;
- }
- };
-};
-
-template <class _CvrefSender, class _Env = empty_env>
-struct __sh_state
-{
- using __t = __sh_state;
- using __id = __sh_state;
-
- template <class... _Ts>
- using __bind_tuples = //
- __mbind_front_q<__variant,
- std::tuple<set_stopped_t>, // Initial state of the
- // variant is set_stopped
- std::tuple<set_error_t, std::exception_ptr>, _Ts...>;
-
- using __bound_values_t = //
- __value_types_of_t<_CvrefSender, __env_t<_Env>,
- __mbind_front_q<__decayed_tuple, set_value_t>,
- __q<__bind_tuples>>;
-
- using __variant_t = //
- __error_types_of_t<
- _CvrefSender, __env_t<_Env>,
- __transform<__mbind_front_q<__decayed_tuple, set_error_t>,
- __bound_values_t>>;
-
- using __receiver_t =
- stdexec::__t<__receiver<__cvref_id<_CvrefSender>, stdexec::__id<_Env>>>;
-
- in_place_stop_source __stop_source_{};
- __variant_t __data_;
- std::atomic<void*> __head_{nullptr};
- __env_t<_Env> __env_;
- connect_result_t<_CvrefSender, __receiver_t> __op_state2_;
-
- explicit __sh_state(_CvrefSender&& __sndr, _Env __env = {}) :
- __env_(__make_env((_Env&&)__env, __mkprop(__stop_source_.get_token(),
- get_stop_token))),
- __op_state2_(connect((_CvrefSender&&)__sndr, __receiver_t{*this}))
- {}
-
- void __notify() noexcept
- {
- void* const __completion_state = static_cast<void*>(this);
- void* __old = __head_.exchange(__completion_state,
- std::memory_order_acq_rel);
- __split_state_base* __state = static_cast<__split_state_base*>(__old);
-
- while (__state != nullptr)
- {
- __split_state_base* __next = __state->__next_;
- __state->__notify_(__state);
- __state = __next;
- }
- }
-};
-
-template <class... _Tys>
-using __set_value_t =
- completion_signatures<set_value_t(const __decay_t<_Tys>&...)>;
-
-template <class _Ty>
-using __set_error_t = completion_signatures<set_error_t(const __decay_t<_Ty>&)>;
-
-template <class _CvrefSenderId, class _EnvId>
-using __completions_t = //
- __try_make_completion_signatures<
- // NOT TO SPEC:
- // See https://github.com/brycelelbach/wg21_p2300_execution/issues/26
- __cvref_t<_CvrefSenderId>, __env_t<__t<_EnvId>>,
- completion_signatures<set_error_t(const std::exception_ptr&),
- set_stopped_t()>, // NOT TO SPEC
- __q<__set_value_t>, __q<__set_error_t>>;
-
-struct __split_t
-{};
-
-struct split_t : __split_t
-{
- template <sender _Sender>
- requires sender_in<_Sender, empty_env> &&
- __decay_copyable<env_of_t<_Sender>>
- auto operator()(_Sender&& __sndr) const
- {
- auto __domain = __get_early_domain(__sndr);
- return stdexec::transform_sender(
- __domain, __make_sexpr<split_t>(__(), (_Sender&&)__sndr));
- }
-
- template <sender _Sender, class _Env>
- requires sender_in<_Sender, _Env> && __decay_copyable<env_of_t<_Sender>>
- auto operator()(_Sender&& __sndr, _Env&& __env) const
- {
- auto __domain = __get_late_domain(__sndr, __env);
- return stdexec::transform_sender(
- __domain, __make_sexpr<split_t>(__(), (_Sender&&)__sndr),
- (_Env&&)__env);
- }
-
- STDEXEC_ATTRIBUTE((always_inline)) //
- __binder_back<split_t> operator()() const
- {
- return {{}, {}, {}};
- }
-
- using _Sender = __1;
- using __legacy_customizations_t = //
- __types<tag_invoke_t(split_t,
- get_completion_scheduler_t<set_value_t>(
- get_env_t(const _Sender&)),
- _Sender),
- tag_invoke_t(split_t, _Sender)>;
-
- template <class _Sender, class... _Env>
- static auto transform_sender(_Sender&& __sndr, _Env... __env)
- {
- return __sexpr_apply(
- (_Sender&&)__sndr,
- [&]<class _Child>(__ignore, __ignore, _Child&& __child) {
- using __sh_state_t = __sh_state<_Child, _Env...>;
- auto __sh_state = std::make_shared<__sh_state_t>(
- (_Child&&)__child, std::move(__env)...);
- return __make_sexpr<__split_t>(std::move(__sh_state));
- });
- }
-};
-
-struct __split_impl : __sexpr_defaults
-{
- static constexpr auto get_state = //
- []<class _Sender, class _Receiver>(
- _Sender&& __sndr,
- _Receiver&) noexcept -> __split_state<_Sender, _Receiver> {
- static_assert(sender_expr_for<_Sender, __split_t>);
- return __split_state<_Sender, _Receiver>{(_Sender&&)__sndr};
- };
-
- static constexpr auto start = //
- []<class _Sender, class _Receiver>(
- __split_state<_Sender, _Receiver>& __state,
- _Receiver& __rcvr) noexcept -> void {
- auto* __shared_state = __state.__shared_state_.get();
- std::atomic<void*>& __head = __shared_state->__head_;
- void* const __completion_state = static_cast<void*>(__shared_state);
- void* __old = __head.load(std::memory_order_acquire);
-
- if (__old != __completion_state)
- {
- __state.__on_stop_.emplace(
- get_stop_token(stdexec::get_env(__rcvr)),
- __on_stop_request{__shared_state->__stop_source_});
- }
-
- do
- {
- if (__old == __completion_state)
- {
- __state.__notify(&__state);
- return;
- }
- __state.__next_ = static_cast<__split_state_base*>(__old);
- } while (!__head.compare_exchange_weak(
- __old, static_cast<void*>(&__state), std::memory_order_release,
- std::memory_order_acquire));
-
- if (__old == nullptr)
- {
- // the inner sender isn't running
- if (__shared_state->__stop_source_.stop_requested())
- {
- // 1. resets __head to completion state
- // 2. notifies waiting threads
- // 3. propagates "stopped" signal to `out_r'`
- __shared_state->__notify();
- }
- else
- {
- stdexec::start(__shared_state->__op_state2_);
- }
- }
- };
-
- static constexpr auto __get_completion_signatures_fn = //
- []<class _ShState>(auto, const std::shared_ptr<_ShState>&) //
- -> __mapply<__q<__completions_t>, __id<_ShState>> { return {}; };
-
- static constexpr auto get_completion_signatures = //
- []<class _Self, class _OtherEnv>(_Self&&, _OtherEnv&&) noexcept
- -> __call_result_t<__sexpr_apply_t, _Self,
- __mtypeof<__get_completion_signatures_fn>> {
- static_assert(sender_expr_for<_Self, __split_t>);
- return {};
- };
-};
-} // namespace __split
-
-using __split::split_t;
-inline constexpr split_t split{};
-
-template <>
-struct __sexpr_impl<__split::__split_t> : __split::__split_impl
-{};
-
-template <>
-struct __sexpr_impl<split_t> : __split::__split_impl
-{};
-
-/////////////////////////////////////////////////////////////////////////////
-// [execution.senders.adaptors.ensure_started]
-namespace __ensure_started
-{
-template <class _BaseEnv>
-using __env_t = //
- __make_env_t<_BaseEnv, // NOT TO SPEC
- __with<get_stop_token_t, in_place_stop_token>>;
-
-template <class _CvrefSenderId, class _EnvId>
-struct __sh_state;
-
-template <class _CvrefSenderId, class _EnvId = __id<empty_env>>
-struct __receiver
-{
- using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>;
- using _Env = stdexec::__t<_EnvId>;
-
- class __t
- {
- __intrusive_ptr<stdexec::__t<__sh_state<_CvrefSenderId, _EnvId>>>
- __shared_state_;
-
- public:
- using receiver_concept = receiver_t;
- using __id = __receiver;
-
- explicit __t(stdexec::__t<__sh_state<_CvrefSenderId, _EnvId>>&
- __shared_state) noexcept :
- __shared_state_(__shared_state.__intrusive_from_this())
+ explicit __t(
+ __shared_state<_CvrefSender, _Env>* __shared_state) noexcept :
+ __shared_state_(__shared_state)
{}
template <__completion_tag _Tag, class... _As>
friend void tag_invoke(_Tag __tag, __t&& __self, _As&&... __as) noexcept
{
- stdexec::__t<__sh_state<_CvrefSenderId, _EnvId>>& __state =
+ __shared_state<_CvrefSender, _Env>& __state =
*__self.__shared_state_;
try
@@ -3694,7 +3514,6 @@
}
__state.__notify();
- __self.__shared_state_.reset();
}
friend const __env_t<_Env>& tag_invoke(get_env_t,
@@ -3702,310 +3521,315 @@
{
return __self.__shared_state_->__env_;
}
+
+ __shared_state<_CvrefSender, _Env>* __shared_state_;
};
};
-struct __operation_base
+template <class _CvrefSender, class _Env>
+struct __shared_state :
+ __enable_intrusive_from_this<__shared_state<_CvrefSender, _Env>>
{
- using __notify_fn = void(__operation_base*) noexcept;
- __notify_fn* __notify_{};
-};
+ using __variant_t = __compl_sigs::__for_all_sigs<
+ __completion_signatures_of_t<_CvrefSender, _Env>, __q<__decayed_tuple>,
+ __mbind_front_q<__variant,
+ std::tuple<set_stopped_t>, // Initial state of the
+ // variant is set_stopped
+ std::tuple<set_error_t, std::exception_ptr>>>;
-template <class _CvrefSenderId, class _EnvId = __id<empty_env>>
-struct __sh_state
-{
- using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>;
- using _Env = stdexec::__t<_EnvId>;
+ using __receiver_t = __t<__receiver<__cvref_id<_CvrefSender>, __id<_Env>>>;
- struct __t : __enable_intrusive_from_this<__t>
- {
- using __id = __sh_state;
+ in_place_stop_source __stop_source_{};
+ __variant_t __data_;
+ std::atomic<void*> __head_{nullptr};
+ __env_t<_Env> __env_;
+ connect_result_t<_CvrefSender, __receiver_t> __op_state2_;
- template <class... _Ts>
- using __bind_tuples = //
- __mbind_front_q<__variant,
- std::tuple<set_stopped_t>, // Initial state of the
- // variant is set_stopped
- std::tuple<set_error_t, std::exception_ptr>,
- _Ts...>;
-
- using __bound_values_t = //
- __value_types_of_t<_CvrefSender, __env_t<_Env>,
- __mbind_front_q<__decayed_tuple, set_value_t>,
- __q<__bind_tuples>>;
-
- using __variant_t = //
- __error_types_of_t<
- _CvrefSender, __env_t<_Env>,
- __transform<__mbind_front_q<__decayed_tuple, set_error_t>,
- __bound_values_t>>;
-
- using __receiver_t = stdexec::__t<__receiver<_CvrefSenderId, _EnvId>>;
-
- __variant_t __data_;
- in_place_stop_source __stop_source_{};
-
- std::atomic<void*> __op_state1_{nullptr};
- __env_t<_Env> __env_;
- connect_result_t<_CvrefSender, __receiver_t> __op_state2_;
-
- explicit __t(_CvrefSender&& __sndr, _Env __env = {}) :
- __env_(
- __make_env((_Env&&)__env, __mkprop(__stop_source_.get_token(),
- get_stop_token))),
- __op_state2_(connect((_CvrefSender&&)__sndr, __receiver_t{*this}))
- {
- start(__op_state2_);
- }
-
- void __notify() noexcept
- {
- void* const __completion_state = static_cast<void*>(this);
- void* const __old = __op_state1_.exchange(
- __completion_state, std::memory_order_acq_rel);
- if (__old != nullptr)
- {
- auto* __op = static_cast<__operation_base*>(__old);
- __op->__notify_(__op);
- }
- }
-
- void __detach() noexcept
- {
- __stop_source_.request_stop();
- }
- };
-};
-
-template <class _CvrefSenderId, class _EnvId, class _ReceiverId>
-struct __operation
-{
- using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>;
- using _Env = stdexec::__t<_EnvId>;
- using _Receiver = stdexec::__t<_ReceiverId>;
-
- class __t : public __operation_base
- {
- struct __on_stop_request
- {
- in_place_stop_source& __stop_source_;
-
- void operator()() noexcept
- {
- __stop_source_.request_stop();
- }
- };
-
- using __on_stop = //
- std::optional<typename stop_token_of_t<env_of_t<_Receiver>&>::
- template callback_type<__on_stop_request>>;
-
- _Receiver __rcvr_;
- __on_stop __on_stop_{};
- __intrusive_ptr<stdexec::__t<__sh_state<_CvrefSenderId, _EnvId>>>
- __shared_state_;
-
- public:
- using __id = __operation;
-
- __t( //
- _Receiver __rcvr, //
- __intrusive_ptr<stdexec::__t<__sh_state<_CvrefSenderId, _EnvId>>>
- __shared_state) //
- noexcept(std::is_nothrow_move_constructible_v<_Receiver>) :
- __operation_base{__notify},
- __rcvr_((_Receiver&&)__rcvr),
- __shared_state_(std::move(__shared_state))
- {}
-
- ~__t()
- {
- // Check to see if this operation was ever started. If not,
- // detach the (potentially still running) operation:
- if (nullptr ==
- __shared_state_->__op_state1_.load(std::memory_order_acquire))
- {
- __shared_state_->__detach();
- }
- }
-
- STDEXEC_IMMOVABLE(__t);
-
- static void __notify(__operation_base* __self) noexcept
- {
- __t* __op = static_cast<__t*>(__self);
- __op->__on_stop_.reset();
-
- std::visit(
- [&](auto& __tupl) noexcept -> void {
- __apply(
- [&](auto __tag, auto&... __args) noexcept -> void {
- __tag((_Receiver&&)__op->__rcvr_, std::move(__args)...);
- },
- __tupl);
- },
- __op->__shared_state_->__data_);
- }
-
- friend void tag_invoke(start_t, __t& __self) noexcept
- {
- stdexec::__t<__sh_state<_CvrefSenderId, _EnvId>>* __shared_state =
- __self.__shared_state_.get();
- std::atomic<void*>& __op_state1 = __shared_state->__op_state1_;
- void* const __completion_state = static_cast<void*>(__shared_state);
- void* const __old = __op_state1.load(std::memory_order_acquire);
- if (__old == __completion_state)
- {
- __self.__notify(&__self);
- }
- else
- {
- // register stop callback:
- __self.__on_stop_.emplace(
- get_stop_token(get_env(__self.__rcvr_)),
- __on_stop_request{__shared_state->__stop_source_});
- // Check if the stop_source has requested cancellation
- if (__shared_state->__stop_source_.stop_requested())
- {
- // Stop has already been requested. Don't bother starting
- // the child operations.
- stdexec::set_stopped((_Receiver&&)__self.__rcvr_);
- }
- else
- {
- // Otherwise, the inner source hasn't notified completion.
- // Set this operation as the __op_state1 so it's notified.
- void* __old = nullptr;
- if (!__op_state1.compare_exchange_weak(
- __old, &__self, std::memory_order_release,
- std::memory_order_acquire))
- {
- // We get here when the task completed during the
- // execution of this function. Complete the operation
- // synchronously.
- STDEXEC_ASSERT(__old == __completion_state);
- __self.__notify(&__self);
- }
- }
- }
- }
- };
-};
-
-template <class _ShState>
-struct __data
-{
- __data(__intrusive_ptr<_ShState> __sh_state) noexcept :
- __sh_state_(std::move(__sh_state))
+ explicit __shared_state(_CvrefSender&& __sndr, _Env __env) :
+ __env_(__make_env((_Env&&)__env, __mkprop(__stop_source_.get_token(),
+ get_stop_token))),
+ __op_state2_(connect((_CvrefSender&&)__sndr, __receiver_t{this}))
{}
- __data(__data&&) = default;
- __data& operator=(__data&&) = default;
-
- ~__data()
+ void __start_op() noexcept
{
- if (__sh_state_ != nullptr)
+ // the inner sender isn't running. if we reach here, then
+ // one way or the other, __shared_state::__notify() will be
+ // called, which decrements the ref count of *this.
+ // So we need to increment it here:
+ this->__inc_ref();
+
+ if (__stop_source_.stop_requested())
{
- // detach from the still-running operation.
- // NOT TO SPEC: This also requests cancellation.
- __sh_state_->__detach();
+ // 1. resets __head to completion state
+ // 2. notifies waiting threads
+ // 3. propagates "stopped" signal to `out_r'`
+ __notify();
+ }
+ else
+ {
+ stdexec::start(__op_state2_);
}
}
- __intrusive_ptr<_ShState> __sh_state_;
+ // This is called when the shared async operation completes:
+ void __notify() noexcept
+ {
+ void* const __completion_state = static_cast<void*>(this);
+ void* const __old = __head_.exchange(__completion_state,
+ std::memory_order_acq_rel);
+ __local_state_base* __state = static_cast<__local_state_base*>(__old);
+
+ while (__state != nullptr)
+ {
+ __local_state_base* __next = __state->__next_;
+ __state->__action_(__state, __action_kind::__notify);
+ __state = __next;
+ }
+
+ // The async operation has completed, so we can release our
+ // ref-count on it:
+ this->__dec_ref();
+ }
+
+ void __detach() noexcept
+ {
+ // Check to see if this operation was ever started. If not,
+ // detach the (potentially still running) operation:
+ if (nullptr == __head_.load(std::memory_order_acquire))
+ {
+ __stop_source_.request_stop();
+ }
+ }
};
-template <class... _Tys>
-using __set_value_t = completion_signatures<set_value_t(__decay_t<_Tys>&&...)>;
-
-template <class _Ty>
-using __set_error_t = completion_signatures<set_error_t(__decay_t<_Ty>&&)>;
-
-template <class _CvrefSenderId, class _EnvId>
+template <class _Cvref, class _CvrefSenderId, class _EnvId>
using __completions_t = //
__try_make_completion_signatures<
// NOT TO SPEC:
- // See https://github.com/brycelelbach/wg21_p2300_execution/issues/26
+ // See https://github.com/cplusplus/sender-receiver/issues/23
__cvref_t<_CvrefSenderId>, __env_t<__t<_EnvId>>,
- completion_signatures<set_error_t(std::exception_ptr&&),
+ completion_signatures<set_error_t(
+ __minvoke<_Cvref, std::exception_ptr>),
set_stopped_t()>, // NOT TO SPEC
- __q<__set_value_t>, __q<__set_error_t>>;
+ __transform<_Cvref,
+ __mcompose<__q<completion_signatures>, __qf<set_value_t>>>,
+ __transform<_Cvref,
+ __mcompose<__q<completion_signatures>, __qf<set_error_t>>>>;
-template <class _Receiver>
-auto __connect_fn_(_Receiver& __rcvr) noexcept
-{
- return [&]<class _ShState>(auto, __data<_ShState> __dat) //
- noexcept(__nothrow_decay_copyable<_Receiver>)
- -> __t<__mapply<__mbind_back_q<__operation, __id<_Receiver>>,
- __id<_ShState>>> {
- return {(_Receiver&&)__rcvr, std::move(__dat.__sh_state_)};
- };
-}
+template <class _Ty>
+using __clref_t = const __decay_t<_Ty>&;
-template <class _Receiver>
-auto __connect_fn(_Receiver& __rcvr) noexcept -> decltype(__connect_fn_(__rcvr))
-{
- return __connect_fn_(__rcvr);
-}
+template <class _Ty>
+using __rref_t = __decay_t<_Ty>&&;
+template <class _Tag>
+using __cvref_results_t = //
+ __if_c<same_as<_Tag, __split::__split_t>, __q<__clref_t>, __q<__rref_t>>;
+
+template <class _Tag>
inline auto __get_completion_signatures_fn() noexcept
-{
- return []<class _ShState>(auto, const __data<_ShState>&) //
- -> __mapply<__q<__completions_t>, __id<_ShState>> { return {}; };
+{ //
+ return
+ []<template <class> class _Data, class _ShState>(
+ auto, const _Data<_ShState>&) //
+ -> __mapply<__mbind_front_q<__completions_t, __cvref_results_t<_Tag>>,
+ _ShState> { return {}; };
}
-struct __ensure_started_t
-{};
-
-struct __ensure_started_impl : __sexpr_defaults
+template <class _Tag>
+struct __shared_impl : __sexpr_defaults
{
- static constexpr auto connect = //
- []<class _Self, class _Receiver>(
- _Self&& __self,
- _Receiver
- __rcvr) noexcept(__nothrow_callable<__sexpr_apply_t, _Self,
- decltype(__connect_fn(
- __declval<
- _Receiver&>()))>)
- -> __call_result_t<__sexpr_apply_t, _Self,
- decltype(__connect_fn(__declval<_Receiver&>()))> {
- static_assert(sender_expr_for<_Self, __ensure_started_t>);
- return __sexpr_apply((_Self&&)__self, __connect_fn(__rcvr));
+ static constexpr auto get_state = //
+ []<class _Sender, class _Receiver>(
+ _Sender&& __sndr,
+ _Receiver&) noexcept -> __local_state<_Sender, _Receiver> {
+ static_assert(sender_expr_for<_Sender, _Tag>);
+ return __local_state<_Sender, _Receiver>{(_Sender&&)__sndr};
};
static constexpr auto get_completion_signatures = //
[]<class _Self, class _OtherEnv>(_Self&&, _OtherEnv&&) noexcept
-> __call_result_t<__sexpr_apply_t, _Self,
- __result_of<__get_completion_signatures_fn>> {
- static_assert(sender_expr_for<_Self, __ensure_started_t>);
+ __result_of<__get_completion_signatures_fn<_Tag>>> {
+ static_assert(sender_expr_for<_Self, _Tag>);
return {};
};
+
+ static constexpr auto start = //
+ []<class _Sender, class _Receiver>(
+ __local_state<_Sender, _Receiver>& __state,
+ _Receiver& __rcvr) noexcept -> void {
+ auto* __shared_state = __state.__shared_state_.get();
+ std::atomic<void*>& __head = __shared_state->__head_;
+ void* const __completion_state = static_cast<void*>(__shared_state);
+ void* __old = __head.load(std::memory_order_acquire);
+
+ if (__old != __completion_state)
+ {
+ __state.__on_stop_.emplace(
+ get_stop_token(stdexec::get_env(__rcvr)),
+ __on_stop_request{__shared_state->__stop_source_});
+
+ if constexpr (same_as<_Tag, __ensure_started::__ensure_started_t>)
+ {
+ // Check if the stop_source has requested cancellation
+ if (__shared_state->__stop_source_.stop_requested())
+ {
+ // Stop has already been requested. Don't bother starting
+ // the child operations.
+ stdexec::set_stopped((_Receiver&&)__rcvr);
+ return;
+ }
+ }
+ }
+
+ // With the split algorithm, multiple split senders can be started
+ // simultaneously, but only one should start the async operation. The
+ // following loop atomically (re)tries to set the pointer to the head of
+ // the list to __state. When it finally succeeds, the prior value is
+ // checked. If it is nullptr, then this split sender has won the race
+ // and has the honor of starting the async operation.
+ do
+ {
+ if (__old == __completion_state)
+ {
+ __state.template __action<_Tag>(&__state,
+ __action_kind::__notify);
+ return;
+ }
+ __state.__next_ = static_cast<__local_state_base*>(__old);
+ } while (!__head.compare_exchange_weak(
+ __old, static_cast<void*>(&__state), std::memory_order_release,
+ std::memory_order_acquire));
+
+ if constexpr (same_as<_Tag, __split::__split_t>)
+ {
+ if (__old == nullptr)
+ {
+ __shared_state->__start_op();
+ }
+ }
+ };
};
+} // namespace __shared
+
+////////////////////////////////////////////////////////////////////////////
+// [execution.senders.adaptors.split]
+namespace __split
+{
+using namespace __shared;
+
+template <class _ShState>
+struct __data
+{
+ explicit __data(__intrusive_ptr<_ShState> __shared_state) noexcept :
+ __shared_state(std::move(__shared_state))
+ {}
+
+ __intrusive_ptr<_ShState> __shared_state;
+};
+
+struct __split_t
+{};
+
+struct split_t
+{
+ template <sender _Sender, class _Env = empty_env>
+ requires sender_in<_Sender, _Env> && __decay_copyable<env_of_t<_Sender>>
+ auto operator()(_Sender&& __sndr, _Env&& __env = {}) const
+ {
+ auto __domain = __get_late_domain(__sndr, __env);
+ return stdexec::transform_sender(
+ __domain, __make_sexpr<split_t>((_Env&&)__env, (_Sender&&)__sndr));
+ }
+
+ STDEXEC_ATTRIBUTE((always_inline)) //
+ __binder_back<split_t> operator()() const
+ {
+ return {{}, {}, {}};
+ }
+
+ using _Sender = __1;
+ using __legacy_customizations_t = //
+ __types<tag_invoke_t(split_t,
+ get_completion_scheduler_t<set_value_t>(
+ get_env_t(const _Sender&)),
+ _Sender),
+ tag_invoke_t(split_t, _Sender)>;
+
+ template <class _CvrefSender, class _Env>
+ using __receiver_t =
+ __t<__meval<__receiver, __cvref_id<_CvrefSender>, __id<_Env>>>;
+
+ template <class _Sender>
+ static auto transform_sender(_Sender&& __sndr)
+ {
+ using _Receiver =
+ __receiver_t<__child_of<_Sender>, __decay_t<__data_of<_Sender>>>;
+ static_assert(sender_to<__child_of<_Sender>, _Receiver>);
+ return __sexpr_apply((_Sender&&)__sndr,
+ [&]<class _Env, class _Child>(
+ __ignore, _Env&& __env, _Child&& __child) {
+ auto __state =
+ __make_intrusive<__shared_state<_Child, __decay_t<_Env>>>(
+ (_Child&&)__child, (_Env&&)__env);
+ return __make_sexpr<__split_t>(__data{std::move(__state)});
+ });
+ }
+};
+} // namespace __split
+
+using __split::split_t;
+inline constexpr split_t split{};
+
+template <>
+struct __sexpr_impl<__split::__split_t> :
+ __shared::__shared_impl<__split::__split_t>
+{};
+
+/////////////////////////////////////////////////////////////////////////////
+// [execution.senders.adaptors.ensure_started]
+namespace __ensure_started
+{
+using namespace __shared;
+
+// Each ensure_started sender has one of these, created when
+// ensure_started() is called.
+template <class _ShState>
+struct __data
+{
+ explicit __data(__intrusive_ptr<_ShState> __ptr) noexcept :
+ __shared_state(std::move(__ptr))
+ {
+ // Eagerly launch the async operation.
+ __shared_state->__start_op();
+ }
+
+ __data(__data&&) noexcept = default;
+ __data& operator=(__data&&) noexcept = default;
+
+ ~__data()
+ {
+ if (__shared_state != nullptr)
+ {
+ // detach from the still-running operation.
+ // NOT TO SPEC: This also requests cancellation.
+ __shared_state->__detach();
+ }
+ }
+
+ __intrusive_ptr<_ShState> __shared_state;
+};
+
+struct __ensure_started_t
+{};
struct ensure_started_t
{
- template <sender _Sender>
- requires sender_in<_Sender, empty_env> &&
- __decay_copyable<env_of_t<_Sender>>
- auto operator()(_Sender&& __sndr) const
- {
- if constexpr (sender_expr_for<_Sender, __ensure_started_t>)
- {
- return (_Sender&&)__sndr;
- }
- else
- {
- auto __domain = __get_early_domain(__sndr);
- return stdexec::transform_sender(
- __domain,
- __make_sexpr<ensure_started_t>(__(), (_Sender&&)__sndr));
- }
- STDEXEC_UNREACHABLE();
- }
-
- template <sender _Sender, class _Env>
+ template <sender _Sender, class _Env = empty_env>
requires sender_in<_Sender, _Env> && __decay_copyable<env_of_t<_Sender>>
- auto operator()(_Sender&& __sndr, _Env&& __env) const
+ auto operator()(_Sender&& __sndr, _Env&& __env = {}) const
{
if constexpr (sender_expr_for<_Sender, __ensure_started_t>)
{
@@ -4015,9 +3839,8 @@
{
auto __domain = __get_late_domain(__sndr, __env);
return stdexec::transform_sender(
- __domain,
- __make_sexpr<ensure_started_t>(__(), (_Sender&&)__sndr),
- (_Env&&)__env);
+ __domain, __make_sexpr<ensure_started_t>((_Env&&)__env,
+ (_Sender&&)__sndr));
}
STDEXEC_UNREACHABLE();
}
@@ -4036,24 +3859,23 @@
_Sender),
tag_invoke_t(ensure_started_t, _Sender)>;
- template <class _CvrefSender, class... _Env>
- using __receiver_t = stdexec::__t<
- __meval<__receiver, __cvref_id<_CvrefSender>, __id<_Env>...>>;
+ template <class _CvrefSender, class _Env>
+ using __receiver_t =
+ __t<__meval<__receiver, __cvref_id<_CvrefSender>, __id<_Env>>>;
- template <class _Sender, class... _Env>
- requires sender_to<__child_of<_Sender>,
- __receiver_t<__child_of<_Sender>, _Env...>>
- static auto transform_sender(_Sender&& __sndr, _Env... __env)
+ template <class _Sender>
+ static auto transform_sender(_Sender&& __sndr)
{
- return __sexpr_apply(
- (_Sender&&)__sndr,
- [&]<class _Child>(__ignore, __ignore, _Child&& __child) {
- using __sh_state_t =
- __t<__sh_state<__cvref_id<_Child>, __id<_Env>...>>;
- auto __sh_state = __make_intrusive<__sh_state_t>(
- (_Child&&)__child, std::move(__env)...);
- return __make_sexpr<__ensure_started_t>(
- __data{std::move(__sh_state)});
+ using _Receiver =
+ __receiver_t<__child_of<_Sender>, __decay_t<__data_of<_Sender>>>;
+ static_assert(sender_to<__child_of<_Sender>, _Receiver>);
+ return __sexpr_apply((_Sender&&)__sndr,
+ [&]<class _Env, class _Child>(
+ __ignore, _Env&& __env, _Child&& __child) {
+ auto __state =
+ __make_intrusive<__shared_state<_Child, __decay_t<_Env>>>(
+ (_Child&&)__child, (_Env&&)__env);
+ return __make_sexpr<__ensure_started_t>(__data{std::move(__state)});
});
}
};
@@ -4064,7 +3886,7 @@
template <>
struct __sexpr_impl<__ensure_started::__ensure_started_t> :
- __ensure_started::__ensure_started_impl
+ __shared::__shared_impl<__ensure_started::__ensure_started_t>
{};
STDEXEC_PRAGMA_PUSH()
@@ -4110,44 +3932,23 @@
// [exec.let]
namespace __let
{
-template <class _SetTag, class _Domain = dependent_domain>
+template <class _Set, class _Domain = dependent_domain>
struct __let_t;
template <class _Set>
-struct __on_not_callable_
-{
- using __t =
- __callable_error<"In stdexec::let_value(Sender, Function)..."__csz>;
-};
+inline constexpr __mstring __in_which_let_msg{
+ "In stdexec::let_value(Sender, Function)..."};
template <>
-struct __on_not_callable_<set_error_t>
-{
- using __t =
- __callable_error<"In stdexec::let_error(Sender, Function)..."__csz>;
-};
+inline constexpr __mstring __in_which_let_msg<set_error_t>{
+ "In stdexec::let_error(Sender, Function)..."};
template <>
-struct __on_not_callable_<set_stopped_t>
-{
- using __t =
- __callable_error<"In stdexec::let_stopped(Sender, Function)..."__csz>;
-};
+inline constexpr __mstring __in_which_let_msg<set_stopped_t>{
+ "In stdexec::let_stopped(Sender, Function)..."};
template <class _Set>
-using __on_not_callable = __t<__on_not_callable_<_Set>>;
-
-template <class _Tp>
-using __decay_ref = __decay_t<_Tp>&;
-
-// A metafunction that computes the result sender type for a given set of
-// argument types
-template <class _Fun, class _Set>
-using __result_sender_t = //
- __transform<
- __q<__decay_ref>,
- __mbind_front<__mtry_catch_q<__call_result_t, __on_not_callable<_Set>>,
- _Fun>>;
+using __on_not_callable = __callable_error<__in_which_let_msg<_Set>>;
// FUTURE: when we have a scheduler query for "always completes inline",
// then we can use that instead of hard-coding `__inln::__scheduler` here.
@@ -4189,6 +3990,50 @@
__env::__env_join_t<__env::__prop<_Scheduler(get_scheduler_t)>,
__env::__prop<void(get_domain_t)>, _Env>>;
+template <class _Tp>
+using __decay_ref = __decay_t<_Tp>&;
+
+template <__mstring _Where, __mstring _What>
+struct _FUNCTION_MUST_RETURN_A_VALID_SENDER_IN_THE_CURRENT_ENVIRONMENT_
+{};
+
+#if STDEXEC_NVHPC()
+template <class _Sender, class _Env, class _Set>
+struct __bad_result_sender_
+{
+ using __t = __mexception<
+ _FUNCTION_MUST_RETURN_A_VALID_SENDER_IN_THE_CURRENT_ENVIRONMENT_<
+ __in_which_let_msg<_Set>,
+ "The function must return a valid sender for the current environment"__csz>,
+ _WITH_SENDER_<_Sender>, _WITH_ENVIRONMENT_<_Env>>;
+};
+template <class _Sender, class _Env, class _Set>
+using __bad_result_sender = __t<__bad_result_sender_<_Sender, _Env, _Set>>;
+#else
+template <class _Sender, class _Env, class _Set>
+using __bad_result_sender = __mexception<
+ _FUNCTION_MUST_RETURN_A_VALID_SENDER_IN_THE_CURRENT_ENVIRONMENT_<
+ __in_which_let_msg<_Set>,
+ "The function must return a valid sender for the current environment"__csz>,
+ _WITH_SENDER_<_Sender>, _WITH_ENVIRONMENT_<_Env>>;
+#endif
+
+template <class _Sender, class _Env, class _Set>
+using __ensure_sender = //
+ __minvoke_if_c<sender_in<_Sender, _Env>, __q<__midentity>,
+ __mbind_back_q<__bad_result_sender, _Set, _Env>, _Sender>;
+
+// A metafunction that computes the result sender type for a given set of
+// argument types
+template <class _Fun, class _Set, class _Env, class _Sched>
+using __result_sender_fn = //
+ __mcompose<
+ __mbind_back_q<__ensure_sender, __result_env_t<_Env, _Sched>, _Set>,
+ __transform<__q<__decay_ref>,
+ __mbind_front<__mtry_catch_q<__call_result_t,
+ __on_not_callable<_Set>>,
+ _Fun>>>;
+
// The receiver that gets connected to the result sender is the input receiver,
// possibly augmented with the input sender's completion scheduler (which is
// where the result sender will be started).
@@ -4201,7 +4046,7 @@
using __op_state_for = //
__mcompose<__mbind_back_q<connect_result_t,
__result_receiver_t<_Receiver, _Sched>>,
- __result_sender_t<_Fun, _Set>>;
+ __result_sender_fn<_Fun, _Set, env_of_t<_Receiver>, _Sched>>;
template <class _Set, class _Sig>
struct __tfx_signal_fn
@@ -4216,7 +4061,7 @@
template <class _Env, class _Fun, class _Sched>
using __f = //
__try_make_completion_signatures<
- __minvoke<__result_sender_t<_Fun, _Set>, _Args...>,
+ __minvoke<__result_sender_fn<_Fun, _Set, _Env, _Sched>, _Args...>,
__result_env_t<_Env, _Sched>,
// because we don't know if connect-ing the result sender will
// throw:
@@ -4241,33 +4086,56 @@
__q<__concat_completion_signatures_t>>,
__completion_signatures_of_t<_CvrefSender, _Env>>;
+template <__mstring _Where, __mstring _What>
+struct _NO_COMMON_DOMAIN_
+{};
+
+template <class _Set>
+using __no_common_domain_t = //
+ _NO_COMMON_DOMAIN_<
+ __in_which_let_msg<_Set>,
+ "The senders returned by Function do not all share a common domain"__csz>;
+
+template <class _Set>
+using __try_common_domain_fn = //
+ __mtry_catch_q<
+ __domain::__common_domain_t,
+ __mcompose<__mbind_front_q<__mexception, __no_common_domain_t<_Set>>,
+ __q<_WITH_SENDERS_>>>;
+
// Compute all the domains of all the result senders and make sure they're all
// the same
-template <class _SetTag, class _Child, class _Fun, class _Env>
-using __result_domain_t = __gather_completions_for<
- _SetTag, _Child, _Env,
- __mtry_catch<
- __transform<__q<__decay_ref>, __mbind_front_q<__call_result_t, _Fun>>,
- __on_not_callable<_SetTag>>,
- __q<__domain::__common_domain_t>>;
+template <class _Set, class _Child, class _Fun, class _Env, class _Sched>
+using __result_domain_t = //
+ __gather_completions_for<_Set, _Child, _Env,
+ __result_sender_fn<_Fun, _Set, _Env, _Sched>,
+ __try_common_domain_fn<_Set>>;
template <class _LetTag, class _Env>
auto __mk_transform_env_fn(const _Env& __env) noexcept
{
- using _SetTag = __t<_LetTag>;
- return [&]<class _Fun, sender_in<_Env> _Child>(
- __ignore, _Fun&&, _Child&& __child) -> decltype(auto) {
- using _Scheduler = __completion_sched<_Child, _SetTag>;
- if constexpr (__unknown_context<_Scheduler>)
+ using _Set = __t<_LetTag>;
+ return [&]<class _Fun, class _Child>(__ignore, _Fun&&,
+ _Child&& __child) -> decltype(auto) {
+ using __completions_t = __completion_signatures_of_t<_Child, _Env>;
+ if constexpr (__merror<__completions_t>)
{
- return (__env);
+ return __completions_t();
}
else
{
- return __join_env(__mkprop(get_completion_scheduler<_SetTag>(
- stdexec::get_env(__child)),
- get_scheduler),
- __mkprop(get_domain), __env);
+ using _Scheduler = __completion_sched<_Child, _Set>;
+ if constexpr (__unknown_context<_Scheduler>)
+ {
+ return (__env);
+ }
+ else
+ {
+ return __join_env(__mkprop(get_completion_scheduler<_Set>(
+ stdexec::get_env(__child)),
+ get_scheduler),
+ __mkprop(get_domain), __env);
+ }
}
STDEXEC_UNREACHABLE();
};
@@ -4276,13 +4144,31 @@
template <class _LetTag, class _Env>
auto __mk_transform_sender_fn(const _Env&) noexcept
{
- using _SetTag = __t<_LetTag>;
- return []<class _Fun, sender_in<_Env> _Child>(__ignore, _Fun&& __fun,
- _Child&& __child) {
- using _Domain = __result_domain_t<_SetTag, _Child, _Fun, _Env>;
- static_assert(__none_of<_Domain, __none_such, dependent_domain>);
- return __make_sexpr<__let_t<_SetTag, _Domain>>((_Fun&&)__fun,
- (_Child&&)__child);
+ using _Set = __t<_LetTag>;
+ return
+ []<class _Fun, class _Child>(__ignore, _Fun&& __fun, _Child&& __child) {
+ using __completions_t = __completion_signatures_of_t<_Child, _Env>;
+ if constexpr (__merror<__completions_t>)
+ {
+ return __completions_t();
+ }
+ else
+ {
+ using _Sched = __completion_sched<_Child, _Set>;
+ using _Domain = __result_domain_t<_Set, _Child, _Fun, _Env, _Sched>;
+ if constexpr (__merror<_Domain>)
+ {
+ return _Domain();
+ }
+ else
+ {
+ static_assert(
+ __none_of<_Domain, __none_such, dependent_domain>);
+ return __make_sexpr<__let_t<_Set, _Domain>>((_Fun&&)__fun,
+ (_Child&&)__child);
+ }
+ }
+ STDEXEC_UNREACHABLE();
};
}
@@ -4319,11 +4205,11 @@
__op_state_variant __op_state3_;
};
-template <class _SetTag, class _Domain>
+template <class _Set, class _Domain>
struct __let_t
{
using __domain_t = _Domain;
- using __t = _SetTag;
+ using __t = _Set;
template <sender _Sender, __movable_value _Fun>
auto operator()(_Sender&& __sndr, _Fun __fun) const
@@ -4331,7 +4217,7 @@
auto __domain = __get_early_domain(__sndr);
return stdexec::transform_sender(
__domain,
- __make_sexpr<__let_t<_SetTag>>((_Fun&&)__fun, (_Sender&&)__sndr));
+ __make_sexpr<__let_t<_Set>>((_Fun&&)__fun, (_Sender&&)__sndr));
}
template <class _Fun>
@@ -4350,23 +4236,23 @@
_Sender, _Function),
tag_invoke_t(__let_t, _Sender, _Function)>;
- template <sender_expr_for<__let_t<_SetTag>> _Sender, class _Env>
+ template <sender_expr_for<__let_t<_Set>> _Sender, class _Env>
static decltype(auto) transform_env(_Sender&& __sndr, const _Env& __env)
{
return __sexpr_apply((_Sender&&)__sndr,
- __mk_transform_env_fn<__let_t<_SetTag>>(__env));
+ __mk_transform_env_fn<__let_t<_Set>>(__env));
}
- template <sender_expr_for<__let_t<_SetTag>> _Sender, class _Env>
+ template <sender_expr_for<__let_t<_Set>> _Sender, class _Env>
requires same_as<__early_domain_of_t<_Sender>, dependent_domain>
static decltype(auto) transform_sender(_Sender&& __sndr, const _Env& __env)
{
return __sexpr_apply((_Sender&&)__sndr,
- __mk_transform_sender_fn<__let_t<_SetTag>>(__env));
+ __mk_transform_sender_fn<__let_t<_Set>>(__env));
}
};
-template <class _SetTag, class _Domain>
+template <class _Set, class _Domain>
struct __let_impl : __sexpr_defaults
{
static constexpr auto get_attrs = //
@@ -4377,27 +4263,26 @@
static constexpr auto get_completion_signatures = //
[]<class _Self, class _Env>(_Self&&, _Env&&) noexcept
- -> __completions<__child_of<_Self>, _Env, __let_t<_SetTag, _Domain>,
+ -> __completions<__child_of<_Self>, _Env, __let_t<_Set, _Domain>,
__data_of<_Self>> {
- static_assert(sender_expr_for<_Self, __let_t<_SetTag, _Domain>>);
+ static_assert(sender_expr_for<_Self, __let_t<_Set, _Domain>>);
return {};
};
static constexpr auto get_state = //
- []<class _Sender, class _Receiver>(_Sender&& __sndr,
- _Receiver& __rcvr) {
- static_assert(sender_expr_for<_Sender, __let_t<_SetTag, _Domain>>);
+ []<class _Sender, class _Receiver>(_Sender&& __sndr, _Receiver&) {
+ static_assert(sender_expr_for<_Sender, __let_t<_Set, _Domain>>);
using _Fun = __data_of<_Sender>;
- using _Sched = __completion_sched<_Sender, _SetTag>;
+ using _Sched = __completion_sched<_Sender, _Set>;
using __mk_let_state =
- __mbind_front_q<__let_state, _Receiver, _Fun, _SetTag, _Sched>;
+ __mbind_front_q<__let_state, _Receiver, _Fun, _Set, _Sched>;
using __let_state_t =
- __gather_completions_for<_SetTag, __child_of<_Sender>,
+ __gather_completions_for<_Set, __child_of<_Sender>,
env_of_t<_Receiver>, __q<__decayed_tuple>,
__mk_let_state>;
- _Sched __sched = query_or(get_completion_scheduler<_SetTag>,
+ _Sched __sched = query_or(get_completion_scheduler<_Set>,
stdexec::get_env(__sndr), __none_such());
return __let_state_t{
STDEXEC_CALL_EXPLICIT_THIS_MEMFN((_Sender&&)__sndr,
@@ -4406,25 +4291,23 @@
};
template <class _State, class _Receiver, class... _As>
- static void __bind(_State&& __state, _Receiver&& __rcvr,
+ static void __bind(_State& __state, _Receiver& __rcvr,
_As&&... __as) noexcept
{
try
{
- using __fun_t = typename _State::__fun_t;
- using __sched_t = typename _State::__sched_t;
- using __tuple_t = __decayed_tuple<_As...>;
- using __op_state_t = __minvoke<
- __op_state_for<_Receiver, __fun_t, _SetTag, __sched_t>, _As...>;
auto& __args =
- __state.__args_.template emplace<__tuple_t>((_As&&)__as...);
- auto& __op =
- __state.__op_state3_.template emplace<__op_state_t>(__conv{[&] {
- return stdexec::connect(
- __apply(std::move(__state.__fun_), __args),
- __state.__get_result_receiver((_Receiver&&)__rcvr));
- }});
- stdexec::start(__op);
+ __state.__args_.template emplace<__decayed_tuple<_As...>>(
+ (_As&&)__as...);
+ auto __sndr2 = __apply(std::move(__state.__fun_), __args);
+ auto __rcvr2 = __state.__get_result_receiver((_Receiver&&)__rcvr);
+ auto __mkop = [&] {
+ return stdexec::connect(std::move(__sndr2), std::move(__rcvr2));
+ };
+ auto& __op2 =
+ __state.__op_state3_.template emplace<decltype(__mkop())>(
+ __conv{__mkop});
+ stdexec::start(__op2);
}
catch (...)
{
@@ -4436,9 +4319,9 @@
[]<class _State, class _Receiver, class _Tag, class... _As>(
__ignore, _State& __state, _Receiver& __rcvr, _Tag,
_As&&... __as) noexcept -> void {
- if constexpr (std::same_as<_Tag, _SetTag>)
+ if constexpr (std::same_as<_Tag, _Set>)
{
- __bind((_State&&)__state, (_Receiver&&)__rcvr, (_As&&)__as...);
+ __bind(__state, __rcvr, (_As&&)__as...);
}
else
{
@@ -4457,9 +4340,9 @@
using let_stopped_t = __let::__let_t<set_stopped_t>;
inline constexpr let_stopped_t let_stopped{};
-template <class _SetTag, class _Domain>
-struct __sexpr_impl<__let::__let_t<_SetTag, _Domain>> :
- __let::__let_impl<_SetTag, _Domain>
+template <class _Set, class _Domain>
+struct __sexpr_impl<__let::__let_t<_Set, _Domain>> :
+ __let::__let_impl<_Set, _Domain>
{};
/////////////////////////////////////////////////////////////////////////////
@@ -4835,200 +4718,10 @@
// tuple<set_error_t, __decay_t<_Error2>>,
// ...
// >
-template <class _State, class... _Tuples>
-using __make_bind_ = __mbind_back<_State, _Tuples...>;
-
-template <class _State>
-using __make_bind = __mbind_front_q<__make_bind_, _State>;
-
-template <class _Tag>
-using __tuple_t = __mbind_front_q<__decayed_tuple, _Tag>;
-
-template <class _Sender, class _Env, class _State, class _Tag>
-using __bind_completions_t =
- __gather_completions_for<_Tag, _Sender, _Env, __tuple_t<_Tag>,
- __make_bind<_State>>;
-
-template <class _Sender, class _Env>
-using __variant_for_t = //
- __minvoke<__minvoke<
- __mfold_right<__nullable_variant_t,
- __mbind_front_q<__bind_completions_t, _Sender, _Env>>,
- set_value_t, set_error_t, set_stopped_t>>;
-
-template <class _SchedulerId, class _VariantId, class _ReceiverId>
-struct __operation1_base;
-
-// This receiver is to be completed on the execution context
-// associated with the scheduler. When the source sender
-// completes, the completion information is saved off in the
-// operation state so that when this receiver completes, it can
-// read the completion out of the operation state and forward it
-// to the output receiver after transitioning to the scheduler's
-// context.
-template <class _SchedulerId, class _VariantId, class _ReceiverId>
-struct __receiver2
-{
- using _Receiver = stdexec::__t<_ReceiverId>;
-
- struct __t
- {
- using receiver_concept = receiver_t;
- using __id = __receiver2;
- __operation1_base<_SchedulerId, _VariantId, _ReceiverId>* __op_state_;
-
- // If the work is successfully scheduled on the new execution
- // context and is ready to run, forward the completion signal in
- // the operation state
- template <same_as<set_value_t> _Tag>
- friend void tag_invoke(_Tag, __t&& __self) noexcept
- {
- __self.__op_state_->__complete();
- }
-
- template <__one_of<set_error_t, set_stopped_t> _Tag, class... _As>
- requires __callable<_Tag, _Receiver, _As...>
- friend void tag_invoke(_Tag, __t&& __self, _As&&... __as) noexcept
- {
- _Tag{}((_Receiver&&)__self.__op_state_->__rcvr_, (_As&&)__as...);
- }
-
- friend auto tag_invoke(get_env_t, const __t& __self) noexcept
- -> env_of_t<_Receiver>
- {
- return get_env(__self.__op_state_->__rcvr_);
- }
- };
-};
-
-// This receiver is connected to the input sender. When that
-// sender completes (on whatever context it completes on), save
-// the completion information into the operation state. Then,
-// schedule a second operation to __complete on the execution
-// context of the scheduler. That second receiver will read the
-// completion information out of the operation state and propagate
-// it to the output receiver from within the desired context.
-template <class _SchedulerId, class _VariantId, class _ReceiverId>
-struct __receiver1
-{
- using _Scheduler = stdexec::__t<_SchedulerId>;
- using _Receiver = stdexec::__t<_ReceiverId>;
- using __receiver2_t =
- stdexec::__t<__receiver2<_SchedulerId, _VariantId, _ReceiverId>>;
-
- struct __t
- {
- using __id = __receiver1;
- using receiver_concept = receiver_t;
- __operation1_base<_SchedulerId, _VariantId, _ReceiverId>* __op_state_;
-
- template <class... _Args>
- static constexpr bool __nothrow_complete_ =
- (__nothrow_decay_copyable<_Args> && ...);
-
- template <class _Tag, class... _Args>
- static void __complete_(_Tag, __t&& __self, _Args&&... __args) //
- noexcept(__nothrow_complete_<_Args...>)
- {
- // Write the tag and the args into the operation state so that
- // we can forward the completion from within the scheduler's
- // execution context.
- __self.__op_state_->__data_
- .template emplace<__decayed_tuple<_Tag, _Args...>>(
- _Tag{}, (_Args&&)__args...);
- // Enqueue the schedule operation so the completion happens
- // on the scheduler's execution context.
- start(__self.__op_state_->__state2_);
- }
-
- template <__completion_tag _Tag, class... _Args>
- requires __callable<_Tag, _Receiver, __decay_t<_Args>...>
- friend void tag_invoke(_Tag __tag, __t&& __self,
- _Args&&... __args) noexcept
- {
- __try_call((_Receiver&&)__self.__op_state_->__rcvr_,
- __function_constant_v<__complete_<_Tag, _Args...>>,
- (_Tag&&)__tag, (__t&&)__self, (_Args&&)__args...);
- }
-
- friend auto tag_invoke(get_env_t, const __t& __self) noexcept
- -> env_of_t<_Receiver>
- {
- return get_env(__self.__op_state_->__rcvr_);
- }
- };
-};
-
-template <class _SchedulerId, class _VariantId, class _ReceiverId>
-struct __operation1_base : __immovable
-{
- using _Scheduler = stdexec::__t<_SchedulerId>;
- using _Receiver = stdexec::__t<_ReceiverId>;
- using _Variant = stdexec::__t<_VariantId>;
- using __receiver2_t =
- stdexec::__t<__receiver2<_SchedulerId, _VariantId, _ReceiverId>>;
-
- _Scheduler __sched_;
- _Receiver __rcvr_;
- _Variant __data_;
- connect_result_t<schedule_result_t<_Scheduler>, __receiver2_t> __state2_;
-
- __operation1_base(_Scheduler __sched, _Receiver&& __rcvr) :
- __sched_((_Scheduler&&)__sched), __rcvr_((_Receiver&&)__rcvr),
- __state2_(connect(schedule(__sched_), __receiver2_t{this}))
- {}
-
- void __complete() noexcept
- {
- STDEXEC_ASSERT(!__data_.valueless_by_exception());
- std::visit(
- [this]<class _Tup>(_Tup& __tupl) -> void {
- if constexpr (same_as<_Tup, std::monostate>)
- {
- std::terminate(); // reaching this indicates a bug in
- // schedule_from
- }
- else
- {
- __apply(
- [&]<class... _Args>(auto __tag, _Args&... __args) -> void {
- __tag((_Receiver&&)__rcvr_, (_Args&&)__args...);
- },
- __tupl);
- }
- },
- __data_);
- }
-};
-
-template <class _SchedulerId, class _CvrefSenderId, class _ReceiverId>
-struct __operation1
-{
- using _Scheduler = stdexec::__t<_SchedulerId>;
- using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>;
- using _Receiver = stdexec::__t<_ReceiverId>;
- using __variant_t = __variant_for_t<_CvrefSender, env_of_t<_Receiver>>;
- using __receiver1_t = stdexec::__t<
- __receiver1<_SchedulerId, stdexec::__id<__variant_t>, _ReceiverId>>;
- using __base_t = __operation1_base<_SchedulerId, stdexec::__id<__variant_t>,
- _ReceiverId>;
-
- struct __t : __base_t
- {
- using __id = __operation1;
- connect_result_t<_CvrefSender, __receiver1_t> __state1_;
-
- __t(_Scheduler __sched, _CvrefSender&& __sndr, _Receiver&& __rcvr) :
- __base_t{(_Scheduler&&)__sched, (_Receiver&&)__rcvr},
- __state1_(connect((_CvrefSender&&)__sndr, __receiver1_t{this}))
- {}
-
- friend void tag_invoke(start_t, __t& __op_state) noexcept
- {
- start(__op_state.__state1_);
- }
- };
-};
+template <class _CvrefSender, class _Env>
+using __variant_for_t = __compl_sigs::__maybe_for_all_sigs<
+ __completion_signatures_of_t<_CvrefSender, _Env>, __q<__decayed_tuple>,
+ __nullable_variant_t>;
template <class _Tp>
using __decay_rvalue_ref = __decay_t<_Tp>&&;
@@ -5038,6 +4731,30 @@
__transform<__q<__decay_rvalue_ref>,
__mcompose<__q<completion_signatures>, __qf<_Tag>>>;
+template <class... _Ts>
+using __all_nothrow_decay_copyable_ =
+ __mbool<(__nothrow_decay_copyable<_Ts> && ...)>;
+
+template <class _CvrefSender, class _Env>
+using __all_values_and_errors_nothrow_decay_copyable = //
+ __compl_sigs::__maybe_for_all_sigs<
+ __completion_signatures_of_t<_CvrefSender, _Env>,
+ __q<__all_nothrow_decay_copyable_>, __q<__mand>>;
+
+template <class _CvrefSender, class _Env>
+using __with_error_t = //
+ __if<__all_values_and_errors_nothrow_decay_copyable<_CvrefSender, _Env>,
+ completion_signatures<>, __with_exception_ptr>;
+
+template <class _Scheduler, class _CvrefSender, class _Env>
+using __completions_t = //
+ __try_make_completion_signatures<
+ _CvrefSender, _Env,
+ __try_make_completion_signatures<schedule_result_t<_Scheduler>, _Env,
+ __with_error_t<_CvrefSender, _Env>,
+ __mconst<completion_signatures<>>>,
+ __decay_signature<set_value_t>, __decay_signature<set_error_t>>;
+
template <class _SchedulerId>
struct __environ
{
@@ -5056,31 +4773,75 @@
};
};
-template <class... _Ts>
-using __all_nothrow_decay_copyable =
- __mbool<(__nothrow_decay_copyable<_Ts> && ...)>;
+template <class _Scheduler, class _Sexpr, class _Receiver>
+struct __state;
-template <class _CvrefSender, class _Env>
-using __all_values_and_errors_nothrow_decay_copyable = //
- __mand<
- __try_error_types_of_t<_CvrefSender, _Env,
- __q<__all_nothrow_decay_copyable>>,
- __try_value_types_of_t<_CvrefSender, _Env,
- __q<__all_nothrow_decay_copyable>, __q<__mand>>>;
+// This receiver is to be completed on the execution context
+// associated with the scheduler. When the source sender
+// completes, the completion information is saved off in the
+// operation state so that when this receiver completes, it can
+// read the completion out of the operation state and forward it
+// to the output receiver after transitioning to the scheduler's
+// context.
+template <class _Scheduler, class _Sexpr, class _Receiver>
+struct __receiver2 :
+ receiver_adaptor<__receiver2<_Scheduler, _Sexpr, _Receiver>>
+{
+ explicit __receiver2(
+ __state<_Scheduler, _Sexpr, _Receiver>* __state) noexcept :
+ __state_{__state}
+ {}
-template <class _CvrefSender, class _Env>
-using __with_error_t = //
- __if<__all_values_and_errors_nothrow_decay_copyable<_CvrefSender, _Env>,
- completion_signatures<>, __with_exception_ptr>;
+ _Receiver&& base() && noexcept
+ {
+ return std::move(__state_->__receiver());
+ }
-template <class _Scheduler, class _CvrefSender, class _Env>
-using __completions_t = //
- __try_make_completion_signatures<
- _CvrefSender, _Env,
- __try_make_completion_signatures<schedule_result_t<_Scheduler>, _Env,
- __with_error_t<_CvrefSender, _Env>,
- __mconst<completion_signatures<>>>,
- __decay_signature<set_value_t>, __decay_signature<set_error_t>>;
+ const _Receiver& base() const& noexcept
+ {
+ return __state_->__receiver();
+ }
+
+ void set_value() && noexcept
+ {
+ STDEXEC_ASSERT(!__state_->__data_.valueless_by_exception());
+ std::visit(
+ [__state = __state_]<class _Tup>(_Tup& __tupl) noexcept -> void {
+ if constexpr (same_as<_Tup, std::monostate>)
+ {
+ std::terminate(); // reaching this indicates a bug in
+ // schedule_from
+ }
+ else
+ {
+ __apply(
+ [&]<class... _Args>(auto __tag,
+ _Args&... __args) noexcept -> void {
+ __tag(std::move(__state->__receiver()), (_Args&&)__args...);
+ },
+ __tupl);
+ }
+ },
+ __state_->__data_);
+ }
+
+ __state<_Scheduler, _Sexpr, _Receiver>* __state_;
+};
+
+template <class _Scheduler, class _Sexpr, class _Receiver>
+struct __state : __enable_receiver_from_this<_Sexpr, _Receiver>
+{
+ using __variant_t =
+ __variant_for_t<__child_of<_Sexpr>, env_of_t<_Receiver>>;
+ using __receiver2_t = __receiver2<_Scheduler, _Sexpr, _Receiver>;
+
+ __variant_t __data_;
+ connect_result_t<schedule_result_t<_Scheduler>, __receiver2_t> __state2_;
+
+ explicit __state(_Scheduler __sched) :
+ __data_(), __state2_(connect(schedule(__sched), __receiver2_t{this}))
+ {}
+};
struct schedule_from_t
{
@@ -5104,30 +4865,10 @@
struct __schedule_from_impl : __sexpr_defaults
{
- template <class, class _Data, class _Child>
- using __env_ = __env::__env_join_t<_Data, env_of_t<_Child>>;
-
- template <class _Sender>
- using __env_t = __mapply<__q<__env_>, _Sender>;
-
template <class _Sender>
using __scheduler_t =
__decay_t<__call_result_t<get_completion_scheduler_t<set_value_t>,
- __env_t<_Sender>>>;
-
- template <class _Sender, class _Receiver>
- using __receiver_t = //
- stdexec::__t<__receiver1<stdexec::__id<__scheduler_t<_Sender>>,
- stdexec::__id<__variant_for_t<
- __child_of<_Sender>, env_of_t<_Receiver>>>,
- stdexec::__id<_Receiver>>>;
-
- template <class _Sender, class _Receiver>
- using __operation_t = //
- stdexec::__t<__operation1< //
- stdexec::__id<__scheduler_t<_Sender>>,
- stdexec::__cvref_id<__child_of<_Sender>>,
- stdexec::__id<_Receiver>>>;
+ env_of_t<_Sender>>>;
static constexpr auto get_attrs = //
[]<class _Data, class _Child>(const _Data& __data,
@@ -5142,21 +4883,45 @@
return {};
};
- static constexpr auto connect = //
- []<class _Sender, receiver _Receiver>(_Sender&& __sndr,
- _Receiver __rcvr) //
- -> __operation_t<_Sender, _Receiver> //
- requires sender_to<__child_of<_Sender>,
- __receiver_t<_Sender, _Receiver>>
- {
+ static constexpr auto get_state =
+ []<class _Sender, class _Receiver>(_Sender&& __sndr, _Receiver&) {
static_assert(sender_expr_for<_Sender, schedule_from_t>);
- return __sexpr_apply((_Sender&&)__sndr,
- [&]<class _Data, class _Child>(
- __ignore, _Data&& __data, _Child&& __child)
- -> __operation_t<_Sender, _Receiver> {
- auto __sched = get_completion_scheduler<set_value_t>(__data);
- return {__sched, (_Child&&)__child, (_Receiver&&)__rcvr};
- });
+ auto __sched =
+ get_completion_scheduler<set_value_t>(stdexec::get_env(__sndr));
+ using _Scheduler = decltype(__sched);
+ return __state<_Scheduler, _Sender, _Receiver>{__sched};
+ };
+
+ static constexpr auto complete =
+ []<class _Tag, class... _Args>(__ignore, auto& __state, auto& __rcvr,
+ _Tag,
+ _Args&&... __args) noexcept -> void {
+ // Write the tag and the args into the operation state so that
+ // we can forward the completion from within the scheduler's
+ // execution context.
+ using __async_result = __decayed_tuple<_Tag, _Args...>;
+ if constexpr (__nothrow_constructible_from<__async_result, _Tag,
+ _Args...>)
+ {
+ __state.__data_.template emplace<__async_result>(
+ _Tag(), (_Args&&)__args...);
+ }
+ else
+ {
+ try
+ {
+ __state.__data_.template emplace<__async_result>(
+ _Tag(), (_Args&&)__args...);
+ }
+ catch (...)
+ {
+ set_error(std::move(__rcvr), std::current_exception());
+ return;
+ }
+ }
+ // Enqueue the schedule operation so the completion happens
+ // on the scheduler's execution context.
+ stdexec::start(__state.__state2_);
};
};
} // namespace __schedule_from
@@ -5330,44 +5095,6 @@
// __write adaptor
namespace __write_
{
-template <class _ReceiverId, class _Env>
-struct __operation_base
-{
- using _Receiver = __t<_ReceiverId>;
- _Receiver __rcvr_;
- const _Env __env_;
-};
-
-inline constexpr auto __get_env = [](auto* __op) noexcept -> decltype(auto) {
- return (__op->__env_);
-};
-
-template <class _ReceiverId, class _Env>
-using __receiver_t = //
- __t<__detail::__receiver_with<&__operation_base<_ReceiverId, _Env>::__rcvr_,
- __get_env>>;
-
-template <class _CvrefSenderId, class _ReceiverId, class _Env>
-struct __operation : __operation_base<_ReceiverId, _Env>
-{
- using _CvrefSender = __cvref_t<_CvrefSenderId>;
- using _Receiver = __t<_ReceiverId>;
- using __base_t = __operation_base<_ReceiverId, _Env>;
- using __receiver_t = __write_::__receiver_t<_ReceiverId, _Env>;
- connect_result_t<_CvrefSender, __receiver_t> __state_;
-
- __operation(_CvrefSender&& __sndr, _Receiver&& __rcvr, auto&& __env) :
- __base_t{(_Receiver&&)__rcvr, (decltype(__env))__env},
- __state_{
- stdexec::connect((_CvrefSender&&)__sndr, __receiver_t{{}, this})}
- {}
-
- friend void tag_invoke(start_t, __operation& __self) noexcept
- {
- start(__self.__state_);
- }
-};
-
struct __write_t
{
template <sender _Sender, class... _Envs>
@@ -5388,8 +5115,8 @@
STDEXEC_ATTRIBUTE((always_inline))
static auto __transform_env_fn(_Env&& __env) noexcept
{
- return [&](__ignore, const auto& __data, __ignore) noexcept {
- return __join_env(__data, (_Env&&)__env);
+ return [&](__ignore, const auto& __state, __ignore) noexcept {
+ return __join_env(__state, (_Env&&)__env);
};
}
@@ -5402,27 +5129,9 @@
struct __write_impl : __sexpr_defaults
{
- template <class _Self, class _Receiver>
- using __receiver_t =
- __write_::__receiver_t<__id<_Receiver>, __decay_t<__data_of<_Self>>>;
-
- template <class _Self, class _Receiver>
- using __operation_t =
- __operation<__cvref_id<__child_of<_Self>>, __id<_Receiver>,
- __decay_t<__data_of<_Self>>>;
-
- static constexpr auto connect = //
- []<class _Self, receiver _Receiver>(_Self&& __self, _Receiver __rcvr) //
- -> __operation_t<_Self, _Receiver> //
- requires sender_to<__child_of<_Self>, __receiver_t<_Self, _Receiver>>
- {
- static_assert(sender_expr_for<_Self, __write_t>);
- return __sexpr_apply((_Self&&)__self,
- [&]<class _Env, class _Child>(
- __write_t, _Env&& __env, _Child&& __child) //
- -> __operation_t<_Self, _Receiver> {
- return {(_Child&&)__child, (_Receiver&&)__rcvr, (_Env&&)__env};
- });
+ static constexpr auto get_env = //
+ [](__ignore, const auto& __state, const auto& __rcvr) noexcept {
+ return __join_env(__state, stdexec::get_env(__rcvr));
};
static constexpr auto get_completion_signatures = //
@@ -5457,201 +5166,26 @@
return __env_t{__env};
}
+
+template <class _Ty, class = __name_of<__decay_t<_Ty>>>
+struct __always
+{
+ _Ty __val_;
+
+ _Ty operator()() noexcept
+ {
+ return static_cast<_Ty&&>(__val_);
+ }
+};
+
+template <class _Ty>
+__always(_Ty) -> __always<_Ty>;
} // namespace __detail
/////////////////////////////////////////////////////////////////////////////
// [execution.senders.adaptors.on]
namespace __on
{
-template <class _SchedulerId, class _SenderId, class _ReceiverId>
-struct __operation;
-
-template <class _SchedulerId, class _SenderId, class _ReceiverId>
-struct __operation_base : __immovable
-{
- using _Scheduler = stdexec::__t<_SchedulerId>;
- using _Sender = stdexec::__t<_SenderId>;
- using _Receiver = stdexec::__t<_ReceiverId>;
-
- _Scheduler __scheduler_;
- _Sender __sndr_;
- _Receiver __rcvr_;
-};
-
-inline constexpr auto __sched_prop = [](auto* __op) noexcept {
- return __mkprop(__op->__scheduler_, get_scheduler);
-};
-inline constexpr auto __domain_prop = [](auto*) noexcept {
- return __mkprop(get_domain);
-};
-
-template <class _SchedulerId, class _SenderId, class _ReceiverId>
-using __receiver_ref_t = __t<__detail::__receiver_with<
- &__operation_base<_SchedulerId, _SenderId, _ReceiverId>::__rcvr_,
- __sched_prop, __domain_prop>>;
-
-template <class _SchedulerId, class _SenderId, class _ReceiverId>
-struct __receiver
-{
- using _Scheduler = stdexec::__t<_SchedulerId>;
- using _Sender = stdexec::__t<_SenderId>;
- using _Receiver = stdexec::__t<_ReceiverId>;
-
- struct __t : receiver_adaptor<__t>
- {
- using __id = __receiver;
- using __receiver_ref_t =
- __on::__receiver_ref_t<_SchedulerId, _SenderId, _ReceiverId>;
- stdexec::__t<__operation<_SchedulerId, _SenderId, _ReceiverId>>*
- __op_state_;
-
- _Receiver&& base() && noexcept
- {
- return (_Receiver&&)__op_state_->__rcvr_;
- }
-
- const _Receiver& base() const& noexcept
- {
- return __op_state_->__rcvr_;
- }
-
- void set_value() && noexcept
- {
- // cache this locally since *this is going bye-bye.
- auto* __op_state = __op_state_;
- try
- {
- // This line will invalidate *this:
- start(__op_state->__data_.template emplace<1>(
- __conv{[__op_state] {
- return connect((_Sender&&)__op_state->__sndr_,
- __receiver_ref_t{{}, __op_state});
- }}));
- }
- catch (...)
- {
- set_error((_Receiver&&)__op_state->__rcvr_,
- std::current_exception());
- }
- }
- };
-};
-
-template <class _SchedulerId, class _SenderId, class _ReceiverId>
-struct __operation
-{
- using _Scheduler = stdexec::__t<_SchedulerId>;
- using _Sender = stdexec::__t<_SenderId>;
- using _Receiver = stdexec::__t<_ReceiverId>;
-
- struct __t : __operation_base<_SchedulerId, _SenderId, _ReceiverId>
- {
- using _Base = __operation_base<_SchedulerId, _SenderId, _ReceiverId>;
- using __id = __operation;
- using __receiver_t =
- stdexec::__t<__receiver<_SchedulerId, _SenderId, _ReceiverId>>;
- using __receiver_ref_t =
- __on::__receiver_ref_t<_SchedulerId, _SenderId, _ReceiverId>;
- using __schedule_sender_t = __result_of<schedule, _Scheduler>;
-
- friend void tag_invoke(start_t, __t& __self) noexcept
- {
- start(std::get<0>(__self.__data_));
- }
-
- template <class _Sender2, class _Receiver2>
- __t(_Scheduler __sched, _Sender2&& __sndr, _Receiver2&& __rcvr) :
- _Base{{},
- (_Scheduler&&)__sched,
- (_Sender2&&)__sndr,
- (_Receiver2&&)__rcvr},
- __data_{std::in_place_index<0>, __conv{[this] {
- return connect(schedule(this->__scheduler_),
- __receiver_t{{}, this});
- }}}
- {}
-
- std::variant<connect_result_t<__schedule_sender_t, __receiver_t>,
- connect_result_t<_Sender, __receiver_ref_t>>
- __data_;
- };
-};
-
-template <class _SchedulerId, class _SenderId>
-struct __sender;
-
-struct on_t;
-
-template <class _SchedulerId, class _SenderId>
-struct __sender
-{
- using _Scheduler = stdexec::__t<_SchedulerId>;
- using _Sender = stdexec::__t<_SenderId>;
-
- struct __t
- {
- using __id = __sender;
- using sender_concept = sender_t;
-
- using __schedule_sender_t = __result_of<schedule, _Scheduler>;
- template <class _ReceiverId>
- using __receiver_ref_t =
- __on::__receiver_ref_t<_SchedulerId, _SenderId, _ReceiverId>;
- template <class _ReceiverId>
- using __receiver_t =
- stdexec::__t<__receiver<_SchedulerId, _SenderId, _ReceiverId>>;
- template <class _ReceiverId>
- using __operation_t =
- stdexec::__t<__operation<_SchedulerId, _SenderId, _ReceiverId>>;
-
- _Scheduler __scheduler_;
- _Sender __sndr_;
-
- template <__decays_to<__t> _Self, receiver _Receiver>
- requires constructible_from<_Sender,
- __copy_cvref_t<_Self, _Sender>> &&
- sender_to<__schedule_sender_t,
- __receiver_t<stdexec::__id<_Receiver>>> &&
- sender_to<_Sender,
- __receiver_ref_t<stdexec::__id<_Receiver>>>
- friend auto tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
- -> __operation_t<stdexec::__id<_Receiver>>
- {
- return {((_Self&&)__self).__scheduler_, ((_Self&&)__self).__sndr_,
- (_Receiver&&)__rcvr};
- }
-
- friend auto tag_invoke(get_env_t, const __t& __self) noexcept
- -> env_of_t<const _Sender&>
- {
- return get_env(__self.__sndr_);
- }
-
- template <__decays_to<__t> _Self, class _Env>
- friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
- -> __try_make_completion_signatures<
- __schedule_sender_t, _Env,
- __try_make_completion_signatures<
- __copy_cvref_t<_Self, _Sender>,
- __make_env_t<_Env, __with<get_scheduler_t, _Scheduler>>,
- completion_signatures<set_error_t(std::exception_ptr)>>,
- __mconst<completion_signatures<>>>
- {
- return {};
- }
-
- // BUGBUG better would be to port the `on` algorithm to __sexpr
- template <class _Self, class _Fun, class _OnTag = on_t>
- static auto apply(_Self&& __self, _Fun __fun)
- -> __call_result_t<_Fun, _OnTag, __copy_cvref_t<_Self, _Scheduler>,
- __copy_cvref_t<_Self, _Sender>>
- {
- return ((_Fun&&)__fun)(_OnTag(), ((_Self&&)__self).__scheduler_,
- ((_Self&&)__self).__sndr_);
- }
- };
-};
-
struct on_t
{
using _Sender = __1;
@@ -5668,10 +5202,6 @@
__make_sexpr<on_t>((_Scheduler&&)__sched, (_Sender&&)__sndr));
}
- template <class _Scheduler, class _Sender>
- using __sender_t = __t<__sender<stdexec::__id<__decay_t<_Scheduler>>,
- stdexec::__id<__decay_t<_Sender>>>>;
-
template <class _Env>
STDEXEC_ATTRIBUTE((always_inline))
static auto __transform_env_fn(_Env&& __env) noexcept
@@ -5688,14 +5218,13 @@
}
template <class _Sender, class _Env>
- requires __is_not_instance_of<__id<__decay_t<_Sender>>, __sender>
static auto transform_sender(_Sender&& __sndr, const _Env&)
{
return __sexpr_apply((_Sender&&)__sndr,
[]<class _Data, class _Child>(
__ignore, _Data&& __data, _Child&& __child) {
- return __sender_t<_Data, _Child>{(_Data&&)__data,
- (_Child&&)__child};
+ return let_value(schedule(__data),
+ __detail::__always{(_Child&&)__child});
});
}
};
@@ -5704,14 +5233,6 @@
using __on::on_t;
inline constexpr on_t on{};
-// BUGBUG this will also be unnecessary when `on` returns a __sexpr
-namespace __detail
-{
-template <class _SchedulerId, class _SenderId>
-extern __mconst<__on::__sender<__t<_SchedulerId>, __name_of<__t<_SenderId>>>>
- __name_of_v<__on::__sender<_SchedulerId, _SenderId>>;
-}
-
/////////////////////////////////////////////////////////////////////////////
// [execution.senders.adaptors.into_variant]
namespace __into_variant
@@ -5873,20 +5394,19 @@
__single_values_of_t<_Env, _Senders>...>>;
template <class... _Args>
-using __all_nothrow_decay_copyable =
+using __all_nothrow_decay_copyable_ =
__mbool<(__nothrow_decay_copyable<_Args> && ...)>;
template <class _Env, class... _Senders>
-using __all_value_and_error_args_nothrow_decay_copyable = //
+using __all_nothrow_decay_copyable = //
__mand<__compl_sigs::__maybe_for_all_sigs<
__completion_signatures_of_t<_Senders, _Env>,
- __q<__all_nothrow_decay_copyable>, __q<__mand>>...>;
+ __q<__all_nothrow_decay_copyable_>, __q<__mand>>...>;
template <class _Env, class... _Senders>
using __completions_t = //
__concat_completion_signatures_t<
- __if<__all_value_and_error_args_nothrow_decay_copyable<_Env,
- _Senders...>,
+ __if<__all_nothrow_decay_copyable<_Env, _Senders...>,
completion_signatures<set_stopped_t()>,
completion_signatures<set_stopped_t(),
set_error_t(std::exception_ptr&&)>>,
@@ -5958,9 +5478,7 @@
error_types_of_t<_Senders, __env_t<_Env>, __types>...>;
using __errors_variant = //
- __if<__all_value_and_error_args_nothrow_decay_copyable<_Env,
- _Senders...>,
- __error_types,
+ __if<__all_nothrow_decay_copyable<_Env, _Senders...>, __error_types,
__minvoke<__push_back_unique<__q<std::variant>>, __error_types,
std::exception_ptr>>;
};
@@ -6159,14 +5677,14 @@
}
static constexpr auto complete = //
- []<class _Index, class _State, class _Receiver, class _SetTag,
- class... _Args>(_Index, _State& __state, _Receiver& __rcvr, _SetTag,
+ []<class _Index, class _State, class _Receiver, class _Set,
+ class... _Args>(_Index, _State& __state, _Receiver& __rcvr, _Set,
_Args&&... __args) noexcept -> void {
- if constexpr (same_as<_SetTag, set_error_t>)
+ if constexpr (same_as<_Set, set_error_t>)
{
__set_error(__state, __rcvr, (_Args&&)__args...);
}
- else if constexpr (same_as<_SetTag, set_stopped_t>)
+ else if constexpr (same_as<_Set, set_stopped_t>)
{
__state_t __expected = __started;
// Transition to the "stopped" state if and only if we're in the
@@ -6598,20 +6116,6 @@
STDEXEC_PRAGMA_POP()
-template <class _Ty, class = __name_of<__decay_t<_Ty>>>
-struct __always
-{
- _Ty __val_;
-
- _Ty operator()() noexcept
- {
- return static_cast<_Ty&&>(__val_);
- }
-};
-
-template <class _Ty>
-__always(_Ty) -> __always<_Ty>;
-
struct on_t : __no_scheduler_in_environment
{
template <scheduler _Scheduler, sender _Sender>
@@ -6651,7 +6155,7 @@
_Child&& __child) {
auto __old = get_scheduler(__env);
return transfer(let_value(transfer_just(std::move(__sched)),
- __always{(_Child&&)__child}),
+ __detail::__always{(_Child&&)__child}),
std::move(__old));
});
}
@@ -6757,6 +6261,11 @@
using __sync_wait_result_t =
__mtry_eval<__sync_wait_result_impl, _Sender, __q<std::tuple>>;
+template <class _Sender>
+using __sync_wait_with_variant_result_t =
+ __mtry_eval<__sync_wait_result_impl, __result_of<into_variant, _Sender>,
+ __q<__midentity>>;
+
template <class... _Values>
struct __state
{
@@ -6846,7 +6355,7 @@
template <class _Sender>
struct __variant_for
{
- using __t = __sync_wait_result_t<__result_of<into_variant, _Sender>>;
+ using __t = __sync_wait_with_variant_result_t<_Sender>;
};
template <class _Sender>
using __variant_for_t = __t<__variant_for<_Sender>>;
@@ -6888,7 +6397,7 @@
if constexpr (!sender_in<_Sender, __env>)
{
using _Completions = __completion_signatures_of_t<_Sender, __env>;
- if constexpr (!__ok<_Completions>)
+ if constexpr (__merror<_Completions>)
{
return _Completions();
}
@@ -6935,11 +6444,8 @@
auto operator()(_Sender&& __sndr) const
-> std::optional<__value_tuple_for_t<_Sender>>
{
- using _SD = __early_domain_of_t<_Sender>;
- constexpr bool __has_custom_impl =
- __callable<apply_sender_t, _SD, sync_wait_t, _Sender>;
- using _Domain = __if_c<__has_custom_impl, _SD, default_domain>;
- return stdexec::apply_sender(_Domain(), *this, (_Sender&&)__sndr);
+ auto __domain = __get_early_domain(__sndr);
+ return stdexec::apply_sender(__domain, *this, (_Sender&&)__sndr);
}
#if STDEXEC_NVHPC()
@@ -7024,7 +6530,11 @@
auto apply_sender(_Sender&& __sndr) const
-> std::optional<__variant_for_t<_Sender>>
{
- return sync_wait_t()(into_variant((_Sender&&)__sndr));
+ if (auto __opt_values = sync_wait_t()(into_variant((_Sender&&)__sndr)))
+ {
+ return std::move(std::get<0>(*__opt_values));
+ }
+ return std::nullopt;
}
};
} // namespace __sync_wait
diff --git a/include/sdbusplus/async/stdexec/task.hpp b/include/sdbusplus/async/stdexec/task.hpp
index 379941e..56ad9aa 100644
--- a/include/sdbusplus/async/stdexec/task.hpp
+++ b/include/sdbusplus/async/stdexec/task.hpp
@@ -30,7 +30,6 @@
#include <variant>
STDEXEC_PRAGMA_PUSH()
-STDEXEC_PRAGMA_IGNORE_GNU("-Wpragmas")
STDEXEC_PRAGMA_IGNORE_GNU("-Wundefined-inline")
namespace exec