stdexec: update to latest commit

Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: Ica6de68697329c2aad3a030a7ffb45bdb6a30f11
diff --git a/include/sdbusplus/async/stdexec/__detail/__basic_sender.hpp b/include/sdbusplus/async/stdexec/__detail/__basic_sender.hpp
index 70a02ec..c71a879 100644
--- a/include/sdbusplus/async/stdexec/__detail/__basic_sender.hpp
+++ b/include/sdbusplus/async/stdexec/__detail/__basic_sender.hpp
@@ -163,16 +163,15 @@
                __connect_fn<_Sexpr, _Receiver>> &&
     __mvalid<__state_type_t, tag_of_t<_Sexpr>, _Sexpr, _Receiver>;
 
-// Note: This is UB. UBSAN allows it for now.
-template <class _Parent, class _Child>
-_Parent* __parent_from_child(_Child* __child,
-                             _Child _Parent::*__mbr_ptr) noexcept
-{
-    alignas(_Parent) char __buf[sizeof(_Parent)];
-    _Parent* __parent = (_Parent*)&__buf;
-    const std::ptrdiff_t __offset = (char*)&(__parent->*__mbr_ptr) - __buf;
-    return (_Parent*)((char*)__child - __offset);
-}
+// // Note: This is UB. UBSAN allows it for now.
+// template <class _Parent, class _Child>
+// _Parent* __parent_from_child(_Child* __child, _Child _Parent::*__mbr_ptr)
+// noexcept {
+//   alignas(_Parent) char __buf[sizeof(_Parent)];
+//   _Parent* __parent = (_Parent*) &__buf;
+//   const std::ptrdiff_t __offset = (char*) &(__parent->*__mbr_ptr) - __buf;
+//   return (_Parent*) ((char*) __child - __offset);
+// }
 
 inline constexpr auto __get_attrs = //
     [](__ignore, const auto&... __child) noexcept -> decltype(auto) {
@@ -243,17 +242,15 @@
         // created with this receiver.
         __parent_op_t* __op_;
 
-        template <class _ChildSexpr, class _ChildReceiver>
-        static __t __from_op_state(
-            __op_state<_ChildSexpr, _ChildReceiver>* __child) noexcept
-        {
-            using __parent_op_t = __op_state<_Sexpr, _Receiver>;
-            std::ptrdiff_t __offset =
-                __parent_op_t::template __get_child_op_offset<__v<_Idx>>();
-            __parent_op_t* __parent =
-                (__parent_op_t*)((char*)__child - __offset);
-            return __t{__parent};
-        }
+        // template <class _ChildSexpr, class _ChildReceiver>
+        // static __t __from_op_state(__op_state<_ChildSexpr, _ChildReceiver>*
+        // __child) noexcept {
+        //   using __parent_op_t = __op_state<_Sexpr, _Receiver>;
+        //   std::ptrdiff_t __offset = __parent_op_t::template
+        //   __get_child_op_offset<__v<_Idx>>();
+        //   __parent_op_t* __parent = (__parent_op_t*) ((char*) __child -
+        //   __offset); return __t{__parent};
+        // }
 
         template <__completion_tag _Tag, class... _Args>
         STDEXEC_ATTRIBUTE((always_inline))
@@ -292,12 +289,7 @@
         __state_(__sexpr_impl<__tag_t>::get_state((_Sexpr&&)__sndr, __rcvr_))
     {}
 
-    _Receiver& __rcvr() noexcept
-    {
-        return __rcvr_;
-    }
-
-    const _Receiver& __rcvr() const noexcept
+    _Receiver& __rcvr() & noexcept
     {
         return __rcvr_;
     }
@@ -341,16 +333,6 @@
         __op_base_t* __base = (__op_base_t*)((char*)__derived - __offset);
         return __base->__rcvr();
     }
-
-    decltype(auto) __receiver() const noexcept
-    {
-        using __derived_t = decltype(__op_base_t::__state_);
-        const __derived_t* __derived = static_cast<const __derived_t*>(this);
-        constexpr std::size_t __offset = offsetof(__op_base_t, __state_);
-        const __op_base_t* __base =
-            (const __op_base_t*)((const char*)__derived - __offset);
-        return __base->__rcvr();
-    }
 };
 
 STDEXEC_PRAGMA_POP()
@@ -401,29 +383,22 @@
     using __data_t = typename __desc_t::__data;
     using __children_t = typename __desc_t::__children;
     using __state_t = typename __op_state::__state_t;
-    using __connect_t = __connect_fn<_Sexpr, _Receiver>;
-
-    static auto __connect(__op_state* __self, _Sexpr&& __sexpr)
-        -> __result_of<__sexpr_apply, _Sexpr, __connect_t>
-    {
-        return __sexpr_apply((_Sexpr&&)__sexpr, __connect_t{__self});
-    }
-
     using __inner_ops_t =
-        decltype(__op_state::__connect(nullptr, __declval<_Sexpr>()));
+        __result_of<__sexpr_apply, _Sexpr, __connect_fn<_Sexpr, _Receiver>>;
+
     __inner_ops_t __inner_ops_;
 
-    template <std::size_t _Idx>
-    static std::ptrdiff_t __get_child_op_offset() noexcept
-    {
-        __op_state* __self = (__op_state*)&__self;
-        return (std::ptrdiff_t)(
-            (char*)&__tup::__get<_Idx>(__self->__inner_ops_) - (char*)__self);
-    }
+    // template <std::size_t _Idx>
+    // static std::ptrdiff_t __get_child_op_offset() noexcept {
+    //   __op_state* __self = (__op_state*) &__self;
+    //   return (std::ptrdiff_t)((char*)
+    //   &__tup::__get<_Idx>(__self->__inner_ops_) - (char*) __self);
+    // }
 
     __op_state(_Sexpr&& __sexpr, _Receiver __rcvr) :
         __op_state::__op_base{(_Sexpr&&)__sexpr, (_Receiver&&)__rcvr},
-        __inner_ops_(__op_state::__connect(this, (_Sexpr&&)__sexpr))
+        __inner_ops_(__sexpr_apply((_Sexpr&&)__sexpr,
+                                   __connect_fn<_Sexpr, _Receiver>{this}))
     {}
 
     template <same_as<start_t> _Tag2>
diff --git a/include/sdbusplus/async/stdexec/__detail/__config.hpp b/include/sdbusplus/async/stdexec/__detail/__config.hpp
index d63c894..537de40 100644
--- a/include/sdbusplus/async/stdexec/__detail/__config.hpp
+++ b/include/sdbusplus/async/stdexec/__detail/__config.hpp
@@ -219,7 +219,12 @@
 #define STDEXEC_PRAGMA_IGNORE_EDG(...)                                         \
     _Pragma(STDEXEC_STRINGIZE(diag_suppress __VA_ARGS__))
 #elif STDEXEC_CLANG() || STDEXEC_GCC()
-#define STDEXEC_PRAGMA_PUSH() _Pragma("GCC diagnostic push")
+#define STDEXEC_PRAGMA_PUSH()                                                  \
+    _Pragma("GCC diagnostic push") STDEXEC_PRAGMA_IGNORE_GNU("-Wpragmas")      \
+        STDEXEC_PRAGMA_IGNORE_GNU("-Wunknown-pragmas")                         \
+            STDEXEC_PRAGMA_IGNORE_GNU("-Wunknown-warning-option")              \
+                STDEXEC_PRAGMA_IGNORE_GNU("-Wunknown-attributes")              \
+                    STDEXEC_PRAGMA_IGNORE_GNU("-Wattributes")
 #define STDEXEC_PRAGMA_POP() _Pragma("GCC diagnostic pop")
 #define STDEXEC_PRAGMA_IGNORE_GNU(...)                                         \
     _Pragma(STDEXEC_STRINGIZE(GCC diagnostic ignored __VA_ARGS__))
@@ -241,6 +246,12 @@
 #define STDEXEC_HAS_BUILTIN(...) 0
 #endif
 
+#if !STDEXEC_MSVC() && defined(__has_feature)
+#define STDEXEC_HAS_FEATURE __has_feature
+#else
+#define STDEXEC_HAS_FEATURE(...) 0
+#endif
+
 #if STDEXEC_HAS_BUILTIN(__is_trivially_copyable) || STDEXEC_MSVC()
 #define STDEXEC_IS_TRIVIALLY_COPYABLE(...) __is_trivially_copyable(__VA_ARGS__)
 #else
@@ -307,6 +318,12 @@
 #define STDEXEC_TERMINATE() std::terminate()
 #endif
 
+#if STDEXEC_HAS_FEATURE(thread_sanitizer) || defined(__SANITIZE_THREAD__)
+#define STDEXEC_TSAN(...) STDEXEC_HEAD_OR_TAIL(1, __VA_ARGS__)
+#else
+#define STDEXEC_TSAN(...) STDEXEC_HEAD_OR_NULL(0, __VA_ARGS__)
+#endif
+
 // Before clang-16, clang did not like libstdc++'s ranges implementation
 #if __has_include(<ranges>) && \
   (defined(__cpp_lib_ranges) && __cpp_lib_ranges >= 201911L) && \
diff --git a/include/sdbusplus/async/stdexec/__detail/__env.hpp b/include/sdbusplus/async/stdexec/__detail/__env.hpp
index 0b2e7f1..229de70 100644
--- a/include/sdbusplus/async/stdexec/__detail/__env.hpp
+++ b/include/sdbusplus/async/stdexec/__detail/__env.hpp
@@ -509,8 +509,11 @@
         return tag_invoke(as_awaitable, (_Ty&&)__value, *this);
     }
 
-    friend auto tag_invoke(get_env_t, const __env_promise&) noexcept
-        -> const _Env&;
+    template <same_as<get_env_t> _Tag>
+    friend auto tag_invoke(_Tag, const __env_promise&) noexcept -> const _Env&
+    {
+        std::terminate();
+    }
 };
 
 // For making an environment from key/value pairs and optionally
diff --git a/include/sdbusplus/async/stdexec/__detail/__execution_fwd.hpp b/include/sdbusplus/async/stdexec/__detail/__execution_fwd.hpp
index affffe9..70393e9 100644
--- a/include/sdbusplus/async/stdexec/__detail/__execution_fwd.hpp
+++ b/include/sdbusplus/async/stdexec/__detail/__execution_fwd.hpp
@@ -208,6 +208,26 @@
 extern const bulk_t bulk;
 
 //////////////////////////////////////////////////////////////////////////////////////////////////
+namespace __split
+{
+struct split_t;
+struct __split_t;
+} // namespace __split
+
+using __split::split_t;
+extern const split_t split;
+
+//////////////////////////////////////////////////////////////////////////////////////////////////
+namespace __ensure_started
+{
+struct ensure_started_t;
+struct __ensure_started_t;
+} // namespace __ensure_started
+
+using __ensure_started::ensure_started_t;
+extern const ensure_started_t ensure_started;
+
+//////////////////////////////////////////////////////////////////////////////////////////////////
 namespace __on_v2
 {
 struct on_t;
diff --git a/include/sdbusplus/async/stdexec/__detail/__intrusive_ptr.hpp b/include/sdbusplus/async/stdexec/__detail/__intrusive_ptr.hpp
index feaaccc..74505f9 100644
--- a/include/sdbusplus/async/stdexec/__detail/__intrusive_ptr.hpp
+++ b/include/sdbusplus/async/stdexec/__detail/__intrusive_ptr.hpp
@@ -22,6 +22,10 @@
 #include <memory>
 #include <new>
 
+#if STDEXEC_TSAN()
+#include <sanitizer/tsan_interface.h>
+#endif
+
 namespace stdexec
 {
 namespace __ptr
@@ -30,7 +34,22 @@
 struct __make_intrusive_t;
 
 template <class _Ty>
-struct __enable_intrusive_from_this;
+class __intrusive_ptr;
+
+template <class _Ty>
+struct __enable_intrusive_from_this
+{
+    __intrusive_ptr<_Ty> __intrusive_from_this() noexcept;
+    __intrusive_ptr<const _Ty> __intrusive_from_this() const noexcept;
+
+  private:
+    friend _Ty;
+    void __inc_ref() noexcept;
+    void __dec_ref() noexcept;
+};
+
+STDEXEC_PRAGMA_PUSH()
+STDEXEC_PRAGMA_IGNORE_GNU("-Wtsan")
 
 template <class _Ty>
 struct __control_block
@@ -58,8 +77,27 @@
     {
         return *(_Ty*)__value_;
     }
+
+    void __inc_ref_() noexcept
+    {
+        __refcount_.fetch_add(1, std::memory_order_relaxed);
+    }
+
+    void __dec_ref_() noexcept
+    {
+        if (1u == __refcount_.fetch_sub(1, std::memory_order_release))
+        {
+            std::atomic_thread_fence(std::memory_order_acquire);
+            // TSan does not support std::atomic_thread_fence, so we
+            // need to use the TSan-specific __tsan_acquire instead:
+            STDEXEC_TSAN(__tsan_acquire(&__refcount_));
+            delete this;
+        }
+    }
 };
 
+STDEXEC_PRAGMA_POP()
+
 template <class _Ty>
 class __intrusive_ptr
 {
@@ -73,25 +111,25 @@
         __data_(__data)
     {}
 
-    void __addref_() noexcept
+    void __inc_ref_() noexcept
     {
         if (__data_)
         {
-            __data_->__refcount_.fetch_add(1, std::memory_order_relaxed);
+            __data_->__inc_ref_();
         }
     }
 
-    void __release_() noexcept
+    void __dec_ref_() noexcept
     {
-        if (__data_ &&
-            1u == __data_->__refcount_.fetch_sub(1, std::memory_order_release))
+        if (__data_)
         {
-            std::atomic_thread_fence(std::memory_order_acquire);
-            delete __data_;
+            __data_->__dec_ref_();
         }
     }
 
   public:
+    using element_type = _Ty;
+
     __intrusive_ptr() = default;
 
     __intrusive_ptr(__intrusive_ptr&& __that) noexcept :
@@ -101,9 +139,14 @@
     __intrusive_ptr(const __intrusive_ptr& __that) noexcept :
         __data_(__that.__data_)
     {
-        __addref_();
+        __inc_ref_();
     }
 
+    __intrusive_ptr(__enable_intrusive_from_this<_Ty>* __that) noexcept :
+        __intrusive_ptr(__that ? __that->__intrusive_from_this()
+                               : __intrusive_ptr())
+    {}
+
     __intrusive_ptr& operator=(__intrusive_ptr&& __that) noexcept
     {
         [[maybe_unused]] __intrusive_ptr __old{
@@ -116,9 +159,16 @@
         return operator=(__intrusive_ptr(__that));
     }
 
+    __intrusive_ptr&
+        operator=(__enable_intrusive_from_this<_Ty>* __that) noexcept
+    {
+        return operator=(__that ? __that->__intrusive_from_this()
+                                : __intrusive_ptr());
+    }
+
     ~__intrusive_ptr()
     {
-        __release_();
+        __dec_ref_();
     }
 
     void reset() noexcept
@@ -165,26 +215,36 @@
 };
 
 template <class _Ty>
-struct __enable_intrusive_from_this
+__intrusive_ptr<_Ty>
+    __enable_intrusive_from_this<_Ty>::__intrusive_from_this() noexcept
 {
-    __intrusive_ptr<_Ty> __intrusive_from_this() noexcept
-    {
-        static_assert(0 == offsetof(__control_block<_Ty>, __value_));
-        _Ty* __this = static_cast<_Ty*>(this);
-        __intrusive_ptr<_Ty> __p{(__control_block<_Ty>*)__this};
-        __p.__addref_();
-        return __p;
-    }
+    auto* __data = (__control_block<_Ty>*)static_cast<_Ty*>(this);
+    __data->__inc_ref_();
+    return __intrusive_ptr<_Ty>{__data};
+}
 
-    __intrusive_ptr<const _Ty> __intrusive_from_this() const noexcept
-    {
-        static_assert(0 == offsetof(__control_block<_Ty>, __value_));
-        const _Ty* __this = static_cast<const _Ty*>(this);
-        __intrusive_ptr<const _Ty> __p{(__control_block<_Ty>*)__this};
-        __p.__addref_();
-        return __p;
-    }
-};
+template <class _Ty>
+__intrusive_ptr<const _Ty>
+    __enable_intrusive_from_this<_Ty>::__intrusive_from_this() const noexcept
+{
+    auto* __data = (__control_block<_Ty>*)static_cast<const _Ty*>(this);
+    __data->__inc_ref_();
+    return __intrusive_ptr<const _Ty>{__data};
+}
+
+template <class _Ty>
+void __enable_intrusive_from_this<_Ty>::__inc_ref() noexcept
+{
+    auto* __data = (__control_block<_Ty>*)static_cast<_Ty*>(this);
+    __data->__inc_ref_();
+}
+
+template <class _Ty>
+void __enable_intrusive_from_this<_Ty>::__dec_ref() noexcept
+{
+    auto* __data = (__control_block<_Ty>*)static_cast<_Ty*>(this);
+    __data->__dec_ref_();
+}
 
 template <class _Ty>
 struct __make_intrusive_t
diff --git a/include/sdbusplus/async/stdexec/__detail/__meta.hpp b/include/sdbusplus/async/stdexec/__detail/__meta.hpp
index 14c5bf2..8ddc0de 100644
--- a/include/sdbusplus/async/stdexec/__detail/__meta.hpp
+++ b/include/sdbusplus/async/stdexec/__detail/__meta.hpp
@@ -401,11 +401,11 @@
     requires(sizeof...(_False) <= 1)
 using __if_c = __minvoke<__if_::__<_Pred>, _True, _False...>;
 
-template <class _Pred, class _True, class... _False>
-using __minvoke_if = __minvoke<__if<_Pred, _True, _False...>>;
+template <class _Pred, class _True, class _False, class... _Args>
+using __minvoke_if = __minvoke<__if<_Pred, _True, _False>, _Args...>;
 
-template <bool _Pred, class _True, class... _False>
-using __minvoke_if_c = __minvoke<__if_c<_Pred, _True, _False...>>;
+template <bool _Pred, class _True, class _False, class... _Args>
+using __minvoke_if_c = __minvoke<__if_c<_Pred, _True, _False>, _Args...>;
 
 template <class _Tp>
 struct __mconst
@@ -744,23 +744,11 @@
     using __f = __minvoke<_Continuation, _Ts...>;
 };
 
-// For hiding a template type parameter from ADL
-template <class _Ty>
-struct _Xp
-{
-    using __t = struct _Up
-    {
-        using __t = _Ty;
-    };
-};
-template <class _Ty>
-using __x = __t<_Xp<_Ty>>;
-
 template <class _Ty>
 concept __has_id = requires { typename _Ty::__id; };
 
 template <class _Ty>
-struct _Yp
+struct _Id
 {
     using __t = _Ty;
 
@@ -783,7 +771,7 @@
 struct __id_<false>
 {
     template <class _Ty>
-    using __f = _Yp<_Ty>;
+    using __f = _Id<_Ty>;
 };
 template <class _Ty>
 using __id = __minvoke<__id_<__has_id<_Ty>>, _Ty>;
diff --git a/include/sdbusplus/async/stdexec/__detail/__utility.hpp b/include/sdbusplus/async/stdexec/__detail/__utility.hpp
new file mode 100644
index 0000000..4d6a1c1
--- /dev/null
+++ b/include/sdbusplus/async/stdexec/__detail/__utility.hpp
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2023 NVIDIA Corporation
+ *
+ * Licensed under the Apache License Version 2.0 with LLVM Exceptions
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *   https://llvm.org/LICENSE.txt
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include "__config.hpp"
+#include "__type_traits.hpp"
+
+#include <type_traits>
+
+namespace stdexec
+{
+namespace __detail
+{
+template <class _Cpcvref>
+inline constexpr auto __forward_like =
+    []<class _Uy>(_Uy&& __uy) noexcept -> auto&& {
+    return static_cast<
+        typename _Cpcvref::template __f<std::remove_reference_t<_Uy>>>(__uy);
+};
+}
+
+template <class _Ty>
+inline constexpr const auto& __forward_like =
+    __detail::__forward_like<__copy_cvref_fn<_Ty&&>>;
+} // namespace stdexec
diff --git a/include/sdbusplus/async/stdexec/async_scope.hpp b/include/sdbusplus/async/stdexec/async_scope.hpp
index 4109299..8c2483f 100644
--- a/include/sdbusplus/async/stdexec/async_scope.hpp
+++ b/include/sdbusplus/async/stdexec/async_scope.hpp
@@ -58,100 +58,104 @@
 
 ////////////////////////////////////////////////////////////////////////////
 // async_scope::when_empty implementation
-template <class _ReceiverId>
-struct __when_empty_op_base : __task
-{
-    using _Receiver = __t<_ReceiverId>;
-    STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_;
-};
-
 template <class _ConstrainedId, class _ReceiverId>
-struct __when_empty_op : __task
+struct __when_empty_op
 {
-    using _Constrained = __t<_ConstrainedId>;
-    using _Receiver = __t<_ReceiverId>;
+    using _Constrained = __cvref_t<_ConstrainedId>;
+    using _Receiver = stdexec::__t<_ReceiverId>;
 
-    explicit __when_empty_op(const __impl* __scope, _Constrained&& __sndr,
-                             _Receiver __rcvr) :
-        __task{{}, __scope, __notify_waiter},
-        __op_(stdexec::connect((_Constrained&&)__sndr, (_Receiver&&)__rcvr))
-    {}
-
-  private:
-    static void __notify_waiter(__task* __self) noexcept
+    struct __t : __task
     {
-        start(static_cast<__when_empty_op*>(__self)->__op_);
-    }
+        using __id = __when_empty_op;
 
-    void __start_() noexcept
-    {
-        std::unique_lock __guard{this->__scope_->__lock_};
-        auto& __active = this->__scope_->__active_;
-        auto& __waiters = this->__scope_->__waiters_;
-        if (__active != 0)
+        explicit __t(const __impl* __scope, _Constrained&& __sndr,
+                     _Receiver __rcvr) :
+            __task{{}, __scope, __notify_waiter},
+            __op_(stdexec::connect((_Constrained&&)__sndr, (_Receiver&&)__rcvr))
+        {}
+
+      private:
+        static void __notify_waiter(__task* __self) noexcept
         {
-            __waiters.push_back(this);
-            return;
+            start(static_cast<__t*>(__self)->__op_);
         }
-        __guard.unlock();
-        start(this->__op_);
-    }
 
-    friend void tag_invoke(start_t, __when_empty_op& __self) noexcept
-    {
-        return __self.__start_();
-    }
+        void __start_() noexcept
+        {
+            std::unique_lock __guard{this->__scope_->__lock_};
+            auto& __active = this->__scope_->__active_;
+            auto& __waiters = this->__scope_->__waiters_;
+            if (__active != 0)
+            {
+                __waiters.push_back(this);
+                return;
+            }
+            __guard.unlock();
+            start(this->__op_);
+        }
 
-    STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
-    connect_result_t<_Constrained, _Receiver> __op_;
+        friend void tag_invoke(start_t, __t& __self) noexcept
+        {
+            return __self.__start_();
+        }
+
+        STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
+        connect_result_t<_Constrained, _Receiver> __op_;
+    };
 };
 
 template <class _ConstrainedId>
 struct __when_empty_sender
 {
-    using _Constrained = __t<_ConstrainedId>;
-    using sender_concept = stdexec::sender_t;
+    using _Constrained = stdexec::__t<_ConstrainedId>;
 
-    template <class _Self, class _Receiver>
-    using __when_empty_op_t =
-        __when_empty_op<__x<__copy_cvref_t<_Self, _Constrained>>,
-                        __x<_Receiver>>;
-
-    template <__decays_to<__when_empty_sender> _Self, receiver _Receiver>
-        requires sender_to<__copy_cvref_t<_Self, _Constrained>, _Receiver>
-    [[nodiscard]] friend __when_empty_op_t<_Self, _Receiver>
-        tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
+    struct __t
     {
-        return __when_empty_op_t<_Self, _Receiver>{
-            __self.__scope_, ((_Self&&)__self).__c_, (_Receiver&&)__rcvr};
-    }
+        using __id = __when_empty_sender;
+        using sender_concept = stdexec::sender_t;
 
-    template <__decays_to<__when_empty_sender> _Self, class _Env>
-    friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
-        -> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
-                                      __env_t<_Env>>
-    {
-        return {};
-    }
+        template <class _Self, class _Receiver>
+        using __when_empty_op_t =
+            stdexec::__t<__when_empty_op<__cvref_id<_Self, _Constrained>,
+                                         stdexec::__id<_Receiver>>>;
 
-    friend empty_env tag_invoke(get_env_t, const __when_empty_sender&) noexcept
-    {
-        return {};
-    }
+        template <__decays_to<__t> _Self, receiver _Receiver>
+            requires sender_to<__copy_cvref_t<_Self, _Constrained>, _Receiver>
+        [[nodiscard]] friend __when_empty_op_t<_Self, _Receiver>
+            tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
+        {
+            return __when_empty_op_t<_Self, _Receiver>{
+                __self.__scope_, ((_Self&&)__self).__c_, (_Receiver&&)__rcvr};
+        }
 
-    const __impl* __scope_;
-    STDEXEC_ATTRIBUTE((no_unique_address)) _Constrained __c_;
+        template <__decays_to<__t> _Self, class _Env>
+        friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
+            -> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
+                                          __env_t<_Env>>
+        {
+            return {};
+        }
+
+        friend empty_env tag_invoke(get_env_t, const __t&) noexcept
+        {
+            return {};
+        }
+
+        const __impl* __scope_;
+        STDEXEC_ATTRIBUTE((no_unique_address)) _Constrained __c_;
+    };
 };
 
 template <class _Constrained>
-using __when_empty_sender_t = __when_empty_sender<__x<__decay_t<_Constrained>>>;
+using __when_empty_sender_t =
+    stdexec::__t<__when_empty_sender<__id<__decay_t<_Constrained>>>>;
 
 ////////////////////////////////////////////////////////////////////////////
 // async_scope::nest implementation
 template <class _ReceiverId>
 struct __nest_op_base : __immovable
 {
-    using _Receiver = __t<_ReceiverId>;
+    using _Receiver = stdexec::__t<_ReceiverId>;
     const __impl* __scope_;
     STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_;
 };
@@ -159,121 +163,140 @@
 template <class _ReceiverId>
 struct __nest_rcvr
 {
-    using receiver_concept = stdexec::receiver_t;
-    using _Receiver = __t<_ReceiverId>;
-    __nest_op_base<_ReceiverId>* __op_;
+    using _Receiver = stdexec::__t<_ReceiverId>;
 
-    static void __complete(const __impl* __scope) noexcept
+    struct __t
     {
-        std::unique_lock __guard{__scope->__lock_};
-        auto& __active = __scope->__active_;
-        if (--__active == 0)
+        using __id = __nest_rcvr;
+        using receiver_concept = stdexec::receiver_t;
+        __nest_op_base<_ReceiverId>* __op_;
+
+        static void __complete(const __impl* __scope) noexcept
         {
-            auto __local = std::move(__scope->__waiters_);
-            __guard.unlock();
-            __scope = nullptr;
-            // do not access __scope
-            while (!__local.empty())
+            std::unique_lock __guard{__scope->__lock_};
+            auto& __active = __scope->__active_;
+            if (--__active == 0)
             {
-                auto* __next = __local.pop_front();
-                __next->__notify_waiter(__next);
-                // __scope must be considered deleted
+                auto __local = std::move(__scope->__waiters_);
+                __guard.unlock();
+                __scope = nullptr;
+                // do not access __scope
+                while (!__local.empty())
+                {
+                    auto* __next = __local.pop_front();
+                    __next->__notify_waiter(__next);
+                    // __scope must be considered deleted
+                }
             }
         }
-    }
 
-    template <__completion_tag _Tag, class... _As>
-        requires __callable<_Tag, _Receiver, _As...>
-    friend void tag_invoke(_Tag, __nest_rcvr&& __self, _As&&... __as) noexcept
-    {
-        auto __scope = __self.__op_->__scope_;
-        _Tag{}(std::move(__self.__op_->__rcvr_), (_As&&)__as...);
-        // do not access __op_
-        // do not access this
-        __complete(__scope);
-    }
+        template <__completion_tag _Tag, class... _As>
+            requires __callable<_Tag, _Receiver, _As...>
+        friend void tag_invoke(_Tag, __t&& __self, _As&&... __as) noexcept
+        {
+            auto __scope = __self.__op_->__scope_;
+            _Tag{}(std::move(__self.__op_->__rcvr_), (_As&&)__as...);
+            // do not access __op_
+            // do not access this
+            __complete(__scope);
+        }
 
-    friend __env_t<env_of_t<_Receiver>>
-        tag_invoke(get_env_t, const __nest_rcvr& __self) noexcept
-    {
-        return make_env(
-            get_env(__self.__op_->__rcvr_),
-            with(get_stop_token,
-                 __self.__op_->__scope_->__stop_source_.get_token()));
-    }
+        friend __env_t<env_of_t<_Receiver>>
+            tag_invoke(get_env_t, const __t& __self) noexcept
+        {
+            return make_env(
+                get_env(__self.__op_->__rcvr_),
+                with(get_stop_token,
+                     __self.__op_->__scope_->__stop_source_.get_token()));
+        }
+    };
 };
 
 template <class _ConstrainedId, class _ReceiverId>
-struct __nest_op : __nest_op_base<_ReceiverId>
+struct __nest_op
 {
-    using _Constrained = __t<_ConstrainedId>;
-    using _Receiver = __t<_ReceiverId>;
-    STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
-    connect_result_t<_Constrained, __nest_rcvr<_ReceiverId>> __op_;
+    using _Constrained = stdexec::__t<_ConstrainedId>;
+    using _Receiver = stdexec::__t<_ReceiverId>;
 
-    template <__decays_to<_Constrained> _Sender, __decays_to<_Receiver> _Rcvr>
-    explicit __nest_op(const __impl* __scope, _Sender&& __c, _Rcvr&& __rcvr) :
-        __nest_op_base<_ReceiverId>{{}, __scope, (_Rcvr&&)__rcvr},
-        __op_(stdexec::connect((_Sender&&)__c, __nest_rcvr<_ReceiverId>{this}))
-    {}
-
-  private:
-    void __start_() noexcept
+    struct __t : __nest_op_base<_ReceiverId>
     {
-        STDEXEC_ASSERT(this->__scope_);
-        std::unique_lock __guard{this->__scope_->__lock_};
-        auto& __active = this->__scope_->__active_;
-        ++__active;
-        __guard.unlock();
-        start(__op_);
-    }
+        using __id = __nest_op;
+        using __nest_rcvr_t = stdexec::__t<__nest_rcvr<_ReceiverId>>;
+        STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
+        connect_result_t<_Constrained, __nest_rcvr_t> __op_;
 
-    friend void tag_invoke(start_t, __nest_op& __self) noexcept
-    {
-        return __self.__start_();
-    }
+        template <__decays_to<_Constrained> _Sender,
+                  __decays_to<_Receiver> _Rcvr>
+        explicit __t(const __impl* __scope, _Sender&& __c, _Rcvr&& __rcvr) :
+            __nest_op_base<_ReceiverId>{{}, __scope, (_Rcvr&&)__rcvr},
+            __op_(stdexec::connect((_Sender&&)__c, __nest_rcvr_t{this}))
+        {}
+
+      private:
+        void __start_() noexcept
+        {
+            STDEXEC_ASSERT(this->__scope_);
+            std::unique_lock __guard{this->__scope_->__lock_};
+            auto& __active = this->__scope_->__active_;
+            ++__active;
+            __guard.unlock();
+            start(__op_);
+        }
+
+        friend void tag_invoke(start_t, __t& __self) noexcept
+        {
+            return __self.__start_();
+        }
+    };
 };
 
 template <class _ConstrainedId>
 struct __nest_sender
 {
-    using _Constrained = __t<_ConstrainedId>;
-    using sender_concept = stdexec::sender_t;
-
-    const __impl* __scope_;
-    STDEXEC_ATTRIBUTE((no_unique_address)) _Constrained __c_;
-
-    template <class _Receiver>
-    using __nest_operation_t = __nest_op<_ConstrainedId, __x<_Receiver>>;
-    template <class _Receiver>
-    using __nest_receiver_t = __nest_rcvr<__x<_Receiver>>;
-
-    template <__decays_to<__nest_sender> _Self, receiver _Receiver>
-        requires sender_to<__copy_cvref_t<_Self, _Constrained>,
-                           __nest_receiver_t<_Receiver>>
-    [[nodiscard]] friend __nest_operation_t<_Receiver>
-        tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
+    using _Constrained = stdexec::__t<_ConstrainedId>;
+    struct __t
     {
-        return __nest_operation_t<_Receiver>{
-            __self.__scope_, ((_Self&&)__self).__c_, (_Receiver&&)__rcvr};
-    }
+        using __id = __nest_sender;
+        using sender_concept = stdexec::sender_t;
 
-    template <__decays_to<__nest_sender> _Self, class _Env>
-    friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
-        -> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
-                                      __env_t<_Env>>
-    {
-        return {};
-    }
+        const __impl* __scope_;
+        STDEXEC_ATTRIBUTE((no_unique_address)) _Constrained __c_;
 
-    friend empty_env tag_invoke(get_env_t, const __nest_sender&) noexcept
-    {
-        return {};
-    }
+        template <class _Receiver>
+        using __nest_operation_t =
+            stdexec::__t<__nest_op<_ConstrainedId, stdexec::__id<_Receiver>>>;
+        template <class _Receiver>
+        using __nest_receiver_t =
+            stdexec::__t<__nest_rcvr<stdexec::__id<_Receiver>>>;
+
+        template <__decays_to<__t> _Self, receiver _Receiver>
+            requires sender_to<__copy_cvref_t<_Self, _Constrained>,
+                               __nest_receiver_t<_Receiver>>
+        [[nodiscard]] friend __nest_operation_t<_Receiver>
+            tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
+        {
+            return __nest_operation_t<_Receiver>{
+                __self.__scope_, ((_Self&&)__self).__c_, (_Receiver&&)__rcvr};
+        }
+
+        template <__decays_to<__t> _Self, class _Env>
+        friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
+            -> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
+                                          __env_t<_Env>>
+        {
+            return {};
+        }
+
+        friend empty_env tag_invoke(get_env_t, const __t&) noexcept
+        {
+            return {};
+        }
+    };
 };
 
 template <class _Constrained>
-using __nest_sender_t = __nest_sender<__x<__decay_t<_Constrained>>>;
+using __nest_sender_t =
+    stdexec::__t<__nest_sender<__id<__decay_t<_Constrained>>>>;
 
 ////////////////////////////////////////////////////////////////////////////
 // async_scope::spawn_future implementation
@@ -312,130 +335,134 @@
 };
 
 template <class _SenderId, class _EnvId, class _ReceiverId>
-class __future_op : __subscription
+struct __future_op
 {
-    using _Sender = __t<_SenderId>;
-    using _Env = __t<_EnvId>;
-    using _Receiver = __t<_ReceiverId>;
+    using _Sender = stdexec::__t<_SenderId>;
+    using _Env = stdexec::__t<_EnvId>;
+    using _Receiver = stdexec::__t<_ReceiverId>;
 
-    using __forward_consumer = typename stop_token_of_t<
-        env_of_t<_Receiver>>::template callback_type<__forward_stopped>;
-
-    friend void tag_invoke(start_t, __future_op& __self) noexcept
+    class __t : __subscription
     {
-        __self.__start_();
-    }
+        using __forward_consumer = typename stop_token_of_t<
+            env_of_t<_Receiver>>::template callback_type<__forward_stopped>;
 
-    void __complete_() noexcept
-    {
-        try
+        friend void tag_invoke(start_t, __t& __self) noexcept
         {
-            auto __state = std::move(__state_);
-            STDEXEC_ASSERT(__state != nullptr);
-            std::unique_lock __guard{__state->__mutex_};
-            // either the future is still in use or it has passed ownership to
-            // __state->__no_future_
-            if (__state->__no_future_.get() != nullptr ||
-                __state->__step_ != __future_step::__future)
-            {
-                // invalid state - there is a code bug in the state machine
-                std::terminate();
-            }
-            else if (get_stop_token(get_env(__rcvr_)).stop_requested())
-            {
-                __guard.unlock();
-                set_stopped((_Receiver&&)__rcvr_);
-                __guard.lock();
-            }
-            else
-            {
-                std::visit(
-                    [this, &__guard]<class _Tup>(_Tup& __tup) {
-                    if constexpr (same_as<_Tup, std::monostate>)
-                    {
-                        std::terminate();
-                    }
-                    else
-                    {
-                        std::apply(
-                            [this, &__guard]<class... _As>(auto tag,
-                                                           _As&... __as) {
-                            __guard.unlock();
-                            tag((_Receiver&&)__rcvr_, (_As&&)__as...);
-                            __guard.lock();
-                        },
-                            __tup);
-                    }
-                },
-                    __state->__data_);
-            }
+            __self.__start_();
         }
-        catch (...)
-        {
-            set_error((_Receiver&&)__rcvr_, std::current_exception());
-        }
-    }
 
-    void __start_() noexcept
-    {
-        try
+        void __complete_() noexcept
         {
-            if (!!__state_)
+            try
             {
-                std::unique_lock __guard{__state_->__mutex_};
-                if (__state_->__data_.index() != 0)
+                auto __state = std::move(__state_);
+                STDEXEC_ASSERT(__state != nullptr);
+                std::unique_lock __guard{__state->__mutex_};
+                // either the future is still in use or it has passed ownership
+                // to __state->__no_future_
+                if (__state->__no_future_.get() != nullptr ||
+                    __state->__step_ != __future_step::__future)
+                {
+                    // invalid state - there is a code bug in the state machine
+                    std::terminate();
+                }
+                else if (get_stop_token(get_env(__rcvr_)).stop_requested())
                 {
                     __guard.unlock();
-                    __complete_();
+                    set_stopped((_Receiver&&)__rcvr_);
+                    __guard.lock();
                 }
                 else
                 {
-                    __state_->__subscribers_.push_back(this);
+                    std::visit(
+                        [this, &__guard]<class _Tup>(_Tup& __tup) {
+                        if constexpr (same_as<_Tup, std::monostate>)
+                        {
+                            std::terminate();
+                        }
+                        else
+                        {
+                            std::apply(
+                                [this, &__guard]<class... _As>(auto tag,
+                                                               _As&... __as) {
+                                __guard.unlock();
+                                tag((_Receiver&&)__rcvr_, (_As&&)__as...);
+                                __guard.lock();
+                            },
+                                __tup);
+                        }
+                    },
+                        __state->__data_);
                 }
             }
-        }
-        catch (...)
-        {
-            set_error((_Receiver&&)__rcvr_, std::current_exception());
-        }
-    }
-
-    STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_;
-    std::unique_ptr<__future_state<_Sender, _Env>> __state_;
-    STDEXEC_ATTRIBUTE((no_unique_address))
-    __forward_consumer __forward_consumer_;
-
-  public:
-    ~__future_op() noexcept
-    {
-        if (__state_ != nullptr)
-        {
-            auto __raw_state = __state_.get();
-            std::unique_lock __guard{__raw_state->__mutex_};
-            if (__raw_state->__data_.index() > 0)
+            catch (...)
             {
-                // completed given sender
-                // state is no longer needed
-                return;
+                set_error((_Receiver&&)__rcvr_, std::current_exception());
             }
-            __raw_state->__no_future_ = std::move(__state_);
-            __raw_state->__step_from_to_(__guard, __future_step::__future,
-                                         __future_step::__no_future);
         }
-    }
 
-    template <class _Receiver2>
-    explicit __future_op(
-        _Receiver2&& __rcvr,
-        std::unique_ptr<__future_state<_Sender, _Env>> __state) :
-        __subscription{{},
-                       [](__subscription* __self) noexcept -> void {
-        static_cast<__future_op*>(__self)->__complete_();
-    }},
-        __rcvr_((_Receiver2&&)__rcvr), __state_(std::move(__state)),
-        __forward_consumer_(get_stop_token(get_env(__rcvr_)),
-                            __forward_stopped{&__state_->__stop_source_})
-    {}
+        void __start_() noexcept
+        {
+            try
+            {
+                if (!!__state_)
+                {
+                    std::unique_lock __guard{__state_->__mutex_};
+                    if (__state_->__data_.index() != 0)
+                    {
+                        __guard.unlock();
+                        __complete_();
+                    }
+                    else
+                    {
+                        __state_->__subscribers_.push_back(this);
+                    }
+                }
+            }
+            catch (...)
+            {
+                set_error((_Receiver&&)__rcvr_, std::current_exception());
+            }
+        }
+
+        STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_;
+        std::unique_ptr<__future_state<_Sender, _Env>> __state_;
+        STDEXEC_ATTRIBUTE((no_unique_address))
+        __forward_consumer __forward_consumer_;
+
+      public:
+        using __id = __future_op;
+
+        ~__t() noexcept
+        {
+            if (__state_ != nullptr)
+            {
+                auto __raw_state = __state_.get();
+                std::unique_lock __guard{__raw_state->__mutex_};
+                if (__raw_state->__data_.index() > 0)
+                {
+                    // completed given sender
+                    // state is no longer needed
+                    return;
+                }
+                __raw_state->__no_future_ = std::move(__state_);
+                __raw_state->__step_from_to_(__guard, __future_step::__future,
+                                             __future_step::__no_future);
+            }
+        }
+
+        template <class _Receiver2>
+        explicit __t(_Receiver2&& __rcvr,
+                     std::unique_ptr<__future_state<_Sender, _Env>> __state) :
+            __subscription{{},
+                           [](__subscription* __self) noexcept -> void {
+            static_cast<__t*>(__self)->__complete_();
+        }},
+            __rcvr_((_Receiver2&&)__rcvr), __state_(std::move(__state)),
+            __forward_consumer_(get_stop_token(get_env(__rcvr_)),
+                                __forward_stopped{&__state_->__stop_source_})
+        {}
+    };
 };
 
 #if STDEXEC_NVHPC()
@@ -448,7 +475,7 @@
     using __t = std::tuple<_Tag, _Ts...>;
 };
 template <class _Fn>
-using __completion_as_tuple_t = __t<__completion_as_tuple2_<_Fn>>;
+using __completion_as_tuple_t = stdexec::__t<__completion_as_tuple2_<_Fn>>;
 
 #else
 
@@ -555,69 +582,74 @@
     __env_t<_Env> __env_;
 };
 
-template <class _CompletionsId, class _EnvId>
+template <class _Completions, class _EnvId>
 struct __future_rcvr
 {
-    using receiver_concept = stdexec::receiver_t;
-    using _Completions = __t<_CompletionsId>;
-    using _Env = __t<_EnvId>;
-    __future_state_base<_Completions, _Env>* __state_;
-    const __impl* __scope_;
+    using _Env = stdexec::__t<_EnvId>;
 
-    void __dispatch_result_() noexcept
+    struct __t
     {
-        auto& __state = *__state_;
-        std::unique_lock __guard{__state.__mutex_};
-        auto __local = std::move(__state.__subscribers_);
-        __state.__forward_scope_ = std::nullopt;
-        if (__state.__no_future_.get() != nullptr)
-        {
-            // nobody is waiting for the results
-            // delete this and return
-            __state.__step_from_to_(__guard, __future_step::__no_future,
-                                    __future_step::__deleted);
-            __guard.unlock();
-            __state.__no_future_.reset();
-            return;
-        }
-        __guard.unlock();
-        while (!__local.empty())
-        {
-            auto* __sub = __local.pop_front();
-            __sub->__complete();
-        }
-    }
+        using __id = __future_rcvr;
+        using receiver_concept = stdexec::receiver_t;
+        __future_state_base<_Completions, _Env>* __state_;
+        const __impl* __scope_;
 
-    template <__completion_tag _Tag, __movable_value... _As>
-    friend void tag_invoke(_Tag, __future_rcvr&& __self, _As&&... __as) noexcept
-    {
-        auto& __state = *__self.__state_;
-        try
+        void __dispatch_result_() noexcept
         {
+            auto& __state = *__state_;
             std::unique_lock __guard{__state.__mutex_};
-            using _Tuple = __decayed_tuple<_Tag, _As...>;
-            __state.__data_.template emplace<_Tuple>(_Tag{}, (_As&&)__as...);
+            auto __local = std::move(__state.__subscribers_);
+            __state.__forward_scope_ = std::nullopt;
+            if (__state.__no_future_.get() != nullptr)
+            {
+                // nobody is waiting for the results
+                // delete this and return
+                __state.__step_from_to_(__guard, __future_step::__no_future,
+                                        __future_step::__deleted);
+                __guard.unlock();
+                __state.__no_future_.reset();
+                return;
+            }
             __guard.unlock();
-            __self.__dispatch_result_();
+            while (!__local.empty())
+            {
+                auto* __sub = __local.pop_front();
+                __sub->__complete();
+            }
         }
-        catch (...)
-        {
-            using _Tuple = std::tuple<set_error_t, std::exception_ptr>;
-            __state.__data_.template emplace<_Tuple>(set_error_t{},
-                                                     std::current_exception());
-        }
-    }
 
-    friend const __env_t<_Env>& tag_invoke(get_env_t,
-                                           const __future_rcvr& __self) noexcept
-    {
-        return __self.__state_->__env_;
-    }
+        template <__completion_tag _Tag, __movable_value... _As>
+        friend void tag_invoke(_Tag, __t&& __self, _As&&... __as) noexcept
+        {
+            auto& __state = *__self.__state_;
+            try
+            {
+                std::unique_lock __guard{__state.__mutex_};
+                using _Tuple = __decayed_tuple<_Tag, _As...>;
+                __state.__data_.template emplace<_Tuple>(_Tag{},
+                                                         (_As&&)__as...);
+                __guard.unlock();
+                __self.__dispatch_result_();
+            }
+            catch (...)
+            {
+                using _Tuple = std::tuple<set_error_t, std::exception_ptr>;
+                __state.__data_.template emplace<_Tuple>(
+                    set_error_t{}, std::current_exception());
+            }
+        }
+
+        friend const __env_t<_Env>& tag_invoke(get_env_t,
+                                               const __t& __self) noexcept
+        {
+            return __self.__state_->__env_;
+        }
+    };
 };
 
 template <class _Sender, class _Env>
 using __future_receiver_t =
-    __future_rcvr<__x<__future_completions_t<_Sender, _Env>>, __x<_Env>>;
+    __t<__future_rcvr<__future_completions_t<_Sender, _Env>, __id<_Env>>>;
 
 template <class _Sender, class _Env>
 struct __future_state :
@@ -636,77 +668,84 @@
 };
 
 template <class _SenderId, class _EnvId>
-class __future
+struct __future
 {
-    using _Sender = __t<_SenderId>;
-    using _Env = __t<_EnvId>;
-    friend struct async_scope;
+    using _Sender = stdexec::__t<_SenderId>;
+    using _Env = stdexec::__t<_EnvId>;
 
-  public:
-    using sender_concept = stdexec::sender_t;
-
-    __future(__future&&) = default;
-    __future& operator=(__future&&) = default;
-
-    ~__future() noexcept
+    struct __t
     {
-        if (__state_ != nullptr)
+        using __id = __future;
+        using sender_concept = stdexec::sender_t;
+
+        __t(__t&&) = default;
+        __t& operator=(__t&&) = default;
+
+        ~__t() noexcept
         {
-            auto __raw_state = __state_.get();
-            std::unique_lock __guard{__raw_state->__mutex_};
-            if (__raw_state->__data_.index() != 0)
+            if (__state_ != nullptr)
             {
-                // completed given sender
-                // state is no longer needed
-                return;
+                auto __raw_state = __state_.get();
+                std::unique_lock __guard{__raw_state->__mutex_};
+                if (__raw_state->__data_.index() != 0)
+                {
+                    // completed given sender
+                    // state is no longer needed
+                    return;
+                }
+                __raw_state->__no_future_ = std::move(__state_);
+                __raw_state->__step_from_to_(__guard, __future_step::__future,
+                                             __future_step::__no_future);
             }
-            __raw_state->__no_future_ = std::move(__state_);
-            __raw_state->__step_from_to_(__guard, __future_step::__future,
-                                         __future_step::__no_future);
         }
-    }
 
-  private:
-    template <class _Self>
-    using __completions_t =
-        __future_completions_t<__mfront<_Sender, _Self>, _Env>;
+      private:
+        friend struct async_scope;
+        template <class _Self>
+        using __completions_t =
+            __future_completions_t<__mfront<_Sender, _Self>, _Env>;
 
-    explicit __future(
-        std::unique_ptr<__future_state<_Sender, _Env>> __state) noexcept :
-        __state_(std::move(__state))
-    {
-        std::unique_lock __guard{__state_->__mutex_};
-        __state_->__step_from_to_(__guard, __future_step::__created,
-                                  __future_step::__future);
-    }
+        template <class _Receiver>
+        using __future_op_t = stdexec::__t<
+            __future_op<_SenderId, _EnvId, stdexec::__id<_Receiver>>>;
 
-    template <__decays_to<__future> _Self, receiver _Receiver>
-        requires receiver_of<_Receiver, __completions_t<_Self>>
-    friend __future_op<_SenderId, _EnvId, __x<_Receiver>>
-        tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
-    {
-        return __future_op<_SenderId, _EnvId, __x<_Receiver>>{
-            (_Receiver&&)__rcvr, std::move(__self.__state_)};
-    }
+        explicit __t(
+            std::unique_ptr<__future_state<_Sender, _Env>> __state) noexcept :
+            __state_(std::move(__state))
+        {
+            std::unique_lock __guard{__state_->__mutex_};
+            __state_->__step_from_to_(__guard, __future_step::__created,
+                                      __future_step::__future);
+        }
 
-    template <__decays_to<__future> _Self, class _OtherEnv>
-    friend auto tag_invoke(get_completion_signatures_t, _Self&&, _OtherEnv&&)
-        -> __completions_t<_Self>
-    {
-        return {};
-    }
+        template <__decays_to<__t> _Self, receiver _Receiver>
+            requires receiver_of<_Receiver, __completions_t<_Self>>
+        friend __future_op_t<_Receiver> tag_invoke(connect_t, _Self&& __self,
+                                                   _Receiver __rcvr)
+        {
+            return __future_op_t<_Receiver>{(_Receiver&&)__rcvr,
+                                            std::move(__self.__state_)};
+        }
 
-    friend empty_env tag_invoke(get_env_t, const __future&) noexcept
-    {
-        return {};
-    }
+        template <__decays_to<__t> _Self, class _OtherEnv>
+        friend auto tag_invoke(get_completion_signatures_t, _Self&&,
+                               _OtherEnv&&) -> __completions_t<_Self>
+        {
+            return {};
+        }
 
-    std::unique_ptr<__future_state<_Sender, _Env>> __state_;
+        friend empty_env tag_invoke(get_env_t, const __t&) noexcept
+        {
+            return {};
+        }
+
+        std::unique_ptr<__future_state<_Sender, _Env>> __state_;
+    };
 };
 
 template <class _Sender, class _Env>
-using __future_t =
-    __future<__x<__nest_sender_t<_Sender>>, __x<__decay_t<_Env>>>;
+using __future_t = stdexec::__t<
+    __future<__id<__nest_sender_t<_Sender>>, __id<__decay_t<_Env>>>>;
 
 ////////////////////////////////////////////////////////////////////////////
 // async_scope::spawn implementation
@@ -719,7 +758,7 @@
 template <class _EnvId>
 struct __spawn_op_base
 {
-    using _Env = __t<_EnvId>;
+    using _Env = stdexec::__t<_EnvId>;
     __spawn_env_t<_Env> __env_;
     void (*__delete_)(__spawn_op_base*);
 };
@@ -727,70 +766,77 @@
 template <class _EnvId>
 struct __spawn_rcvr
 {
-    using receiver_concept = stdexec::receiver_t;
-    using _Env = __t<_EnvId>;
-    __spawn_op_base<_EnvId>* __op_;
-    const __impl* __scope_;
+    using _Env = stdexec::__t<_EnvId>;
 
-    template <__one_of<set_value_t, set_stopped_t> _Tag>
-    friend void tag_invoke(_Tag, __spawn_rcvr&& __self) noexcept
+    struct __t
     {
-        __self.__op_->__delete_(__self.__op_);
-    }
+        using __id = __spawn_rcvr;
+        using receiver_concept = stdexec::receiver_t;
+        __spawn_op_base<_EnvId>* __op_;
 
-    // BUGBUG NOT TO SPEC spawn shouldn't accept senders that can fail.
-    template <same_as<set_error_t> _Tag>
-    [[noreturn]] friend void tag_invoke(_Tag, __spawn_rcvr&&,
-                                        const std::exception_ptr&) noexcept
-    {
-        std::terminate();
-    }
+        template <__one_of<set_value_t, set_stopped_t> _Tag>
+        friend void tag_invoke(_Tag, __t&& __self) noexcept
+        {
+            __self.__op_->__delete_(__self.__op_);
+        }
 
-    friend const __spawn_env_t<_Env>&
-        tag_invoke(get_env_t, const __spawn_rcvr& __self) noexcept
-    {
-        return __self.__op_->__env_;
-    }
+        // BUGBUG NOT TO SPEC spawn shouldn't accept senders that can fail.
+        template <same_as<set_error_t> _Tag>
+        [[noreturn]] friend void tag_invoke(_Tag, __t&&,
+                                            const std::exception_ptr&) noexcept
+        {
+            std::terminate();
+        }
+
+        friend const __spawn_env_t<_Env>& tag_invoke(get_env_t,
+                                                     const __t& __self) noexcept
+        {
+            return __self.__op_->__env_;
+        }
+    };
 };
 
 template <class _Env>
-using __spawn_receiver_t = __spawn_rcvr<__x<_Env>>;
+using __spawn_receiver_t = stdexec::__t<__spawn_rcvr<__id<_Env>>>;
 
 template <class _SenderId, class _EnvId>
-struct __spawn_op : __spawn_op_base<_EnvId>
+struct __spawn_op
 {
-    using _Env = __t<_EnvId>;
-    using _Sender = __t<_SenderId>;
+    using _Env = stdexec::__t<_EnvId>;
+    using _Sender = stdexec::__t<_SenderId>;
 
-    template <__decays_to<_Sender> _Sndr>
-    __spawn_op(_Sndr&& __sndr, _Env __env, const __impl* __scope) :
-        __spawn_op_base<_EnvId>{
-            __join_env(
-                (_Env&&)__env,
-                __mkprop(__scope->__stop_source_.get_token(), get_stop_token),
-                __mkprop(__inln::__scheduler{}, get_scheduler)),
-            [](__spawn_op_base<_EnvId>* __op) {
-        delete static_cast<__spawn_op*>(__op);
-    }},
-        __op_(stdexec::connect((_Sndr&&)__sndr,
-                               __spawn_receiver_t<_Env>{this, __scope}))
-    {}
-
-    void __start_() noexcept
+    struct __t : __spawn_op_base<_EnvId>
     {
-        start(__op_);
-    }
+        template <__decays_to<_Sender> _Sndr>
+        __t(_Sndr&& __sndr, _Env __env, const __impl* __scope) :
+            __spawn_op_base<_EnvId>{
+                __join_env((_Env&&)__env,
+                           __mkprop(__scope->__stop_source_.get_token(),
+                                    get_stop_token),
+                           __mkprop(__inln::__scheduler{}, get_scheduler)),
+                [](__spawn_op_base<_EnvId>* __op) {
+            delete static_cast<__t*>(__op);
+        }},
+            __op_(stdexec::connect((_Sndr&&)__sndr,
+                                   __spawn_receiver_t<_Env>{this}))
+        {}
 
-    friend void tag_invoke(start_t, __spawn_op& __self) noexcept
-    {
-        return __self.__start_();
-    }
+        void __start_() noexcept
+        {
+            start(__op_);
+        }
 
-    connect_result_t<_Sender, __spawn_receiver_t<_Env>> __op_;
+        friend void tag_invoke(start_t, __t& __self) noexcept
+        {
+            return __self.__start_();
+        }
+
+        connect_result_t<_Sender, __spawn_receiver_t<_Env>> __op_;
+    };
 };
 
 template <class _Sender, class _Env>
-using __spawn_operation_t = __spawn_op<__x<_Sender>, __x<_Env>>;
+using __spawn_operation_t = stdexec::__t<__spawn_op<__id<_Sender>, __id<_Env>>>;
 
 ////////////////////////////////////////////////////////////////////////////
 // async_scope
diff --git a/include/sdbusplus/async/stdexec/commit.info b/include/sdbusplus/async/stdexec/commit.info
index 4ec6e44..6788e94 100644
--- a/include/sdbusplus/async/stdexec/commit.info
+++ b/include/sdbusplus/async/stdexec/commit.info
@@ -1 +1 @@
-e05f158445ef957dc53731607d417ec6a7d834d7
+98f1f7a18dba497859b7705ce60d31e0ba3d204d
diff --git a/include/sdbusplus/async/stdexec/env.hpp b/include/sdbusplus/async/stdexec/env.hpp
index 32532d6..c77176f 100644
--- a/include/sdbusplus/async/stdexec/env.hpp
+++ b/include/sdbusplus/async/stdexec/env.hpp
@@ -58,40 +58,49 @@
 struct read_with_default_t;
 
 template <class _Tag, class _DefaultId, class _ReceiverId>
-struct __operation : __immovable
+struct __operation
 {
-    using _Default = __t<_DefaultId>;
-    using _Receiver = __t<_ReceiverId>;
+    using _Default = stdexec::__t<_DefaultId>;
+    using _Receiver = stdexec::__t<_ReceiverId>;
 
-    STDEXEC_ATTRIBUTE((no_unique_address)) _Default __default_;
-    _Receiver __rcvr_;
-
-    friend void tag_invoke(start_t, __operation& __self) noexcept
+    struct __t : __immovable
     {
-        try
+        using __id = __operation;
+
+        STDEXEC_ATTRIBUTE((no_unique_address)) _Default __default_;
+        _Receiver __rcvr_;
+
+        friend void tag_invoke(start_t, __t& __self) noexcept
         {
-            if constexpr (__callable<_Tag, env_of_t<_Receiver>>)
+            try
             {
-                const auto& __env = get_env(__self.__rcvr_);
-                set_value(std::move(__self.__rcvr_), _Tag{}(__env));
+                if constexpr (__callable<_Tag, env_of_t<_Receiver>>)
+                {
+                    const auto& __env = get_env(__self.__rcvr_);
+                    set_value(std::move(__self.__rcvr_), _Tag{}(__env));
+                }
+                else
+                {
+                    set_value(std::move(__self.__rcvr_),
+                              std::move(__self.__default_));
+                }
             }
-            else
+            catch (...)
             {
-                set_value(std::move(__self.__rcvr_),
-                          std::move(__self.__default_));
+                set_error(std::move(__self.__rcvr_), std::current_exception());
             }
         }
-        catch (...)
-        {
-            set_error(std::move(__self.__rcvr_), std::current_exception());
-        }
-    }
+    };
 };
 
-template <class _Tag, class _DefaultId>
+template <class _Tag, class _Default, class _Receiver>
+using __operation_t = __t<__operation<_Tag, __id<_Default>, __id<_Receiver>>>;
+
+template <class _Tag, class _Default>
 struct __sender
 {
-    using _Default = __t<_DefaultId>;
+    using __id = __sender;
+    using __t = __sender;
     using sender_concept = stdexec::sender_t;
     STDEXEC_ATTRIBUTE((no_unique_address)) _Default __default_;
 
@@ -109,8 +118,7 @@
         requires receiver_of<_Receiver, __completions_t<env_of_t<_Receiver>>>
     friend auto tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr) //
         noexcept(std::is_nothrow_move_constructible_v<_Receiver>)
-            -> __operation<_Tag, __x<__default_t<env_of_t<_Receiver>>>,
-                           __x<_Receiver>>
+            -> __operation_t<_Tag, __default_t<env_of_t<_Receiver>>, _Receiver>
     {
         return {{}, ((_Self&&)__self).__default_, (_Receiver&&)__rcvr};
     }
@@ -127,7 +135,7 @@
 {
     template <class _Tag, class _Default>
     constexpr auto operator()(_Tag, _Default&& __default) const
-        -> __sender<_Tag, __x<__decay_t<_Default>>>
+        -> __sender<_Tag, __decay_t<_Default>>
     {
         return {(_Default&&)__default};
     }
diff --git a/include/sdbusplus/async/stdexec/execution.hpp b/include/sdbusplus/async/stdexec/execution.hpp
index dce1a9f..87abcd9 100644
--- a/include/sdbusplus/async/stdexec/execution.hpp
+++ b/include/sdbusplus/async/stdexec/execution.hpp
@@ -22,6 +22,7 @@
 #include "__detail/__intrusive_ptr.hpp"
 #include "__detail/__meta.hpp"
 #include "__detail/__scope.hpp"
+#include "__detail/__utility.hpp"
 #include "concepts.hpp"
 #include "coroutine.hpp"
 #include "functional.hpp"
@@ -41,8 +42,6 @@
 #include <variant>
 
 STDEXEC_PRAGMA_PUSH()
-STDEXEC_PRAGMA_IGNORE_GNU("-Wpragmas")
-STDEXEC_PRAGMA_IGNORE_GNU("-Wunknown-warning-option")
 STDEXEC_PRAGMA_IGNORE_GNU("-Wundefined-inline")
 STDEXEC_PRAGMA_IGNORE_GNU("-Wsubobject-linkage")
 STDEXEC_PRAGMA_IGNORE_GNU("-Wmissing-braces")
@@ -871,9 +870,17 @@
         static_assert(sizeof(_Env),
                       "Incomplete type used with get_completion_signatures");
 
-        if constexpr (__with_tag_invoke<_Sender, _Env>)
+        // Compute the type of the transformed sender:
+        using _TfxSender = __tfx_sender<_Sender, _Env>;
+
+        if constexpr (__merror<_TfxSender>)
         {
-            using _TfxSender = __tfx_sender<_Sender, _Env>;
+            // Computing the type of the transformed sender returned an error
+            // type. Propagate it.
+            return (_TfxSender(*)()) nullptr;
+        }
+        else if constexpr (__with_tag_invoke<_Sender, _Env>)
+        {
             using _Result = tag_invoke_result_t<get_completion_signatures_t,
                                                 _TfxSender, _Env>;
             return (_Result(*)()) nullptr;
@@ -3019,6 +3026,10 @@
 } // namespace __then
 
 using __then::then_t;
+
+/// @brief The then sender adaptor, which invokes a function with the result of
+///        a sender, making the result available to the next receiver.
+/// @hideinitializer
 inline constexpr then_t then{};
 
 template <>
@@ -3325,9 +3336,32 @@
 {};
 
 ////////////////////////////////////////////////////////////////////////////
-// [execution.senders.adaptors.split]
-namespace __split
+// shared components of split and ensure_started
+//
+// The split and ensure_started algorithms are very similar in implementation.
+// The salient differences are:
+//
+// split: the input async operation is always connected. It is only
+//   started when one of the split senders is connected and started.
+//   split senders are copyable, so there are multiple operation states
+//   to be notified on completion. These are stored in an instrusive
+//   linked list.
+//
+// ensure_started: the input async operation is always started, so
+//   the internal receiver will always be completed. The ensure_started
+//   sender is move-only and single-shot, so there will only ever be one
+//   operation state to be notified on completion.
+//
+// The shared state should add-ref itself when the input async
+// operation is started and release itself when its completion
+// is notified.
+namespace __shared
 {
+template <class _BaseEnv>
+using __env_t =            //
+    __make_env_t<_BaseEnv, // BUGBUG NOT TO SPEC
+                 __with<get_stop_token_t, in_place_stop_token>>;
+
 struct __on_stop_request
 {
     in_place_stop_source& __stop_source_;
@@ -3341,58 +3375,107 @@
 template <class _Receiver>
 auto __notify_visitor(_Receiver&& __rcvr) noexcept
 {
-    return [&](const auto& __tupl) noexcept -> void {
+    return [&]<class _Tuple>(_Tuple&& __tupl) noexcept -> void {
         __apply(
-            [&](auto __tag, const auto&... __args) noexcept -> void {
-            __tag(std::move(__rcvr), __args...);
+            [&](auto __tag, auto&&... __args) noexcept -> void {
+            __tag(std::move(__rcvr), __forward_like<_Tuple>(__args)...);
         },
             __tupl);
     };
 }
 
-struct __split_state_base : __immovable
+enum class __action_kind : bool
 {
-    using __notify_fn = void(__split_state_base*) noexcept;
-
-    __split_state_base* __next_{};
-    __notify_fn* __notify_{};
+    __notify,
+    __detach
 };
 
-template <class _Sender, class _Receiver>
-struct __split_state :
-    __split_state_base,
-    __enable_receiver_from_this<_Sender, _Receiver>
+struct __local_state_base : __immovable
 {
-    using __shared_state_ptr = __decay_t<__data_of<_Sender>>;
-    using __on_stop_cb = //
-        typename stop_token_of_t<env_of_t<_Receiver>&>::template callback_type<
-            __on_stop_request>;
+    using __action_fn = void(__local_state_base*, __action_kind) noexcept;
 
-    explicit __split_state(_Sender&& __sndr) noexcept :
-        __split_state_base{{}, nullptr, __notify}, __on_stop_(),
-        __shared_state_(STDEXEC_CALL_EXPLICIT_THIS_MEMFN(
-            (_Sender&&)__sndr, apply)(__detail::__get_data()))
-    {}
-
-    static void __notify(__split_state_base* __self) noexcept
-    {
-        __split_state* __op = static_cast<__split_state*>(__self);
-        __op->__on_stop_.reset();
-        std::visit(__split::__notify_visitor(__op->__receiver()),
-                   __op->__shared_state_->__data_);
-    }
-
-    std::optional<__on_stop_cb> __on_stop_;
-    __shared_state_ptr __shared_state_;
+    __action_fn* __action_{};
+    __local_state_base* __next_{};
 };
 
 template <class _CvrefSender, class _Env>
-struct __sh_state;
+struct __shared_state;
 
-template <class _BaseEnv>
-using __env_t =            //
-    __make_env_t<_BaseEnv, // BUGBUG NOT TO SPEC
-                 __with<get_stop_token_t, in_place_stop_token>>;
+// Each operation state of a split sender has one of these,
+// created when a split sender is connected. There are 0 or
+// more of them per input async operation. It is what
+// the split sender's `get_state` fn returns. It holds a
+// reference to the shared state of the input async operation.
+template <class _CvrefSender, class _Receiver>
+struct __local_state :
+    __local_state_base,
+    __enable_receiver_from_this<_CvrefSender, _Receiver>
+{
+    using __data_t = __decay_t<__data_of<_CvrefSender>>;
+    using __shared_state_t = __mapply<__q<__mfront>, __data_t>;
+    using __on_stop_cb_t = //
+        typename stop_token_of_t<env_of_t<_Receiver>&>::template callback_type<
+            __on_stop_request>;
+    using __tag_t = tag_of_t<_CvrefSender>;
+    static_assert(__one_of<__tag_t, __split::__split_t,
+                           __ensure_started::__ensure_started_t>);
+
+    explicit __local_state(_CvrefSender&& __sndr) noexcept :
+        __local_state::__local_state_base{{},
+                                          &__action<tag_of_t<_CvrefSender>>},
+        __shared_state_(
+            STDEXEC_CALL_EXPLICIT_THIS_MEMFN((_CvrefSender&&)__sndr,
+                                             apply)(__detail::__get_data())
+                .__shared_state)
+    {}
+
+    ~__local_state()
+    {
+        __action_(this, __action_kind::__detach);
+    }
+
+    // This is called when the input async operation completes; or,
+    // if it has already completed when start is called, it is called
+    // from start:
+    template <class _Tag>
+    static void __action(__local_state_base* __self,
+                         __action_kind __kind) noexcept
+    {
+        __local_state* const __op = static_cast<__local_state*>(__self);
+        if (__kind == __action_kind::__notify)
+        {
+            __op->__on_stop_.reset();
+
+            // The split algorithm sends by T const&. ensure_started sends by
+            // T&&.
+            if constexpr (same_as<__split::__split_t, _Tag>)
+            {
+                std::visit(__notify_visitor(__op->__receiver()),
+                           std::as_const(__op->__shared_state_->__data_));
+            }
+            else
+            {
+                std::visit(__notify_visitor(__op->__receiver()),
+                           std::move(__op->__shared_state_->__data_));
+            }
+        }
+        else
+        {
+            // This is a detach operation
+            if constexpr (same_as<__split::__split_t, _Tag>)
+            {
+                // no-op
+            }
+            else
+            {
+                __op->__shared_state_->__detach();
+            }
+        }
+    }
+
+    std::optional<__on_stop_cb_t> __on_stop_{};
+    __intrusive_ptr<__shared_state_t> __shared_state_;
+};
 
 template <class _CvrefSenderId, class _EnvId>
 struct __receiver
@@ -3402,281 +3485,18 @@
 
     struct __t
     {
-        __sh_state<_CvrefSender, _Env>& __sh_state_;
-
         using receiver_concept = receiver_t;
         using __id = __receiver;
 
-        template <__completion_tag _Tag, class... _As>
-        friend void tag_invoke(_Tag __tag, __t&& __self, _As&&... __as) noexcept
-        {
-            __sh_state<_CvrefSender, _Env>& __state = __self.__sh_state_;
-
-            try
-            {
-                using __tuple_t = __decayed_tuple<_Tag, _As...>;
-                __state.__data_.template emplace<__tuple_t>(__tag,
-                                                            (_As&&)__as...);
-            }
-            catch (...)
-            {
-                using __tuple_t =
-                    __decayed_tuple<set_error_t, std::exception_ptr>;
-                __state.__data_.template emplace<__tuple_t>(
-                    set_error, std::current_exception());
-            }
-            __state.__notify();
-        }
-
-        friend const __env_t<_Env>& tag_invoke(get_env_t,
-                                               const __t& __self) noexcept
-        {
-            return __self.__sh_state_.__env_;
-        }
-    };
-};
-
-template <class _CvrefSender, class _Env = empty_env>
-struct __sh_state
-{
-    using __t = __sh_state;
-    using __id = __sh_state;
-
-    template <class... _Ts>
-    using __bind_tuples =                          //
-        __mbind_front_q<__variant,
-                        std::tuple<set_stopped_t>, // Initial state of the
-                                                   // variant is set_stopped
-                        std::tuple<set_error_t, std::exception_ptr>, _Ts...>;
-
-    using __bound_values_t = //
-        __value_types_of_t<_CvrefSender, __env_t<_Env>,
-                           __mbind_front_q<__decayed_tuple, set_value_t>,
-                           __q<__bind_tuples>>;
-
-    using __variant_t = //
-        __error_types_of_t<
-            _CvrefSender, __env_t<_Env>,
-            __transform<__mbind_front_q<__decayed_tuple, set_error_t>,
-                        __bound_values_t>>;
-
-    using __receiver_t =
-        stdexec::__t<__receiver<__cvref_id<_CvrefSender>, stdexec::__id<_Env>>>;
-
-    in_place_stop_source __stop_source_{};
-    __variant_t __data_;
-    std::atomic<void*> __head_{nullptr};
-    __env_t<_Env> __env_;
-    connect_result_t<_CvrefSender, __receiver_t> __op_state2_;
-
-    explicit __sh_state(_CvrefSender&& __sndr, _Env __env = {}) :
-        __env_(__make_env((_Env&&)__env, __mkprop(__stop_source_.get_token(),
-                                                  get_stop_token))),
-        __op_state2_(connect((_CvrefSender&&)__sndr, __receiver_t{*this}))
-    {}
-
-    void __notify() noexcept
-    {
-        void* const __completion_state = static_cast<void*>(this);
-        void* __old = __head_.exchange(__completion_state,
-                                       std::memory_order_acq_rel);
-        __split_state_base* __state = static_cast<__split_state_base*>(__old);
-
-        while (__state != nullptr)
-        {
-            __split_state_base* __next = __state->__next_;
-            __state->__notify_(__state);
-            __state = __next;
-        }
-    }
-};
-
-template <class... _Tys>
-using __set_value_t =
-    completion_signatures<set_value_t(const __decay_t<_Tys>&...)>;
-
-template <class _Ty>
-using __set_error_t = completion_signatures<set_error_t(const __decay_t<_Ty>&)>;
-
-template <class _CvrefSenderId, class _EnvId>
-using __completions_t = //
-    __try_make_completion_signatures<
-        // NOT TO SPEC:
-        // See https://github.com/brycelelbach/wg21_p2300_execution/issues/26
-        __cvref_t<_CvrefSenderId>, __env_t<__t<_EnvId>>,
-        completion_signatures<set_error_t(const std::exception_ptr&),
-                              set_stopped_t()>, // NOT TO SPEC
-        __q<__set_value_t>, __q<__set_error_t>>;
-
-struct __split_t
-{};
-
-struct split_t : __split_t
-{
-    template <sender _Sender>
-        requires sender_in<_Sender, empty_env> &&
-                 __decay_copyable<env_of_t<_Sender>>
-    auto operator()(_Sender&& __sndr) const
-    {
-        auto __domain = __get_early_domain(__sndr);
-        return stdexec::transform_sender(
-            __domain, __make_sexpr<split_t>(__(), (_Sender&&)__sndr));
-    }
-
-    template <sender _Sender, class _Env>
-        requires sender_in<_Sender, _Env> && __decay_copyable<env_of_t<_Sender>>
-    auto operator()(_Sender&& __sndr, _Env&& __env) const
-    {
-        auto __domain = __get_late_domain(__sndr, __env);
-        return stdexec::transform_sender(
-            __domain, __make_sexpr<split_t>(__(), (_Sender&&)__sndr),
-            (_Env&&)__env);
-    }
-
-    STDEXEC_ATTRIBUTE((always_inline)) //
-    __binder_back<split_t> operator()() const
-    {
-        return {{}, {}, {}};
-    }
-
-    using _Sender = __1;
-    using __legacy_customizations_t = //
-        __types<tag_invoke_t(split_t,
-                             get_completion_scheduler_t<set_value_t>(
-                                 get_env_t(const _Sender&)),
-                             _Sender),
-                tag_invoke_t(split_t, _Sender)>;
-
-    template <class _Sender, class... _Env>
-    static auto transform_sender(_Sender&& __sndr, _Env... __env)
-    {
-        return __sexpr_apply(
-            (_Sender&&)__sndr,
-            [&]<class _Child>(__ignore, __ignore, _Child&& __child) {
-            using __sh_state_t = __sh_state<_Child, _Env...>;
-            auto __sh_state = std::make_shared<__sh_state_t>(
-                (_Child&&)__child, std::move(__env)...);
-            return __make_sexpr<__split_t>(std::move(__sh_state));
-        });
-    }
-};
-
-struct __split_impl : __sexpr_defaults
-{
-    static constexpr auto get_state = //
-        []<class _Sender, class _Receiver>(
-            _Sender&& __sndr,
-            _Receiver&) noexcept -> __split_state<_Sender, _Receiver> {
-        static_assert(sender_expr_for<_Sender, __split_t>);
-        return __split_state<_Sender, _Receiver>{(_Sender&&)__sndr};
-    };
-
-    static constexpr auto start = //
-        []<class _Sender, class _Receiver>(
-            __split_state<_Sender, _Receiver>& __state,
-            _Receiver& __rcvr) noexcept -> void {
-        auto* __shared_state = __state.__shared_state_.get();
-        std::atomic<void*>& __head = __shared_state->__head_;
-        void* const __completion_state = static_cast<void*>(__shared_state);
-        void* __old = __head.load(std::memory_order_acquire);
-
-        if (__old != __completion_state)
-        {
-            __state.__on_stop_.emplace(
-                get_stop_token(stdexec::get_env(__rcvr)),
-                __on_stop_request{__shared_state->__stop_source_});
-        }
-
-        do
-        {
-            if (__old == __completion_state)
-            {
-                __state.__notify(&__state);
-                return;
-            }
-            __state.__next_ = static_cast<__split_state_base*>(__old);
-        } while (!__head.compare_exchange_weak(
-            __old, static_cast<void*>(&__state), std::memory_order_release,
-            std::memory_order_acquire));
-
-        if (__old == nullptr)
-        {
-            // the inner sender isn't running
-            if (__shared_state->__stop_source_.stop_requested())
-            {
-                // 1. resets __head to completion state
-                // 2. notifies waiting threads
-                // 3. propagates "stopped" signal to `out_r'`
-                __shared_state->__notify();
-            }
-            else
-            {
-                stdexec::start(__shared_state->__op_state2_);
-            }
-        }
-    };
-
-    static constexpr auto __get_completion_signatures_fn =         //
-        []<class _ShState>(auto, const std::shared_ptr<_ShState>&) //
-        -> __mapply<__q<__completions_t>, __id<_ShState>> { return {}; };
-
-    static constexpr auto get_completion_signatures = //
-        []<class _Self, class _OtherEnv>(_Self&&, _OtherEnv&&) noexcept
-        -> __call_result_t<__sexpr_apply_t, _Self,
-                           __mtypeof<__get_completion_signatures_fn>> {
-        static_assert(sender_expr_for<_Self, __split_t>);
-        return {};
-    };
-};
-} // namespace __split
-
-using __split::split_t;
-inline constexpr split_t split{};
-
-template <>
-struct __sexpr_impl<__split::__split_t> : __split::__split_impl
-{};
-
-template <>
-struct __sexpr_impl<split_t> : __split::__split_impl
-{};
-
-/////////////////////////////////////////////////////////////////////////////
-// [execution.senders.adaptors.ensure_started]
-namespace __ensure_started
-{
-template <class _BaseEnv>
-using __env_t =            //
-    __make_env_t<_BaseEnv, // NOT TO SPEC
-                 __with<get_stop_token_t, in_place_stop_token>>;
-
-template <class _CvrefSenderId, class _EnvId>
-struct __sh_state;
-
-template <class _CvrefSenderId, class _EnvId = __id<empty_env>>
-struct __receiver
-{
-    using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>;
-    using _Env = stdexec::__t<_EnvId>;
-
-    class __t
-    {
-        __intrusive_ptr<stdexec::__t<__sh_state<_CvrefSenderId, _EnvId>>>
-            __shared_state_;
-
-      public:
-        using receiver_concept = receiver_t;
-        using __id = __receiver;
-
-        explicit __t(stdexec::__t<__sh_state<_CvrefSenderId, _EnvId>>&
-                         __shared_state) noexcept :
-            __shared_state_(__shared_state.__intrusive_from_this())
+        explicit __t(
+            __shared_state<_CvrefSender, _Env>* __shared_state) noexcept :
+            __shared_state_(__shared_state)
         {}
 
         template <__completion_tag _Tag, class... _As>
         friend void tag_invoke(_Tag __tag, __t&& __self, _As&&... __as) noexcept
         {
-            stdexec::__t<__sh_state<_CvrefSenderId, _EnvId>>& __state =
+            __shared_state<_CvrefSender, _Env>& __state =
                 *__self.__shared_state_;
 
             try
@@ -3694,7 +3514,6 @@
             }
 
             __state.__notify();
-            __self.__shared_state_.reset();
         }
 
         friend const __env_t<_Env>& tag_invoke(get_env_t,
@@ -3702,310 +3521,315 @@
         {
             return __self.__shared_state_->__env_;
         }
+
+        __shared_state<_CvrefSender, _Env>* __shared_state_;
     };
 };
 
-struct __operation_base
+template <class _CvrefSender, class _Env>
+struct __shared_state :
+    __enable_intrusive_from_this<__shared_state<_CvrefSender, _Env>>
 {
-    using __notify_fn = void(__operation_base*) noexcept;
-    __notify_fn* __notify_{};
-};
+    using __variant_t = __compl_sigs::__for_all_sigs<
+        __completion_signatures_of_t<_CvrefSender, _Env>, __q<__decayed_tuple>,
+        __mbind_front_q<__variant,
+                        std::tuple<set_stopped_t>, // Initial state of the
+                                                   // variant is set_stopped
+                        std::tuple<set_error_t, std::exception_ptr>>>;
 
-template <class _CvrefSenderId, class _EnvId = __id<empty_env>>
-struct __sh_state
-{
-    using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>;
-    using _Env = stdexec::__t<_EnvId>;
+    using __receiver_t = __t<__receiver<__cvref_id<_CvrefSender>, __id<_Env>>>;
 
-    struct __t : __enable_intrusive_from_this<__t>
-    {
-        using __id = __sh_state;
+    in_place_stop_source __stop_source_{};
+    __variant_t __data_;
+    std::atomic<void*> __head_{nullptr};
+    __env_t<_Env> __env_;
+    connect_result_t<_CvrefSender, __receiver_t> __op_state2_;
 
-        template <class... _Ts>
-        using __bind_tuples =                          //
-            __mbind_front_q<__variant,
-                            std::tuple<set_stopped_t>, // Initial state of the
-                                                       // variant is set_stopped
-                            std::tuple<set_error_t, std::exception_ptr>,
-                            _Ts...>;
-
-        using __bound_values_t = //
-            __value_types_of_t<_CvrefSender, __env_t<_Env>,
-                               __mbind_front_q<__decayed_tuple, set_value_t>,
-                               __q<__bind_tuples>>;
-
-        using __variant_t = //
-            __error_types_of_t<
-                _CvrefSender, __env_t<_Env>,
-                __transform<__mbind_front_q<__decayed_tuple, set_error_t>,
-                            __bound_values_t>>;
-
-        using __receiver_t = stdexec::__t<__receiver<_CvrefSenderId, _EnvId>>;
-
-        __variant_t __data_;
-        in_place_stop_source __stop_source_{};
-
-        std::atomic<void*> __op_state1_{nullptr};
-        __env_t<_Env> __env_;
-        connect_result_t<_CvrefSender, __receiver_t> __op_state2_;
-
-        explicit __t(_CvrefSender&& __sndr, _Env __env = {}) :
-            __env_(
-                __make_env((_Env&&)__env, __mkprop(__stop_source_.get_token(),
-                                                   get_stop_token))),
-            __op_state2_(connect((_CvrefSender&&)__sndr, __receiver_t{*this}))
-        {
-            start(__op_state2_);
-        }
-
-        void __notify() noexcept
-        {
-            void* const __completion_state = static_cast<void*>(this);
-            void* const __old = __op_state1_.exchange(
-                __completion_state, std::memory_order_acq_rel);
-            if (__old != nullptr)
-            {
-                auto* __op = static_cast<__operation_base*>(__old);
-                __op->__notify_(__op);
-            }
-        }
-
-        void __detach() noexcept
-        {
-            __stop_source_.request_stop();
-        }
-    };
-};
-
-template <class _CvrefSenderId, class _EnvId, class _ReceiverId>
-struct __operation
-{
-    using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>;
-    using _Env = stdexec::__t<_EnvId>;
-    using _Receiver = stdexec::__t<_ReceiverId>;
-
-    class __t : public __operation_base
-    {
-        struct __on_stop_request
-        {
-            in_place_stop_source& __stop_source_;
-
-            void operator()() noexcept
-            {
-                __stop_source_.request_stop();
-            }
-        };
-
-        using __on_stop = //
-            std::optional<typename stop_token_of_t<env_of_t<_Receiver>&>::
-                              template callback_type<__on_stop_request>>;
-
-        _Receiver __rcvr_;
-        __on_stop __on_stop_{};
-        __intrusive_ptr<stdexec::__t<__sh_state<_CvrefSenderId, _EnvId>>>
-            __shared_state_;
-
-      public:
-        using __id = __operation;
-
-        __t(                    //
-            _Receiver __rcvr,   //
-            __intrusive_ptr<stdexec::__t<__sh_state<_CvrefSenderId, _EnvId>>>
-                __shared_state) //
-            noexcept(std::is_nothrow_move_constructible_v<_Receiver>) :
-            __operation_base{__notify},
-            __rcvr_((_Receiver&&)__rcvr),
-            __shared_state_(std::move(__shared_state))
-        {}
-
-        ~__t()
-        {
-            // Check to see if this operation was ever started. If not,
-            // detach the (potentially still running) operation:
-            if (nullptr ==
-                __shared_state_->__op_state1_.load(std::memory_order_acquire))
-            {
-                __shared_state_->__detach();
-            }
-        }
-
-        STDEXEC_IMMOVABLE(__t);
-
-        static void __notify(__operation_base* __self) noexcept
-        {
-            __t* __op = static_cast<__t*>(__self);
-            __op->__on_stop_.reset();
-
-            std::visit(
-                [&](auto& __tupl) noexcept -> void {
-                __apply(
-                    [&](auto __tag, auto&... __args) noexcept -> void {
-                    __tag((_Receiver&&)__op->__rcvr_, std::move(__args)...);
-                },
-                    __tupl);
-            },
-                __op->__shared_state_->__data_);
-        }
-
-        friend void tag_invoke(start_t, __t& __self) noexcept
-        {
-            stdexec::__t<__sh_state<_CvrefSenderId, _EnvId>>* __shared_state =
-                __self.__shared_state_.get();
-            std::atomic<void*>& __op_state1 = __shared_state->__op_state1_;
-            void* const __completion_state = static_cast<void*>(__shared_state);
-            void* const __old = __op_state1.load(std::memory_order_acquire);
-            if (__old == __completion_state)
-            {
-                __self.__notify(&__self);
-            }
-            else
-            {
-                // register stop callback:
-                __self.__on_stop_.emplace(
-                    get_stop_token(get_env(__self.__rcvr_)),
-                    __on_stop_request{__shared_state->__stop_source_});
-                // Check if the stop_source has requested cancellation
-                if (__shared_state->__stop_source_.stop_requested())
-                {
-                    // Stop has already been requested. Don't bother starting
-                    // the child operations.
-                    stdexec::set_stopped((_Receiver&&)__self.__rcvr_);
-                }
-                else
-                {
-                    // Otherwise, the inner source hasn't notified completion.
-                    // Set this operation as the __op_state1 so it's notified.
-                    void* __old = nullptr;
-                    if (!__op_state1.compare_exchange_weak(
-                            __old, &__self, std::memory_order_release,
-                            std::memory_order_acquire))
-                    {
-                        // We get here when the task completed during the
-                        // execution of this function. Complete the operation
-                        // synchronously.
-                        STDEXEC_ASSERT(__old == __completion_state);
-                        __self.__notify(&__self);
-                    }
-                }
-            }
-        }
-    };
-};
-
-template <class _ShState>
-struct __data
-{
-    __data(__intrusive_ptr<_ShState> __sh_state) noexcept :
-        __sh_state_(std::move(__sh_state))
+    explicit __shared_state(_CvrefSender&& __sndr, _Env __env) :
+        __env_(__make_env((_Env&&)__env, __mkprop(__stop_source_.get_token(),
+                                                  get_stop_token))),
+        __op_state2_(connect((_CvrefSender&&)__sndr, __receiver_t{this}))
     {}
 
-    __data(__data&&) = default;
-    __data& operator=(__data&&) = default;
-
-    ~__data()
+    void __start_op() noexcept
     {
-        if (__sh_state_ != nullptr)
+        // the inner sender isn't running. if we reach here, then
+        // one way or the other, __shared_state::__notify() will be
+        // called, which decrements the ref count of *this.
+        // So we need to increment it here:
+        this->__inc_ref();
+
+        if (__stop_source_.stop_requested())
         {
-            // detach from the still-running operation.
-            // NOT TO SPEC: This also requests cancellation.
-            __sh_state_->__detach();
+            // 1. resets __head to completion state
+            // 2. notifies waiting threads
+            // 3. propagates "stopped" signal to `out_r'`
+            __notify();
+        }
+        else
+        {
+            stdexec::start(__op_state2_);
         }
     }
 
-    __intrusive_ptr<_ShState> __sh_state_;
+    // This is called when the shared async operation completes:
+    void __notify() noexcept
+    {
+        void* const __completion_state = static_cast<void*>(this);
+        void* const __old = __head_.exchange(__completion_state,
+                                             std::memory_order_acq_rel);
+        __local_state_base* __state = static_cast<__local_state_base*>(__old);
+
+        while (__state != nullptr)
+        {
+            __local_state_base* __next = __state->__next_;
+            __state->__action_(__state, __action_kind::__notify);
+            __state = __next;
+        }
+
+        // The async operation has completed, so we can release our
+        // ref-count on it:
+        this->__dec_ref();
+    }
+
+    void __detach() noexcept
+    {
+        // Check to see if this operation was ever started. If not,
+        // detach the (potentially still running) operation:
+        if (nullptr == __head_.load(std::memory_order_acquire))
+        {
+            __stop_source_.request_stop();
+        }
+    }
 };
 
-template <class... _Tys>
-using __set_value_t = completion_signatures<set_value_t(__decay_t<_Tys>&&...)>;
-
-template <class _Ty>
-using __set_error_t = completion_signatures<set_error_t(__decay_t<_Ty>&&)>;
-
-template <class _CvrefSenderId, class _EnvId>
+template <class _Cvref, class _CvrefSenderId, class _EnvId>
 using __completions_t = //
     __try_make_completion_signatures<
         // NOT TO SPEC:
-        // See https://github.com/brycelelbach/wg21_p2300_execution/issues/26
+        // See https://github.com/cplusplus/sender-receiver/issues/23
         __cvref_t<_CvrefSenderId>, __env_t<__t<_EnvId>>,
-        completion_signatures<set_error_t(std::exception_ptr&&),
+        completion_signatures<set_error_t(
+                                  __minvoke<_Cvref, std::exception_ptr>),
                               set_stopped_t()>, // NOT TO SPEC
-        __q<__set_value_t>, __q<__set_error_t>>;
+        __transform<_Cvref,
+                    __mcompose<__q<completion_signatures>, __qf<set_value_t>>>,
+        __transform<_Cvref,
+                    __mcompose<__q<completion_signatures>, __qf<set_error_t>>>>;
 
-template <class _Receiver>
-auto __connect_fn_(_Receiver& __rcvr) noexcept
-{
-    return [&]<class _ShState>(auto, __data<_ShState> __dat) //
-           noexcept(__nothrow_decay_copyable<_Receiver>)
-               -> __t<__mapply<__mbind_back_q<__operation, __id<_Receiver>>,
-                               __id<_ShState>>> {
-        return {(_Receiver&&)__rcvr, std::move(__dat.__sh_state_)};
-    };
-}
+template <class _Ty>
+using __clref_t = const __decay_t<_Ty>&;
 
-template <class _Receiver>
-auto __connect_fn(_Receiver& __rcvr) noexcept -> decltype(__connect_fn_(__rcvr))
-{
-    return __connect_fn_(__rcvr);
-}
+template <class _Ty>
+using __rref_t = __decay_t<_Ty>&&;
 
+template <class _Tag>
+using __cvref_results_t = //
+    __if_c<same_as<_Tag, __split::__split_t>, __q<__clref_t>, __q<__rref_t>>;
+
+template <class _Tag>
 inline auto __get_completion_signatures_fn() noexcept
-{
-    return []<class _ShState>(auto, const __data<_ShState>&) //
-           -> __mapply<__q<__completions_t>, __id<_ShState>> { return {}; };
+{ //
+    return
+        []<template <class> class _Data, class _ShState>(
+            auto, const _Data<_ShState>&) //
+        -> __mapply<__mbind_front_q<__completions_t, __cvref_results_t<_Tag>>,
+                    _ShState> { return {}; };
 }
 
-struct __ensure_started_t
-{};
-
-struct __ensure_started_impl : __sexpr_defaults
+template <class _Tag>
+struct __shared_impl : __sexpr_defaults
 {
-    static constexpr auto connect = //
-        []<class _Self, class _Receiver>(
-            _Self&& __self,
-            _Receiver
-                __rcvr) noexcept(__nothrow_callable<__sexpr_apply_t, _Self,
-                                                    decltype(__connect_fn(
-                                                        __declval<
-                                                            _Receiver&>()))>)
-        -> __call_result_t<__sexpr_apply_t, _Self,
-                           decltype(__connect_fn(__declval<_Receiver&>()))> {
-        static_assert(sender_expr_for<_Self, __ensure_started_t>);
-        return __sexpr_apply((_Self&&)__self, __connect_fn(__rcvr));
+    static constexpr auto get_state = //
+        []<class _Sender, class _Receiver>(
+            _Sender&& __sndr,
+            _Receiver&) noexcept -> __local_state<_Sender, _Receiver> {
+        static_assert(sender_expr_for<_Sender, _Tag>);
+        return __local_state<_Sender, _Receiver>{(_Sender&&)__sndr};
     };
 
     static constexpr auto get_completion_signatures = //
         []<class _Self, class _OtherEnv>(_Self&&, _OtherEnv&&) noexcept
         -> __call_result_t<__sexpr_apply_t, _Self,
-                           __result_of<__get_completion_signatures_fn>> {
-        static_assert(sender_expr_for<_Self, __ensure_started_t>);
+                           __result_of<__get_completion_signatures_fn<_Tag>>> {
+        static_assert(sender_expr_for<_Self, _Tag>);
         return {};
     };
+
+    static constexpr auto start = //
+        []<class _Sender, class _Receiver>(
+            __local_state<_Sender, _Receiver>& __state,
+            _Receiver& __rcvr) noexcept -> void {
+        auto* __shared_state = __state.__shared_state_.get();
+        std::atomic<void*>& __head = __shared_state->__head_;
+        void* const __completion_state = static_cast<void*>(__shared_state);
+        void* __old = __head.load(std::memory_order_acquire);
+
+        if (__old != __completion_state)
+        {
+            __state.__on_stop_.emplace(
+                get_stop_token(stdexec::get_env(__rcvr)),
+                __on_stop_request{__shared_state->__stop_source_});
+
+            if constexpr (same_as<_Tag, __ensure_started::__ensure_started_t>)
+            {
+                // Check if the stop_source has requested cancellation
+                if (__shared_state->__stop_source_.stop_requested())
+                {
+                    // Stop has already been requested. Don't bother starting
+                    // the child operations.
+                    stdexec::set_stopped((_Receiver&&)__rcvr);
+                    return;
+                }
+            }
+        }
+
+        // With the split algorithm, multiple split senders can be started
+        // simultaneously, but only one should start the async operation. The
+        // following loop atomically (re)tries to set the pointer to the head of
+        // the list to __state. When it finally succeeds, the prior value is
+        // checked. If it is nullptr, then this split sender has won the race
+        // and has the honor of starting the async operation.
+        do
+        {
+            if (__old == __completion_state)
+            {
+                __state.template __action<_Tag>(&__state,
+                                                __action_kind::__notify);
+                return;
+            }
+            __state.__next_ = static_cast<__local_state_base*>(__old);
+        } while (!__head.compare_exchange_weak(
+            __old, static_cast<void*>(&__state), std::memory_order_release,
+            std::memory_order_acquire));
+
+        if constexpr (same_as<_Tag, __split::__split_t>)
+        {
+            if (__old == nullptr)
+            {
+                __shared_state->__start_op();
+            }
+        }
+    };
 };
+} // namespace __shared
+
+////////////////////////////////////////////////////////////////////////////
+// [execution.senders.adaptors.split]
+namespace __split
+{
+using namespace __shared;
+
+template <class _ShState>
+struct __data
+{
+    explicit __data(__intrusive_ptr<_ShState> __shared_state) noexcept :
+        __shared_state(std::move(__shared_state))
+    {}
+
+    __intrusive_ptr<_ShState> __shared_state;
+};
+
+struct __split_t
+{};
+
+struct split_t
+{
+    template <sender _Sender, class _Env = empty_env>
+        requires sender_in<_Sender, _Env> && __decay_copyable<env_of_t<_Sender>>
+    auto operator()(_Sender&& __sndr, _Env&& __env = {}) const
+    {
+        auto __domain = __get_late_domain(__sndr, __env);
+        return stdexec::transform_sender(
+            __domain, __make_sexpr<split_t>((_Env&&)__env, (_Sender&&)__sndr));
+    }
+
+    STDEXEC_ATTRIBUTE((always_inline)) //
+    __binder_back<split_t> operator()() const
+    {
+        return {{}, {}, {}};
+    }
+
+    using _Sender = __1;
+    using __legacy_customizations_t = //
+        __types<tag_invoke_t(split_t,
+                             get_completion_scheduler_t<set_value_t>(
+                                 get_env_t(const _Sender&)),
+                             _Sender),
+                tag_invoke_t(split_t, _Sender)>;
+
+    template <class _CvrefSender, class _Env>
+    using __receiver_t =
+        __t<__meval<__receiver, __cvref_id<_CvrefSender>, __id<_Env>>>;
+
+    template <class _Sender>
+    static auto transform_sender(_Sender&& __sndr)
+    {
+        using _Receiver =
+            __receiver_t<__child_of<_Sender>, __decay_t<__data_of<_Sender>>>;
+        static_assert(sender_to<__child_of<_Sender>, _Receiver>);
+        return __sexpr_apply((_Sender&&)__sndr,
+                             [&]<class _Env, class _Child>(
+                                 __ignore, _Env&& __env, _Child&& __child) {
+            auto __state =
+                __make_intrusive<__shared_state<_Child, __decay_t<_Env>>>(
+                    (_Child&&)__child, (_Env&&)__env);
+            return __make_sexpr<__split_t>(__data{std::move(__state)});
+        });
+    }
+};
+} // namespace __split
+
+using __split::split_t;
+inline constexpr split_t split{};
+
+template <>
+struct __sexpr_impl<__split::__split_t> :
+    __shared::__shared_impl<__split::__split_t>
+{};
+
+/////////////////////////////////////////////////////////////////////////////
+// [execution.senders.adaptors.ensure_started]
+namespace __ensure_started
+{
+using namespace __shared;
+
+// Each ensure_started sender has one of these, created when
+// ensure_started() is called.
+template <class _ShState>
+struct __data
+{
+    explicit __data(__intrusive_ptr<_ShState> __ptr) noexcept :
+        __shared_state(std::move(__ptr))
+    {
+        // Eagerly launch the async operation.
+        __shared_state->__start_op();
+    }
+
+    __data(__data&&) noexcept = default;
+    __data& operator=(__data&&) noexcept = default;
+
+    ~__data()
+    {
+        if (__shared_state != nullptr)
+        {
+            // detach from the still-running operation.
+            // NOT TO SPEC: This also requests cancellation.
+            __shared_state->__detach();
+        }
+    }
+
+    __intrusive_ptr<_ShState> __shared_state;
+};
+
+struct __ensure_started_t
+{};
 
 struct ensure_started_t
 {
-    template <sender _Sender>
-        requires sender_in<_Sender, empty_env> &&
-                 __decay_copyable<env_of_t<_Sender>>
-    auto operator()(_Sender&& __sndr) const
-    {
-        if constexpr (sender_expr_for<_Sender, __ensure_started_t>)
-        {
-            return (_Sender&&)__sndr;
-        }
-        else
-        {
-            auto __domain = __get_early_domain(__sndr);
-            return stdexec::transform_sender(
-                __domain,
-                __make_sexpr<ensure_started_t>(__(), (_Sender&&)__sndr));
-        }
-        STDEXEC_UNREACHABLE();
-    }
-
-    template <sender _Sender, class _Env>
+    template <sender _Sender, class _Env = empty_env>
         requires sender_in<_Sender, _Env> && __decay_copyable<env_of_t<_Sender>>
-    auto operator()(_Sender&& __sndr, _Env&& __env) const
+    auto operator()(_Sender&& __sndr, _Env&& __env = {}) const
     {
         if constexpr (sender_expr_for<_Sender, __ensure_started_t>)
         {
@@ -4015,9 +3839,8 @@
         {
             auto __domain = __get_late_domain(__sndr, __env);
             return stdexec::transform_sender(
-                __domain,
-                __make_sexpr<ensure_started_t>(__(), (_Sender&&)__sndr),
-                (_Env&&)__env);
+                __domain, __make_sexpr<ensure_started_t>((_Env&&)__env,
+                                                         (_Sender&&)__sndr));
         }
         STDEXEC_UNREACHABLE();
     }
@@ -4036,24 +3859,23 @@
                              _Sender),
                 tag_invoke_t(ensure_started_t, _Sender)>;
 
-    template <class _CvrefSender, class... _Env>
-    using __receiver_t = stdexec::__t<
-        __meval<__receiver, __cvref_id<_CvrefSender>, __id<_Env>...>>;
+    template <class _CvrefSender, class _Env>
+    using __receiver_t =
+        __t<__meval<__receiver, __cvref_id<_CvrefSender>, __id<_Env>>>;
 
-    template <class _Sender, class... _Env>
-        requires sender_to<__child_of<_Sender>,
-                           __receiver_t<__child_of<_Sender>, _Env...>>
-    static auto transform_sender(_Sender&& __sndr, _Env... __env)
+    template <class _Sender>
+    static auto transform_sender(_Sender&& __sndr)
     {
-        return __sexpr_apply(
-            (_Sender&&)__sndr,
-            [&]<class _Child>(__ignore, __ignore, _Child&& __child) {
-            using __sh_state_t =
-                __t<__sh_state<__cvref_id<_Child>, __id<_Env>...>>;
-            auto __sh_state = __make_intrusive<__sh_state_t>(
-                (_Child&&)__child, std::move(__env)...);
-            return __make_sexpr<__ensure_started_t>(
-                __data{std::move(__sh_state)});
+        using _Receiver =
+            __receiver_t<__child_of<_Sender>, __decay_t<__data_of<_Sender>>>;
+        static_assert(sender_to<__child_of<_Sender>, _Receiver>);
+        return __sexpr_apply((_Sender&&)__sndr,
+                             [&]<class _Env, class _Child>(
+                                 __ignore, _Env&& __env, _Child&& __child) {
+            auto __state =
+                __make_intrusive<__shared_state<_Child, __decay_t<_Env>>>(
+                    (_Child&&)__child, (_Env&&)__env);
+            return __make_sexpr<__ensure_started_t>(__data{std::move(__state)});
         });
     }
 };
@@ -4064,7 +3886,7 @@
 
 template <>
 struct __sexpr_impl<__ensure_started::__ensure_started_t> :
-    __ensure_started::__ensure_started_impl
+    __shared::__shared_impl<__ensure_started::__ensure_started_t>
 {};
 
 STDEXEC_PRAGMA_PUSH()
@@ -4110,44 +3932,23 @@
 // [exec.let]
 namespace __let
 {
-template <class _SetTag, class _Domain = dependent_domain>
+template <class _Set, class _Domain = dependent_domain>
 struct __let_t;
 
 template <class _Set>
-struct __on_not_callable_
-{
-    using __t =
-        __callable_error<"In stdexec::let_value(Sender, Function)..."__csz>;
-};
+inline constexpr __mstring __in_which_let_msg{
+    "In stdexec::let_value(Sender, Function)..."};
 
 template <>
-struct __on_not_callable_<set_error_t>
-{
-    using __t =
-        __callable_error<"In stdexec::let_error(Sender, Function)..."__csz>;
-};
+inline constexpr __mstring __in_which_let_msg<set_error_t>{
+    "In stdexec::let_error(Sender, Function)..."};
 
 template <>
-struct __on_not_callable_<set_stopped_t>
-{
-    using __t =
-        __callable_error<"In stdexec::let_stopped(Sender, Function)..."__csz>;
-};
+inline constexpr __mstring __in_which_let_msg<set_stopped_t>{
+    "In stdexec::let_stopped(Sender, Function)..."};
 
 template <class _Set>
-using __on_not_callable = __t<__on_not_callable_<_Set>>;
-
-template <class _Tp>
-using __decay_ref = __decay_t<_Tp>&;
-
-// A metafunction that computes the result sender type for a given set of
-// argument types
-template <class _Fun, class _Set>
-using __result_sender_t = //
-    __transform<
-        __q<__decay_ref>,
-        __mbind_front<__mtry_catch_q<__call_result_t, __on_not_callable<_Set>>,
-                      _Fun>>;
+using __on_not_callable = __callable_error<__in_which_let_msg<_Set>>;
 
 // FUTURE: when we have a scheduler query for "always completes inline",
 // then we can use that instead of hard-coding `__inln::__scheduler` here.
@@ -4189,6 +3990,50 @@
            __env::__env_join_t<__env::__prop<_Scheduler(get_scheduler_t)>,
                                __env::__prop<void(get_domain_t)>, _Env>>;
 
+template <class _Tp>
+using __decay_ref = __decay_t<_Tp>&;
+
+template <__mstring _Where, __mstring _What>
+struct _FUNCTION_MUST_RETURN_A_VALID_SENDER_IN_THE_CURRENT_ENVIRONMENT_
+{};
+
+#if STDEXEC_NVHPC()
+template <class _Sender, class _Env, class _Set>
+struct __bad_result_sender_
+{
+    using __t = __mexception<
+        _FUNCTION_MUST_RETURN_A_VALID_SENDER_IN_THE_CURRENT_ENVIRONMENT_<
+            __in_which_let_msg<_Set>,
+            "The function must return a valid sender for the current environment"__csz>,
+        _WITH_SENDER_<_Sender>, _WITH_ENVIRONMENT_<_Env>>;
+};
+template <class _Sender, class _Env, class _Set>
+using __bad_result_sender = __t<__bad_result_sender_<_Sender, _Env, _Set>>;
+#else
+template <class _Sender, class _Env, class _Set>
+using __bad_result_sender = __mexception<
+    _FUNCTION_MUST_RETURN_A_VALID_SENDER_IN_THE_CURRENT_ENVIRONMENT_<
+        __in_which_let_msg<_Set>,
+        "The function must return a valid sender for the current environment"__csz>,
+    _WITH_SENDER_<_Sender>, _WITH_ENVIRONMENT_<_Env>>;
+#endif
+
+template <class _Sender, class _Env, class _Set>
+using __ensure_sender = //
+    __minvoke_if_c<sender_in<_Sender, _Env>, __q<__midentity>,
+                   __mbind_back_q<__bad_result_sender, _Set, _Env>, _Sender>;
+
+// A metafunction that computes the result sender type for a given set of
+// argument types
+template <class _Fun, class _Set, class _Env, class _Sched>
+using __result_sender_fn = //
+    __mcompose<
+        __mbind_back_q<__ensure_sender, __result_env_t<_Env, _Sched>, _Set>,
+        __transform<__q<__decay_ref>,
+                    __mbind_front<__mtry_catch_q<__call_result_t,
+                                                 __on_not_callable<_Set>>,
+                                  _Fun>>>;
+
 // The receiver that gets connected to the result sender is the input receiver,
 // possibly augmented with the input sender's completion scheduler (which is
 // where the result sender will be started).
@@ -4201,7 +4046,7 @@
 using __op_state_for = //
     __mcompose<__mbind_back_q<connect_result_t,
                               __result_receiver_t<_Receiver, _Sched>>,
-               __result_sender_t<_Fun, _Set>>;
+               __result_sender_fn<_Fun, _Set, env_of_t<_Receiver>, _Sched>>;
 
 template <class _Set, class _Sig>
 struct __tfx_signal_fn
@@ -4216,7 +4061,7 @@
     template <class _Env, class _Fun, class _Sched>
     using __f = //
         __try_make_completion_signatures<
-            __minvoke<__result_sender_t<_Fun, _Set>, _Args...>,
+            __minvoke<__result_sender_fn<_Fun, _Set, _Env, _Sched>, _Args...>,
             __result_env_t<_Env, _Sched>,
             // because we don't know if connect-ing the result sender will
             // throw:
@@ -4241,33 +4086,56 @@
                          __q<__concat_completion_signatures_t>>,
              __completion_signatures_of_t<_CvrefSender, _Env>>;
 
+template <__mstring _Where, __mstring _What>
+struct _NO_COMMON_DOMAIN_
+{};
+
+template <class _Set>
+using __no_common_domain_t = //
+    _NO_COMMON_DOMAIN_<
+        __in_which_let_msg<_Set>,
+        "The senders returned by Function do not all share a common domain"__csz>;
+
+template <class _Set>
+using __try_common_domain_fn = //
+    __mtry_catch_q<
+        __domain::__common_domain_t,
+        __mcompose<__mbind_front_q<__mexception, __no_common_domain_t<_Set>>,
+                   __q<_WITH_SENDERS_>>>;
+
 // Compute all the domains of all the result senders and make sure they're all
 // the same
-template <class _SetTag, class _Child, class _Fun, class _Env>
-using __result_domain_t = __gather_completions_for<
-    _SetTag, _Child, _Env,
-    __mtry_catch<
-        __transform<__q<__decay_ref>, __mbind_front_q<__call_result_t, _Fun>>,
-        __on_not_callable<_SetTag>>,
-    __q<__domain::__common_domain_t>>;
+template <class _Set, class _Child, class _Fun, class _Env, class _Sched>
+using __result_domain_t = //
+    __gather_completions_for<_Set, _Child, _Env,
+                             __result_sender_fn<_Fun, _Set, _Env, _Sched>,
+                             __try_common_domain_fn<_Set>>;
 
 template <class _LetTag, class _Env>
 auto __mk_transform_env_fn(const _Env& __env) noexcept
 {
-    using _SetTag = __t<_LetTag>;
-    return [&]<class _Fun, sender_in<_Env> _Child>(
-               __ignore, _Fun&&, _Child&& __child) -> decltype(auto) {
-        using _Scheduler = __completion_sched<_Child, _SetTag>;
-        if constexpr (__unknown_context<_Scheduler>)
+    using _Set = __t<_LetTag>;
+    return [&]<class _Fun, class _Child>(__ignore, _Fun&&,
+                                         _Child&& __child) -> decltype(auto) {
+        using __completions_t = __completion_signatures_of_t<_Child, _Env>;
+        if constexpr (__merror<__completions_t>)
         {
-            return (__env);
+            return __completions_t();
         }
         else
         {
-            return __join_env(__mkprop(get_completion_scheduler<_SetTag>(
-                                           stdexec::get_env(__child)),
-                                       get_scheduler),
-                              __mkprop(get_domain), __env);
+            using _Scheduler = __completion_sched<_Child, _Set>;
+            if constexpr (__unknown_context<_Scheduler>)
+            {
+                return (__env);
+            }
+            else
+            {
+                return __join_env(__mkprop(get_completion_scheduler<_Set>(
+                                               stdexec::get_env(__child)),
+                                           get_scheduler),
+                                  __mkprop(get_domain), __env);
+            }
         }
         STDEXEC_UNREACHABLE();
     };
@@ -4276,13 +4144,31 @@
 template <class _LetTag, class _Env>
 auto __mk_transform_sender_fn(const _Env&) noexcept
 {
-    using _SetTag = __t<_LetTag>;
-    return []<class _Fun, sender_in<_Env> _Child>(__ignore, _Fun&& __fun,
-                                                  _Child&& __child) {
-        using _Domain = __result_domain_t<_SetTag, _Child, _Fun, _Env>;
-        static_assert(__none_of<_Domain, __none_such, dependent_domain>);
-        return __make_sexpr<__let_t<_SetTag, _Domain>>((_Fun&&)__fun,
-                                                       (_Child&&)__child);
+    using _Set = __t<_LetTag>;
+    return
+        []<class _Fun, class _Child>(__ignore, _Fun&& __fun, _Child&& __child) {
+        using __completions_t = __completion_signatures_of_t<_Child, _Env>;
+        if constexpr (__merror<__completions_t>)
+        {
+            return __completions_t();
+        }
+        else
+        {
+            using _Sched = __completion_sched<_Child, _Set>;
+            using _Domain = __result_domain_t<_Set, _Child, _Fun, _Env, _Sched>;
+            if constexpr (__merror<_Domain>)
+            {
+                return _Domain();
+            }
+            else
+            {
+                static_assert(
+                    __none_of<_Domain, __none_such, dependent_domain>);
+                return __make_sexpr<__let_t<_Set, _Domain>>((_Fun&&)__fun,
+                                                            (_Child&&)__child);
+            }
+        }
+        STDEXEC_UNREACHABLE();
     };
 }
 
@@ -4319,11 +4205,11 @@
     __op_state_variant __op_state3_;
 };
 
-template <class _SetTag, class _Domain>
+template <class _Set, class _Domain>
 struct __let_t
 {
     using __domain_t = _Domain;
-    using __t = _SetTag;
+    using __t = _Set;
 
     template <sender _Sender, __movable_value _Fun>
     auto operator()(_Sender&& __sndr, _Fun __fun) const
@@ -4331,7 +4217,7 @@
         auto __domain = __get_early_domain(__sndr);
         return stdexec::transform_sender(
             __domain,
-            __make_sexpr<__let_t<_SetTag>>((_Fun&&)__fun, (_Sender&&)__sndr));
+            __make_sexpr<__let_t<_Set>>((_Fun&&)__fun, (_Sender&&)__sndr));
     }
 
     template <class _Fun>
@@ -4350,23 +4236,23 @@
                              _Sender, _Function),
                 tag_invoke_t(__let_t, _Sender, _Function)>;
 
-    template <sender_expr_for<__let_t<_SetTag>> _Sender, class _Env>
+    template <sender_expr_for<__let_t<_Set>> _Sender, class _Env>
     static decltype(auto) transform_env(_Sender&& __sndr, const _Env& __env)
     {
         return __sexpr_apply((_Sender&&)__sndr,
-                             __mk_transform_env_fn<__let_t<_SetTag>>(__env));
+                             __mk_transform_env_fn<__let_t<_Set>>(__env));
     }
 
-    template <sender_expr_for<__let_t<_SetTag>> _Sender, class _Env>
+    template <sender_expr_for<__let_t<_Set>> _Sender, class _Env>
         requires same_as<__early_domain_of_t<_Sender>, dependent_domain>
     static decltype(auto) transform_sender(_Sender&& __sndr, const _Env& __env)
     {
         return __sexpr_apply((_Sender&&)__sndr,
-                             __mk_transform_sender_fn<__let_t<_SetTag>>(__env));
+                             __mk_transform_sender_fn<__let_t<_Set>>(__env));
     }
 };
 
-template <class _SetTag, class _Domain>
+template <class _Set, class _Domain>
 struct __let_impl : __sexpr_defaults
 {
     static constexpr auto get_attrs = //
@@ -4377,27 +4263,26 @@
 
     static constexpr auto get_completion_signatures = //
         []<class _Self, class _Env>(_Self&&, _Env&&) noexcept
-        -> __completions<__child_of<_Self>, _Env, __let_t<_SetTag, _Domain>,
+        -> __completions<__child_of<_Self>, _Env, __let_t<_Set, _Domain>,
                          __data_of<_Self>> {
-        static_assert(sender_expr_for<_Self, __let_t<_SetTag, _Domain>>);
+        static_assert(sender_expr_for<_Self, __let_t<_Set, _Domain>>);
         return {};
     };
 
     static constexpr auto get_state = //
-        []<class _Sender, class _Receiver>(_Sender&& __sndr,
-                                           _Receiver& __rcvr) {
-        static_assert(sender_expr_for<_Sender, __let_t<_SetTag, _Domain>>);
+        []<class _Sender, class _Receiver>(_Sender&& __sndr, _Receiver&) {
+        static_assert(sender_expr_for<_Sender, __let_t<_Set, _Domain>>);
         using _Fun = __data_of<_Sender>;
-        using _Sched = __completion_sched<_Sender, _SetTag>;
+        using _Sched = __completion_sched<_Sender, _Set>;
         using __mk_let_state =
-            __mbind_front_q<__let_state, _Receiver, _Fun, _SetTag, _Sched>;
+            __mbind_front_q<__let_state, _Receiver, _Fun, _Set, _Sched>;
 
         using __let_state_t =
-            __gather_completions_for<_SetTag, __child_of<_Sender>,
+            __gather_completions_for<_Set, __child_of<_Sender>,
                                      env_of_t<_Receiver>, __q<__decayed_tuple>,
                                      __mk_let_state>;
 
-        _Sched __sched = query_or(get_completion_scheduler<_SetTag>,
+        _Sched __sched = query_or(get_completion_scheduler<_Set>,
                                   stdexec::get_env(__sndr), __none_such());
         return __let_state_t{
             STDEXEC_CALL_EXPLICIT_THIS_MEMFN((_Sender&&)__sndr,
@@ -4406,25 +4291,23 @@
     };
 
     template <class _State, class _Receiver, class... _As>
-    static void __bind(_State&& __state, _Receiver&& __rcvr,
+    static void __bind(_State& __state, _Receiver& __rcvr,
                        _As&&... __as) noexcept
     {
         try
         {
-            using __fun_t = typename _State::__fun_t;
-            using __sched_t = typename _State::__sched_t;
-            using __tuple_t = __decayed_tuple<_As...>;
-            using __op_state_t = __minvoke<
-                __op_state_for<_Receiver, __fun_t, _SetTag, __sched_t>, _As...>;
             auto& __args =
-                __state.__args_.template emplace<__tuple_t>((_As&&)__as...);
-            auto& __op =
-                __state.__op_state3_.template emplace<__op_state_t>(__conv{[&] {
-                return stdexec::connect(
-                    __apply(std::move(__state.__fun_), __args),
-                    __state.__get_result_receiver((_Receiver&&)__rcvr));
-            }});
-            stdexec::start(__op);
+                __state.__args_.template emplace<__decayed_tuple<_As...>>(
+                    (_As&&)__as...);
+            auto __sndr2 = __apply(std::move(__state.__fun_), __args);
+            auto __rcvr2 = __state.__get_result_receiver((_Receiver&&)__rcvr);
+            auto __mkop = [&] {
+                return stdexec::connect(std::move(__sndr2), std::move(__rcvr2));
+            };
+            auto& __op2 =
+                __state.__op_state3_.template emplace<decltype(__mkop())>(
+                    __conv{__mkop});
+            stdexec::start(__op2);
         }
         catch (...)
         {
@@ -4436,9 +4319,9 @@
         []<class _State, class _Receiver, class _Tag, class... _As>(
             __ignore, _State& __state, _Receiver& __rcvr, _Tag,
             _As&&... __as) noexcept -> void {
-        if constexpr (std::same_as<_Tag, _SetTag>)
+        if constexpr (std::same_as<_Tag, _Set>)
         {
-            __bind((_State&&)__state, (_Receiver&&)__rcvr, (_As&&)__as...);
+            __bind(__state, __rcvr, (_As&&)__as...);
         }
         else
         {
@@ -4457,9 +4340,9 @@
 using let_stopped_t = __let::__let_t<set_stopped_t>;
 inline constexpr let_stopped_t let_stopped{};
 
-template <class _SetTag, class _Domain>
-struct __sexpr_impl<__let::__let_t<_SetTag, _Domain>> :
-    __let::__let_impl<_SetTag, _Domain>
+template <class _Set, class _Domain>
+struct __sexpr_impl<__let::__let_t<_Set, _Domain>> :
+    __let::__let_impl<_Set, _Domain>
 {};
 
 /////////////////////////////////////////////////////////////////////////////
@@ -4835,200 +4718,10 @@
 //     tuple<set_error_t, __decay_t<_Error2>>,
 //        ...
 //   >
-template <class _State, class... _Tuples>
-using __make_bind_ = __mbind_back<_State, _Tuples...>;
-
-template <class _State>
-using __make_bind = __mbind_front_q<__make_bind_, _State>;
-
-template <class _Tag>
-using __tuple_t = __mbind_front_q<__decayed_tuple, _Tag>;
-
-template <class _Sender, class _Env, class _State, class _Tag>
-using __bind_completions_t =
-    __gather_completions_for<_Tag, _Sender, _Env, __tuple_t<_Tag>,
-                             __make_bind<_State>>;
-
-template <class _Sender, class _Env>
-using __variant_for_t = //
-    __minvoke<__minvoke<
-        __mfold_right<__nullable_variant_t,
-                      __mbind_front_q<__bind_completions_t, _Sender, _Env>>,
-        set_value_t, set_error_t, set_stopped_t>>;
-
-template <class _SchedulerId, class _VariantId, class _ReceiverId>
-struct __operation1_base;
-
-// This receiver is to be completed on the execution context
-// associated with the scheduler. When the source sender
-// completes, the completion information is saved off in the
-// operation state so that when this receiver completes, it can
-// read the completion out of the operation state and forward it
-// to the output receiver after transitioning to the scheduler's
-// context.
-template <class _SchedulerId, class _VariantId, class _ReceiverId>
-struct __receiver2
-{
-    using _Receiver = stdexec::__t<_ReceiverId>;
-
-    struct __t
-    {
-        using receiver_concept = receiver_t;
-        using __id = __receiver2;
-        __operation1_base<_SchedulerId, _VariantId, _ReceiverId>* __op_state_;
-
-        // If the work is successfully scheduled on the new execution
-        // context and is ready to run, forward the completion signal in
-        // the operation state
-        template <same_as<set_value_t> _Tag>
-        friend void tag_invoke(_Tag, __t&& __self) noexcept
-        {
-            __self.__op_state_->__complete();
-        }
-
-        template <__one_of<set_error_t, set_stopped_t> _Tag, class... _As>
-            requires __callable<_Tag, _Receiver, _As...>
-        friend void tag_invoke(_Tag, __t&& __self, _As&&... __as) noexcept
-        {
-            _Tag{}((_Receiver&&)__self.__op_state_->__rcvr_, (_As&&)__as...);
-        }
-
-        friend auto tag_invoke(get_env_t, const __t& __self) noexcept
-            -> env_of_t<_Receiver>
-        {
-            return get_env(__self.__op_state_->__rcvr_);
-        }
-    };
-};
-
-// This receiver is connected to the input sender. When that
-// sender completes (on whatever context it completes on), save
-// the completion information into the operation state. Then,
-// schedule a second operation to __complete on the execution
-// context of the scheduler. That second receiver will read the
-// completion information out of the operation state and propagate
-// it to the output receiver from within the desired context.
-template <class _SchedulerId, class _VariantId, class _ReceiverId>
-struct __receiver1
-{
-    using _Scheduler = stdexec::__t<_SchedulerId>;
-    using _Receiver = stdexec::__t<_ReceiverId>;
-    using __receiver2_t =
-        stdexec::__t<__receiver2<_SchedulerId, _VariantId, _ReceiverId>>;
-
-    struct __t
-    {
-        using __id = __receiver1;
-        using receiver_concept = receiver_t;
-        __operation1_base<_SchedulerId, _VariantId, _ReceiverId>* __op_state_;
-
-        template <class... _Args>
-        static constexpr bool __nothrow_complete_ =
-            (__nothrow_decay_copyable<_Args> && ...);
-
-        template <class _Tag, class... _Args>
-        static void __complete_(_Tag, __t&& __self, _Args&&... __args) //
-            noexcept(__nothrow_complete_<_Args...>)
-        {
-            // Write the tag and the args into the operation state so that
-            // we can forward the completion from within the scheduler's
-            // execution context.
-            __self.__op_state_->__data_
-                .template emplace<__decayed_tuple<_Tag, _Args...>>(
-                    _Tag{}, (_Args&&)__args...);
-            // Enqueue the schedule operation so the completion happens
-            // on the scheduler's execution context.
-            start(__self.__op_state_->__state2_);
-        }
-
-        template <__completion_tag _Tag, class... _Args>
-            requires __callable<_Tag, _Receiver, __decay_t<_Args>...>
-        friend void tag_invoke(_Tag __tag, __t&& __self,
-                               _Args&&... __args) noexcept
-        {
-            __try_call((_Receiver&&)__self.__op_state_->__rcvr_,
-                       __function_constant_v<__complete_<_Tag, _Args...>>,
-                       (_Tag&&)__tag, (__t&&)__self, (_Args&&)__args...);
-        }
-
-        friend auto tag_invoke(get_env_t, const __t& __self) noexcept
-            -> env_of_t<_Receiver>
-        {
-            return get_env(__self.__op_state_->__rcvr_);
-        }
-    };
-};
-
-template <class _SchedulerId, class _VariantId, class _ReceiverId>
-struct __operation1_base : __immovable
-{
-    using _Scheduler = stdexec::__t<_SchedulerId>;
-    using _Receiver = stdexec::__t<_ReceiverId>;
-    using _Variant = stdexec::__t<_VariantId>;
-    using __receiver2_t =
-        stdexec::__t<__receiver2<_SchedulerId, _VariantId, _ReceiverId>>;
-
-    _Scheduler __sched_;
-    _Receiver __rcvr_;
-    _Variant __data_;
-    connect_result_t<schedule_result_t<_Scheduler>, __receiver2_t> __state2_;
-
-    __operation1_base(_Scheduler __sched, _Receiver&& __rcvr) :
-        __sched_((_Scheduler&&)__sched), __rcvr_((_Receiver&&)__rcvr),
-        __state2_(connect(schedule(__sched_), __receiver2_t{this}))
-    {}
-
-    void __complete() noexcept
-    {
-        STDEXEC_ASSERT(!__data_.valueless_by_exception());
-        std::visit(
-            [this]<class _Tup>(_Tup& __tupl) -> void {
-            if constexpr (same_as<_Tup, std::monostate>)
-            {
-                std::terminate(); // reaching this indicates a bug in
-                                  // schedule_from
-            }
-            else
-            {
-                __apply(
-                    [&]<class... _Args>(auto __tag, _Args&... __args) -> void {
-                    __tag((_Receiver&&)__rcvr_, (_Args&&)__args...);
-                },
-                    __tupl);
-            }
-        },
-            __data_);
-    }
-};
-
-template <class _SchedulerId, class _CvrefSenderId, class _ReceiverId>
-struct __operation1
-{
-    using _Scheduler = stdexec::__t<_SchedulerId>;
-    using _CvrefSender = stdexec::__cvref_t<_CvrefSenderId>;
-    using _Receiver = stdexec::__t<_ReceiverId>;
-    using __variant_t = __variant_for_t<_CvrefSender, env_of_t<_Receiver>>;
-    using __receiver1_t = stdexec::__t<
-        __receiver1<_SchedulerId, stdexec::__id<__variant_t>, _ReceiverId>>;
-    using __base_t = __operation1_base<_SchedulerId, stdexec::__id<__variant_t>,
-                                       _ReceiverId>;
-
-    struct __t : __base_t
-    {
-        using __id = __operation1;
-        connect_result_t<_CvrefSender, __receiver1_t> __state1_;
-
-        __t(_Scheduler __sched, _CvrefSender&& __sndr, _Receiver&& __rcvr) :
-            __base_t{(_Scheduler&&)__sched, (_Receiver&&)__rcvr},
-            __state1_(connect((_CvrefSender&&)__sndr, __receiver1_t{this}))
-        {}
-
-        friend void tag_invoke(start_t, __t& __op_state) noexcept
-        {
-            start(__op_state.__state1_);
-        }
-    };
-};
+template <class _CvrefSender, class _Env>
+using __variant_for_t = __compl_sigs::__maybe_for_all_sigs<
+    __completion_signatures_of_t<_CvrefSender, _Env>, __q<__decayed_tuple>,
+    __nullable_variant_t>;
 
 template <class _Tp>
 using __decay_rvalue_ref = __decay_t<_Tp>&&;
@@ -5038,6 +4731,30 @@
     __transform<__q<__decay_rvalue_ref>,
                 __mcompose<__q<completion_signatures>, __qf<_Tag>>>;
 
+template <class... _Ts>
+using __all_nothrow_decay_copyable_ =
+    __mbool<(__nothrow_decay_copyable<_Ts> && ...)>;
+
+template <class _CvrefSender, class _Env>
+using __all_values_and_errors_nothrow_decay_copyable = //
+    __compl_sigs::__maybe_for_all_sigs<
+        __completion_signatures_of_t<_CvrefSender, _Env>,
+        __q<__all_nothrow_decay_copyable_>, __q<__mand>>;
+
+template <class _CvrefSender, class _Env>
+using __with_error_t = //
+    __if<__all_values_and_errors_nothrow_decay_copyable<_CvrefSender, _Env>,
+         completion_signatures<>, __with_exception_ptr>;
+
+template <class _Scheduler, class _CvrefSender, class _Env>
+using __completions_t = //
+    __try_make_completion_signatures<
+        _CvrefSender, _Env,
+        __try_make_completion_signatures<schedule_result_t<_Scheduler>, _Env,
+                                         __with_error_t<_CvrefSender, _Env>,
+                                         __mconst<completion_signatures<>>>,
+        __decay_signature<set_value_t>, __decay_signature<set_error_t>>;
+
 template <class _SchedulerId>
 struct __environ
 {
@@ -5056,31 +4773,75 @@
     };
 };
 
-template <class... _Ts>
-using __all_nothrow_decay_copyable =
-    __mbool<(__nothrow_decay_copyable<_Ts> && ...)>;
+template <class _Scheduler, class _Sexpr, class _Receiver>
+struct __state;
 
-template <class _CvrefSender, class _Env>
-using __all_values_and_errors_nothrow_decay_copyable = //
-    __mand<
-        __try_error_types_of_t<_CvrefSender, _Env,
-                               __q<__all_nothrow_decay_copyable>>,
-        __try_value_types_of_t<_CvrefSender, _Env,
-                               __q<__all_nothrow_decay_copyable>, __q<__mand>>>;
+// This receiver is to be completed on the execution context
+// associated with the scheduler. When the source sender
+// completes, the completion information is saved off in the
+// operation state so that when this receiver completes, it can
+// read the completion out of the operation state and forward it
+// to the output receiver after transitioning to the scheduler's
+// context.
+template <class _Scheduler, class _Sexpr, class _Receiver>
+struct __receiver2 :
+    receiver_adaptor<__receiver2<_Scheduler, _Sexpr, _Receiver>>
+{
+    explicit __receiver2(
+        __state<_Scheduler, _Sexpr, _Receiver>* __state) noexcept :
+        __state_{__state}
+    {}
 
-template <class _CvrefSender, class _Env>
-using __with_error_t = //
-    __if<__all_values_and_errors_nothrow_decay_copyable<_CvrefSender, _Env>,
-         completion_signatures<>, __with_exception_ptr>;
+    _Receiver&& base() && noexcept
+    {
+        return std::move(__state_->__receiver());
+    }
 
-template <class _Scheduler, class _CvrefSender, class _Env>
-using __completions_t = //
-    __try_make_completion_signatures<
-        _CvrefSender, _Env,
-        __try_make_completion_signatures<schedule_result_t<_Scheduler>, _Env,
-                                         __with_error_t<_CvrefSender, _Env>,
-                                         __mconst<completion_signatures<>>>,
-        __decay_signature<set_value_t>, __decay_signature<set_error_t>>;
+    const _Receiver& base() const& noexcept
+    {
+        return __state_->__receiver();
+    }
+
+    void set_value() && noexcept
+    {
+        STDEXEC_ASSERT(!__state_->__data_.valueless_by_exception());
+        std::visit(
+            [__state = __state_]<class _Tup>(_Tup& __tupl) noexcept -> void {
+            if constexpr (same_as<_Tup, std::monostate>)
+            {
+                std::terminate(); // reaching this indicates a bug in
+                                  // schedule_from
+            }
+            else
+            {
+                __apply(
+                    [&]<class... _Args>(auto __tag,
+                                        _Args&... __args) noexcept -> void {
+                    __tag(std::move(__state->__receiver()), (_Args&&)__args...);
+                },
+                    __tupl);
+            }
+        },
+            __state_->__data_);
+    }
+
+    __state<_Scheduler, _Sexpr, _Receiver>* __state_;
+};
+
+template <class _Scheduler, class _Sexpr, class _Receiver>
+struct __state : __enable_receiver_from_this<_Sexpr, _Receiver>
+{
+    using __variant_t =
+        __variant_for_t<__child_of<_Sexpr>, env_of_t<_Receiver>>;
+    using __receiver2_t = __receiver2<_Scheduler, _Sexpr, _Receiver>;
+
+    __variant_t __data_;
+    connect_result_t<schedule_result_t<_Scheduler>, __receiver2_t> __state2_;
+
+    explicit __state(_Scheduler __sched) :
+        __data_(), __state2_(connect(schedule(__sched), __receiver2_t{this}))
+    {}
+};
 
 struct schedule_from_t
 {
@@ -5104,30 +4865,10 @@
 
 struct __schedule_from_impl : __sexpr_defaults
 {
-    template <class, class _Data, class _Child>
-    using __env_ = __env::__env_join_t<_Data, env_of_t<_Child>>;
-
-    template <class _Sender>
-    using __env_t = __mapply<__q<__env_>, _Sender>;
-
     template <class _Sender>
     using __scheduler_t =
         __decay_t<__call_result_t<get_completion_scheduler_t<set_value_t>,
-                                  __env_t<_Sender>>>;
-
-    template <class _Sender, class _Receiver>
-    using __receiver_t = //
-        stdexec::__t<__receiver1<stdexec::__id<__scheduler_t<_Sender>>,
-                                 stdexec::__id<__variant_for_t<
-                                     __child_of<_Sender>, env_of_t<_Receiver>>>,
-                                 stdexec::__id<_Receiver>>>;
-
-    template <class _Sender, class _Receiver>
-    using __operation_t =          //
-        stdexec::__t<__operation1< //
-            stdexec::__id<__scheduler_t<_Sender>>,
-            stdexec::__cvref_id<__child_of<_Sender>>,
-            stdexec::__id<_Receiver>>>;
+                                  env_of_t<_Sender>>>;
 
     static constexpr auto get_attrs = //
         []<class _Data, class _Child>(const _Data& __data,
@@ -5142,21 +4883,45 @@
         return {};
     };
 
-    static constexpr auto connect =                             //
-        []<class _Sender, receiver _Receiver>(_Sender&& __sndr,
-                                              _Receiver __rcvr) //
-        -> __operation_t<_Sender, _Receiver>                    //
-        requires sender_to<__child_of<_Sender>,
-                           __receiver_t<_Sender, _Receiver>>
-    {
+    static constexpr auto get_state =
+        []<class _Sender, class _Receiver>(_Sender&& __sndr, _Receiver&) {
         static_assert(sender_expr_for<_Sender, schedule_from_t>);
-        return __sexpr_apply((_Sender&&)__sndr,
-                             [&]<class _Data, class _Child>(
-                                 __ignore, _Data&& __data, _Child&& __child)
-                                 -> __operation_t<_Sender, _Receiver> {
-            auto __sched = get_completion_scheduler<set_value_t>(__data);
-            return {__sched, (_Child&&)__child, (_Receiver&&)__rcvr};
-        });
+        auto __sched =
+            get_completion_scheduler<set_value_t>(stdexec::get_env(__sndr));
+        using _Scheduler = decltype(__sched);
+        return __state<_Scheduler, _Sender, _Receiver>{__sched};
+    };
+
+    static constexpr auto complete =
+        []<class _Tag, class... _Args>(__ignore, auto& __state, auto& __rcvr,
+                                       _Tag,
+                                       _Args&&... __args) noexcept -> void {
+        // Write the tag and the args into the operation state so that
+        // we can forward the completion from within the scheduler's
+        // execution context.
+        using __async_result = __decayed_tuple<_Tag, _Args...>;
+        if constexpr (__nothrow_constructible_from<__async_result, _Tag,
+                                                   _Args...>)
+        {
+            __state.__data_.template emplace<__async_result>(
+                _Tag(), (_Args&&)__args...);
+        }
+        else
+        {
+            try
+            {
+                __state.__data_.template emplace<__async_result>(
+                    _Tag(), (_Args&&)__args...);
+            }
+            catch (...)
+            {
+                set_error(std::move(__rcvr), std::current_exception());
+                return;
+            }
+        }
+        // Enqueue the schedule operation so the completion happens
+        // on the scheduler's execution context.
+        stdexec::start(__state.__state2_);
     };
 };
 } // namespace __schedule_from
@@ -5330,44 +5095,6 @@
 // __write adaptor
 namespace __write_
 {
-template <class _ReceiverId, class _Env>
-struct __operation_base
-{
-    using _Receiver = __t<_ReceiverId>;
-    _Receiver __rcvr_;
-    const _Env __env_;
-};
-
-inline constexpr auto __get_env = [](auto* __op) noexcept -> decltype(auto) {
-    return (__op->__env_);
-};
-
-template <class _ReceiverId, class _Env>
-using __receiver_t = //
-    __t<__detail::__receiver_with<&__operation_base<_ReceiverId, _Env>::__rcvr_,
-                                  __get_env>>;
-
-template <class _CvrefSenderId, class _ReceiverId, class _Env>
-struct __operation : __operation_base<_ReceiverId, _Env>
-{
-    using _CvrefSender = __cvref_t<_CvrefSenderId>;
-    using _Receiver = __t<_ReceiverId>;
-    using __base_t = __operation_base<_ReceiverId, _Env>;
-    using __receiver_t = __write_::__receiver_t<_ReceiverId, _Env>;
-    connect_result_t<_CvrefSender, __receiver_t> __state_;
-
-    __operation(_CvrefSender&& __sndr, _Receiver&& __rcvr, auto&& __env) :
-        __base_t{(_Receiver&&)__rcvr, (decltype(__env))__env},
-        __state_{
-            stdexec::connect((_CvrefSender&&)__sndr, __receiver_t{{}, this})}
-    {}
-
-    friend void tag_invoke(start_t, __operation& __self) noexcept
-    {
-        start(__self.__state_);
-    }
-};
-
 struct __write_t
 {
     template <sender _Sender, class... _Envs>
@@ -5388,8 +5115,8 @@
     STDEXEC_ATTRIBUTE((always_inline))
     static auto __transform_env_fn(_Env&& __env) noexcept
     {
-        return [&](__ignore, const auto& __data, __ignore) noexcept {
-            return __join_env(__data, (_Env&&)__env);
+        return [&](__ignore, const auto& __state, __ignore) noexcept {
+            return __join_env(__state, (_Env&&)__env);
         };
     }
 
@@ -5402,27 +5129,9 @@
 
 struct __write_impl : __sexpr_defaults
 {
-    template <class _Self, class _Receiver>
-    using __receiver_t =
-        __write_::__receiver_t<__id<_Receiver>, __decay_t<__data_of<_Self>>>;
-
-    template <class _Self, class _Receiver>
-    using __operation_t =
-        __operation<__cvref_id<__child_of<_Self>>, __id<_Receiver>,
-                    __decay_t<__data_of<_Self>>>;
-
-    static constexpr auto connect =                                           //
-        []<class _Self, receiver _Receiver>(_Self&& __self, _Receiver __rcvr) //
-        -> __operation_t<_Self, _Receiver>                                    //
-        requires sender_to<__child_of<_Self>, __receiver_t<_Self, _Receiver>>
-    {
-        static_assert(sender_expr_for<_Self, __write_t>);
-        return __sexpr_apply((_Self&&)__self,
-                             [&]<class _Env, class _Child>(
-                                 __write_t, _Env&& __env, _Child&& __child) //
-                             -> __operation_t<_Self, _Receiver> {
-            return {(_Child&&)__child, (_Receiver&&)__rcvr, (_Env&&)__env};
-        });
+    static constexpr auto get_env = //
+        [](__ignore, const auto& __state, const auto& __rcvr) noexcept {
+        return __join_env(__state, stdexec::get_env(__rcvr));
     };
 
     static constexpr auto get_completion_signatures = //
@@ -5457,201 +5166,26 @@
 
     return __env_t{__env};
 }
+
+template <class _Ty, class = __name_of<__decay_t<_Ty>>>
+struct __always
+{
+    _Ty __val_;
+
+    _Ty operator()() noexcept
+    {
+        return static_cast<_Ty&&>(__val_);
+    }
+};
+
+template <class _Ty>
+__always(_Ty) -> __always<_Ty>;
 } // namespace __detail
 
 /////////////////////////////////////////////////////////////////////////////
 // [execution.senders.adaptors.on]
 namespace __on
 {
-template <class _SchedulerId, class _SenderId, class _ReceiverId>
-struct __operation;
-
-template <class _SchedulerId, class _SenderId, class _ReceiverId>
-struct __operation_base : __immovable
-{
-    using _Scheduler = stdexec::__t<_SchedulerId>;
-    using _Sender = stdexec::__t<_SenderId>;
-    using _Receiver = stdexec::__t<_ReceiverId>;
-
-    _Scheduler __scheduler_;
-    _Sender __sndr_;
-    _Receiver __rcvr_;
-};
-
-inline constexpr auto __sched_prop = [](auto* __op) noexcept {
-    return __mkprop(__op->__scheduler_, get_scheduler);
-};
-inline constexpr auto __domain_prop = [](auto*) noexcept {
-    return __mkprop(get_domain);
-};
-
-template <class _SchedulerId, class _SenderId, class _ReceiverId>
-using __receiver_ref_t = __t<__detail::__receiver_with<
-    &__operation_base<_SchedulerId, _SenderId, _ReceiverId>::__rcvr_,
-    __sched_prop, __domain_prop>>;
-
-template <class _SchedulerId, class _SenderId, class _ReceiverId>
-struct __receiver
-{
-    using _Scheduler = stdexec::__t<_SchedulerId>;
-    using _Sender = stdexec::__t<_SenderId>;
-    using _Receiver = stdexec::__t<_ReceiverId>;
-
-    struct __t : receiver_adaptor<__t>
-    {
-        using __id = __receiver;
-        using __receiver_ref_t =
-            __on::__receiver_ref_t<_SchedulerId, _SenderId, _ReceiverId>;
-        stdexec::__t<__operation<_SchedulerId, _SenderId, _ReceiverId>>*
-            __op_state_;
-
-        _Receiver&& base() && noexcept
-        {
-            return (_Receiver&&)__op_state_->__rcvr_;
-        }
-
-        const _Receiver& base() const& noexcept
-        {
-            return __op_state_->__rcvr_;
-        }
-
-        void set_value() && noexcept
-        {
-            // cache this locally since *this is going bye-bye.
-            auto* __op_state = __op_state_;
-            try
-            {
-                // This line will invalidate *this:
-                start(__op_state->__data_.template emplace<1>(
-                    __conv{[__op_state] {
-                    return connect((_Sender&&)__op_state->__sndr_,
-                                   __receiver_ref_t{{}, __op_state});
-                }}));
-            }
-            catch (...)
-            {
-                set_error((_Receiver&&)__op_state->__rcvr_,
-                          std::current_exception());
-            }
-        }
-    };
-};
-
-template <class _SchedulerId, class _SenderId, class _ReceiverId>
-struct __operation
-{
-    using _Scheduler = stdexec::__t<_SchedulerId>;
-    using _Sender = stdexec::__t<_SenderId>;
-    using _Receiver = stdexec::__t<_ReceiverId>;
-
-    struct __t : __operation_base<_SchedulerId, _SenderId, _ReceiverId>
-    {
-        using _Base = __operation_base<_SchedulerId, _SenderId, _ReceiverId>;
-        using __id = __operation;
-        using __receiver_t =
-            stdexec::__t<__receiver<_SchedulerId, _SenderId, _ReceiverId>>;
-        using __receiver_ref_t =
-            __on::__receiver_ref_t<_SchedulerId, _SenderId, _ReceiverId>;
-        using __schedule_sender_t = __result_of<schedule, _Scheduler>;
-
-        friend void tag_invoke(start_t, __t& __self) noexcept
-        {
-            start(std::get<0>(__self.__data_));
-        }
-
-        template <class _Sender2, class _Receiver2>
-        __t(_Scheduler __sched, _Sender2&& __sndr, _Receiver2&& __rcvr) :
-            _Base{{},
-                  (_Scheduler&&)__sched,
-                  (_Sender2&&)__sndr,
-                  (_Receiver2&&)__rcvr},
-            __data_{std::in_place_index<0>, __conv{[this] {
-            return connect(schedule(this->__scheduler_),
-                           __receiver_t{{}, this});
-        }}}
-        {}
-
-        std::variant<connect_result_t<__schedule_sender_t, __receiver_t>,
-                     connect_result_t<_Sender, __receiver_ref_t>>
-            __data_;
-    };
-};
-
-template <class _SchedulerId, class _SenderId>
-struct __sender;
-
-struct on_t;
-
-template <class _SchedulerId, class _SenderId>
-struct __sender
-{
-    using _Scheduler = stdexec::__t<_SchedulerId>;
-    using _Sender = stdexec::__t<_SenderId>;
-
-    struct __t
-    {
-        using __id = __sender;
-        using sender_concept = sender_t;
-
-        using __schedule_sender_t = __result_of<schedule, _Scheduler>;
-        template <class _ReceiverId>
-        using __receiver_ref_t =
-            __on::__receiver_ref_t<_SchedulerId, _SenderId, _ReceiverId>;
-        template <class _ReceiverId>
-        using __receiver_t =
-            stdexec::__t<__receiver<_SchedulerId, _SenderId, _ReceiverId>>;
-        template <class _ReceiverId>
-        using __operation_t =
-            stdexec::__t<__operation<_SchedulerId, _SenderId, _ReceiverId>>;
-
-        _Scheduler __scheduler_;
-        _Sender __sndr_;
-
-        template <__decays_to<__t> _Self, receiver _Receiver>
-            requires constructible_from<_Sender,
-                                        __copy_cvref_t<_Self, _Sender>> &&
-                     sender_to<__schedule_sender_t,
-                               __receiver_t<stdexec::__id<_Receiver>>> &&
-                     sender_to<_Sender,
-                               __receiver_ref_t<stdexec::__id<_Receiver>>>
-        friend auto tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
-            -> __operation_t<stdexec::__id<_Receiver>>
-        {
-            return {((_Self&&)__self).__scheduler_, ((_Self&&)__self).__sndr_,
-                    (_Receiver&&)__rcvr};
-        }
-
-        friend auto tag_invoke(get_env_t, const __t& __self) noexcept
-            -> env_of_t<const _Sender&>
-        {
-            return get_env(__self.__sndr_);
-        }
-
-        template <__decays_to<__t> _Self, class _Env>
-        friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
-            -> __try_make_completion_signatures<
-                __schedule_sender_t, _Env,
-                __try_make_completion_signatures<
-                    __copy_cvref_t<_Self, _Sender>,
-                    __make_env_t<_Env, __with<get_scheduler_t, _Scheduler>>,
-                    completion_signatures<set_error_t(std::exception_ptr)>>,
-                __mconst<completion_signatures<>>>
-        {
-            return {};
-        }
-
-        // BUGBUG better would be to port the `on` algorithm to __sexpr
-        template <class _Self, class _Fun, class _OnTag = on_t>
-        static auto apply(_Self&& __self, _Fun __fun)
-            -> __call_result_t<_Fun, _OnTag, __copy_cvref_t<_Self, _Scheduler>,
-                               __copy_cvref_t<_Self, _Sender>>
-        {
-            return ((_Fun&&)__fun)(_OnTag(), ((_Self&&)__self).__scheduler_,
-                                   ((_Self&&)__self).__sndr_);
-        }
-    };
-};
-
 struct on_t
 {
     using _Sender = __1;
@@ -5668,10 +5202,6 @@
             __make_sexpr<on_t>((_Scheduler&&)__sched, (_Sender&&)__sndr));
     }
 
-    template <class _Scheduler, class _Sender>
-    using __sender_t = __t<__sender<stdexec::__id<__decay_t<_Scheduler>>,
-                                    stdexec::__id<__decay_t<_Sender>>>>;
-
     template <class _Env>
     STDEXEC_ATTRIBUTE((always_inline))
     static auto __transform_env_fn(_Env&& __env) noexcept
@@ -5688,14 +5218,13 @@
     }
 
     template <class _Sender, class _Env>
-        requires __is_not_instance_of<__id<__decay_t<_Sender>>, __sender>
     static auto transform_sender(_Sender&& __sndr, const _Env&)
     {
         return __sexpr_apply((_Sender&&)__sndr,
                              []<class _Data, class _Child>(
                                  __ignore, _Data&& __data, _Child&& __child) {
-            return __sender_t<_Data, _Child>{(_Data&&)__data,
-                                             (_Child&&)__child};
+            return let_value(schedule(__data),
+                             __detail::__always{(_Child&&)__child});
         });
     }
 };
@@ -5704,14 +5233,6 @@
 using __on::on_t;
 inline constexpr on_t on{};
 
-// BUGBUG this will also be unnecessary when `on` returns a __sexpr
-namespace __detail
-{
-template <class _SchedulerId, class _SenderId>
-extern __mconst<__on::__sender<__t<_SchedulerId>, __name_of<__t<_SenderId>>>>
-    __name_of_v<__on::__sender<_SchedulerId, _SenderId>>;
-}
-
 /////////////////////////////////////////////////////////////////////////////
 // [execution.senders.adaptors.into_variant]
 namespace __into_variant
@@ -5873,20 +5394,19 @@
                       __single_values_of_t<_Env, _Senders>...>>;
 
 template <class... _Args>
-using __all_nothrow_decay_copyable =
+using __all_nothrow_decay_copyable_ =
     __mbool<(__nothrow_decay_copyable<_Args> && ...)>;
 
 template <class _Env, class... _Senders>
-using __all_value_and_error_args_nothrow_decay_copyable = //
+using __all_nothrow_decay_copyable = //
     __mand<__compl_sigs::__maybe_for_all_sigs<
         __completion_signatures_of_t<_Senders, _Env>,
-        __q<__all_nothrow_decay_copyable>, __q<__mand>>...>;
+        __q<__all_nothrow_decay_copyable_>, __q<__mand>>...>;
 
 template <class _Env, class... _Senders>
 using __completions_t = //
     __concat_completion_signatures_t<
-        __if<__all_value_and_error_args_nothrow_decay_copyable<_Env,
-                                                               _Senders...>,
+        __if<__all_nothrow_decay_copyable<_Env, _Senders...>,
              completion_signatures<set_stopped_t()>,
              completion_signatures<set_stopped_t(),
                                    set_error_t(std::exception_ptr&&)>>,
@@ -5958,9 +5478,7 @@
                   error_types_of_t<_Senders, __env_t<_Env>, __types>...>;
 
     using __errors_variant = //
-        __if<__all_value_and_error_args_nothrow_decay_copyable<_Env,
-                                                               _Senders...>,
-             __error_types,
+        __if<__all_nothrow_decay_copyable<_Env, _Senders...>, __error_types,
              __minvoke<__push_back_unique<__q<std::variant>>, __error_types,
                        std::exception_ptr>>;
 };
@@ -6159,14 +5677,14 @@
     }
 
     static constexpr auto complete = //
-        []<class _Index, class _State, class _Receiver, class _SetTag,
-           class... _Args>(_Index, _State& __state, _Receiver& __rcvr, _SetTag,
+        []<class _Index, class _State, class _Receiver, class _Set,
+           class... _Args>(_Index, _State& __state, _Receiver& __rcvr, _Set,
                            _Args&&... __args) noexcept -> void {
-        if constexpr (same_as<_SetTag, set_error_t>)
+        if constexpr (same_as<_Set, set_error_t>)
         {
             __set_error(__state, __rcvr, (_Args&&)__args...);
         }
-        else if constexpr (same_as<_SetTag, set_stopped_t>)
+        else if constexpr (same_as<_Set, set_stopped_t>)
         {
             __state_t __expected = __started;
             // Transition to the "stopped" state if and only if we're in the
@@ -6598,20 +6116,6 @@
 
 STDEXEC_PRAGMA_POP()
 
-template <class _Ty, class = __name_of<__decay_t<_Ty>>>
-struct __always
-{
-    _Ty __val_;
-
-    _Ty operator()() noexcept
-    {
-        return static_cast<_Ty&&>(__val_);
-    }
-};
-
-template <class _Ty>
-__always(_Ty) -> __always<_Ty>;
-
 struct on_t : __no_scheduler_in_environment
 {
     template <scheduler _Scheduler, sender _Sender>
@@ -6651,7 +6155,7 @@
                                                 _Child&& __child) {
             auto __old = get_scheduler(__env);
             return transfer(let_value(transfer_just(std::move(__sched)),
-                                      __always{(_Child&&)__child}),
+                                      __detail::__always{(_Child&&)__child}),
                             std::move(__old));
         });
     }
@@ -6757,6 +6261,11 @@
 using __sync_wait_result_t =
     __mtry_eval<__sync_wait_result_impl, _Sender, __q<std::tuple>>;
 
+template <class _Sender>
+using __sync_wait_with_variant_result_t =
+    __mtry_eval<__sync_wait_result_impl, __result_of<into_variant, _Sender>,
+                __q<__midentity>>;
+
 template <class... _Values>
 struct __state
 {
@@ -6846,7 +6355,7 @@
 template <class _Sender>
 struct __variant_for
 {
-    using __t = __sync_wait_result_t<__result_of<into_variant, _Sender>>;
+    using __t = __sync_wait_with_variant_result_t<_Sender>;
 };
 template <class _Sender>
 using __variant_for_t = __t<__variant_for<_Sender>>;
@@ -6888,7 +6397,7 @@
     if constexpr (!sender_in<_Sender, __env>)
     {
         using _Completions = __completion_signatures_of_t<_Sender, __env>;
-        if constexpr (!__ok<_Completions>)
+        if constexpr (__merror<_Completions>)
         {
             return _Completions();
         }
@@ -6935,11 +6444,8 @@
     auto operator()(_Sender&& __sndr) const
         -> std::optional<__value_tuple_for_t<_Sender>>
     {
-        using _SD = __early_domain_of_t<_Sender>;
-        constexpr bool __has_custom_impl =
-            __callable<apply_sender_t, _SD, sync_wait_t, _Sender>;
-        using _Domain = __if_c<__has_custom_impl, _SD, default_domain>;
-        return stdexec::apply_sender(_Domain(), *this, (_Sender&&)__sndr);
+        auto __domain = __get_early_domain(__sndr);
+        return stdexec::apply_sender(__domain, *this, (_Sender&&)__sndr);
     }
 
 #if STDEXEC_NVHPC()
@@ -7024,7 +6530,11 @@
     auto apply_sender(_Sender&& __sndr) const
         -> std::optional<__variant_for_t<_Sender>>
     {
-        return sync_wait_t()(into_variant((_Sender&&)__sndr));
+        if (auto __opt_values = sync_wait_t()(into_variant((_Sender&&)__sndr)))
+        {
+            return std::move(std::get<0>(*__opt_values));
+        }
+        return std::nullopt;
     }
 };
 } // namespace __sync_wait
diff --git a/include/sdbusplus/async/stdexec/task.hpp b/include/sdbusplus/async/stdexec/task.hpp
index 379941e..56ad9aa 100644
--- a/include/sdbusplus/async/stdexec/task.hpp
+++ b/include/sdbusplus/async/stdexec/task.hpp
@@ -30,7 +30,6 @@
 #include <variant>
 
 STDEXEC_PRAGMA_PUSH()
-STDEXEC_PRAGMA_IGNORE_GNU("-Wpragmas")
 STDEXEC_PRAGMA_IGNORE_GNU("-Wundefined-inline")
 
 namespace exec