/*
 * Copyright (c) 2021-2022 NVIDIA Corporation
 *
 * Licensed under the Apache License Version 2.0 with LLVM Exceptions
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *   https://llvm.org/LICENSE.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once

#include "../stdexec/__detail/__intrusive_queue.hpp"
#include "../stdexec/execution.hpp"
#include "../stdexec/stop_token.hpp"
#include "env.hpp"

#include <mutex>

namespace exec
{
/////////////////////////////////////////////////////////////////////////////
// async_scope
namespace __scope
{
using namespace stdexec;

// Forward declarations. __impl (the per-scope bookkeeping record) is defined
// right after __task; async_scope (the public type) is defined at the end of
// this namespace and is named as a friend by __future::__t.
struct __impl;
struct async_scope;

// Intrusive queue node for work that is parked on a scope.
// __when_empty_op::__t derives from this type, and __impl::__waiters_ links
// the nodes together through __next_.
// NOTE: member order matters; derived types aggregate-initialise this struct
// positionally.
struct __task : __immovable
{
    // Scope whose lock, active counter and waiter queue this task uses.
    const __impl* __scope_;
    // Callback invoked on each queued task by __nest_rcvr::__t::__complete
    // once the scope's active counter has dropped to zero.
    void (*__notify_waiter)(__task*) noexcept;
    // Link used by __intrusive_queue<&__task::__next_>.
    __task* __next_ = nullptr;
};

// Environment type obtained by combining _BaseEnv with an entry that answers
// the get_stop_token query with an in_place_stop_token. Values of this type
// are built in this file with make_env(..., with(get_stop_token, token)).
template <class _BaseEnv>
using __env_t =
    make_env_t<_BaseEnv, with_t<get_stop_token_t, in_place_stop_token>>;

// Bookkeeping record owned by an async_scope and shared (as `const __impl*`)
// with every sender and operation created from it.
//
// __active_ and __waiters_ are `mutable` because operations only hold a
// pointer-to-const yet have to update them; every access in this file happens
// while __lock_ is held.
struct __impl
{
    // Source of the stop token handed to nested/spawned work.
    in_place_stop_source __stop_source_{};
    // Guards __active_ and __waiters_.
    mutable std::mutex __lock_{};
    // Number of nested operations that have started but not yet completed.
    mutable std::ptrdiff_t __active_ = 0;
    // when_empty operations waiting for __active_ to reach zero.
    mutable __intrusive_queue<&__task::__next_> __waiters_{};

    // Asserts (via STDEXEC_ASSERT) that no nested operation is still counted
    // and that nobody is still waiting when the scope goes away.
    ~__impl()
    {
        const std::lock_guard<std::mutex> __hold{__lock_};
        STDEXEC_ASSERT(__active_ == 0);
        STDEXEC_ASSERT(__waiters_.empty());
    }
};

////////////////////////////////////////////////////////////////////////////
// async_scope::when_empty implementation
template <class _ConstrainedId, class _ReceiverId>
struct __when_empty_op
{
    using _Constrained = __cvref_t<_ConstrainedId>;
    using _Receiver = stdexec::__t<_ReceiverId>;

    struct __t : __task
    {
        using __id = __when_empty_op;

        explicit __t(const __impl* __scope, _Constrained&& __sndr,
                     _Receiver __rcvr) :
            __task{{}, __scope, __notify_waiter},
            __op_(stdexec::connect((_Constrained&&)__sndr, (_Receiver&&)__rcvr))
        {}

      private:
        static void __notify_waiter(__task* __self) noexcept
        {
            start(static_cast<__t*>(__self)->__op_);
        }

        void __start_() noexcept
        {
            std::unique_lock __guard{this->__scope_->__lock_};
            auto& __active = this->__scope_->__active_;
            auto& __waiters = this->__scope_->__waiters_;
            if (__active != 0)
            {
                __waiters.push_back(this);
                return;
            }
            __guard.unlock();
            start(this->__op_);
        }

        friend void tag_invoke(start_t, __t& __self) noexcept
        {
            return __self.__start_();
        }

        STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
        connect_result_t<_Constrained, _Receiver> __op_;
    };
};

// Sender returned by async_scope::when_empty. It pairs the scope's
// bookkeeping record with the wrapped sender; connecting it yields a
// __when_empty_op that defers the wrapped sender until the scope is idle.
template <class _ConstrainedId>
struct __when_empty_sender
{
    using _Constrained = stdexec::__t<_ConstrainedId>;

    struct __t
    {
        using __id = __when_empty_sender;
        using sender_concept = stdexec::sender_t;

        // Operation type for a given cv/ref-qualified self type and receiver.
        template <class _Self, class _Receiver>
        using __when_empty_op_t =
            stdexec::__t<__when_empty_op<__cvref_id<_Self, _Constrained>,
                                         stdexec::__id<_Receiver>>>;

        // connect customisation: forwards the wrapped sender with the value
        // category of __self and moves the receiver into the operation.
        template <__decays_to<__t> _Self, receiver _Receiver>
            requires sender_to<__copy_cvref_t<_Self, _Constrained>, _Receiver>
        [[nodiscard]] friend __when_empty_op_t<_Self, _Receiver>
            tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
        {
            using __operation_t = __when_empty_op_t<_Self, _Receiver>;
            return __operation_t{__self.__scope_,
                                 static_cast<_Self&&>(__self).__c_,
                                 static_cast<_Receiver&&>(__rcvr)};
        }

        // Completion signatures are those of the wrapped sender evaluated in
        // an environment augmented with an in_place_stop_token.
        template <__decays_to<__t> _Self, class _Env>
        friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
            -> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
                                          __env_t<_Env>>
        {
            return {};
        }

        // This sender exposes no attributes of its own.
        friend empty_env tag_invoke(get_env_t, const __t&) noexcept
        {
            return {};
        }

        // Aggregate members (order is relied upon by async_scope::when_empty).
        const __impl* __scope_;
        STDEXEC_ATTRIBUTE((no_unique_address)) _Constrained __c_;
    };
};

// Concrete when_empty sender type for a (possibly cv/ref-qualified) sender
// type: the argument is decayed before it is wrapped.
template <class _Constrained>
using __when_empty_sender_t =
    stdexec::__t<__when_empty_sender<__id<__decay_t<_Constrained>>>>;

////////////////////////////////////////////////////////////////////////////
// async_scope::nest implementation
// Receiver-dependent part of a nest operation. __nest_rcvr::__t keeps a
// pointer to this base so it can reach the scope and the downstream receiver
// without knowing the wrapped sender's type.
// NOTE: member order matters; __nest_op::__t aggregate-initialises this base
// positionally.
template <class _ReceiverId>
struct __nest_op_base : __immovable
{
    using _Receiver = stdexec::__t<_ReceiverId>;
    // Scope whose active counter this operation participates in.
    const __impl* __scope_;
    // Downstream receiver that the wrapped sender's completion is forwarded to.
    STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_;
};

// Receiver that a nest operation connects to the wrapped sender. It forwards
// every completion to the downstream receiver and afterwards removes the
// operation from the scope's active count, waking any when_empty waiters if
// it was the last one.
template <class _ReceiverId>
struct __nest_rcvr
{
    using _Receiver = stdexec::__t<_ReceiverId>;

    struct __t
    {
        using __id = __nest_rcvr;
        using receiver_concept = stdexec::receiver_t;
        __nest_op_base<_ReceiverId>* __op_;

        // Decrements the scope's active counter; when it reaches zero, takes
        // the whole waiter queue and notifies every waiter outside the lock.
        static void __complete(const __impl* __scope) noexcept
        {
            std::unique_lock __lock{__scope->__lock_};
            if (--__scope->__active_ != 0)
            {
                return;
            }

            auto __ready = std::move(__scope->__waiters_);
            __lock.unlock();
            // From here on the scope must not be touched: a notified waiter
            // may cause it to be destroyed.
            __scope = nullptr;
            while (!__ready.empty())
            {
                auto* __waiter = __ready.pop_front();
                __waiter->__notify_waiter(__waiter);
            }
        }

        // Completion customisation for every completion tag the downstream
        // receiver accepts with these arguments.
        template <__completion_tag _Tag, class... _As>
            requires __callable<_Tag, _Receiver, _As...>
        friend void tag_invoke(_Tag, __t&& __self, _As&&... __as) noexcept
        {
            // Read the scope first: completing the downstream receiver may
            // destroy the operation (and with it this receiver).
            const __impl* const __scope = __self.__op_->__scope_;
            _Tag{}(std::move(__self.__op_->__rcvr_),
                   static_cast<_As&&>(__as)...);
            // Neither __self nor __self.__op_ may be used past this point.
            __complete(__scope);
        }

        // Environment: the downstream receiver's environment combined with
        // the scope's stop token for the get_stop_token query.
        friend __env_t<env_of_t<_Receiver>>
            tag_invoke(get_env_t, const __t& __self) noexcept
        {
            auto* const __op = __self.__op_;
            return make_env(
                get_env(__op->__rcvr_),
                with(get_stop_token,
                     __op->__scope_->__stop_source_.get_token()));
        }
    };
};

// Operation state of a nest sender: the wrapped sender connected to a
// __nest_rcvr that points back at this operation's base.
template <class _ConstrainedId, class _ReceiverId>
struct __nest_op
{
    using _Constrained = stdexec::__t<_ConstrainedId>;
    using _Receiver = stdexec::__t<_ReceiverId>;

    struct __t : __nest_op_base<_ReceiverId>
    {
        using __id = __nest_op;
        using __nest_rcvr_t = stdexec::__t<__nest_rcvr<_ReceiverId>>;
        // Operation state of the wrapped sender.
        STDEXEC_IMMOVABLE_NO_UNIQUE_ADDRESS
        connect_result_t<_Constrained, __nest_rcvr_t> __op_;

        // Stores the scope and downstream receiver in the base, then connects
        // the wrapped sender to a receiver that refers to that base.
        template <__decays_to<_Constrained> _Sender,
                  __decays_to<_Receiver> _Rcvr>
        explicit __t(const __impl* __scope, _Sender&& __c, _Rcvr&& __rcvr) :
            __nest_op_base<_ReceiverId>{{},
                                        __scope,
                                        static_cast<_Rcvr&&>(__rcvr)},
            __op_(stdexec::connect(static_cast<_Sender&&>(__c),
                                   __nest_rcvr_t{this}))
        {}

      private:
        // Counts this operation as active on the scope, then starts the
        // wrapped operation with the scope lock already released.
        void __start_() noexcept
        {
            STDEXEC_ASSERT(this->__scope_);
            {
                std::unique_lock __lock{this->__scope_->__lock_};
                ++this->__scope_->__active_;
            }
            start(__op_);
        }

        friend void tag_invoke(start_t, __t& __self) noexcept
        {
            return __self.__start_();
        }
    };
};

// Sender returned by async_scope::nest. It pairs the scope's bookkeeping
// record with the wrapped sender; the operation it produces is counted as
// active on the scope from start() until its completion has been delivered.
template <class _ConstrainedId>
struct __nest_sender
{
    using _Constrained = stdexec::__t<_ConstrainedId>;

    struct __t
    {
        using __id = __nest_sender;
        using sender_concept = stdexec::sender_t;

        // Aggregate members (order is relied upon by async_scope::nest).
        const __impl* __scope_;
        STDEXEC_ATTRIBUTE((no_unique_address)) _Constrained __c_;

        // Operation and receiver types used for a given downstream receiver.
        template <class _Receiver>
        using __nest_operation_t =
            stdexec::__t<__nest_op<_ConstrainedId, stdexec::__id<_Receiver>>>;
        template <class _Receiver>
        using __nest_receiver_t =
            stdexec::__t<__nest_rcvr<stdexec::__id<_Receiver>>>;

        // connect customisation: forwards the wrapped sender with the value
        // category of __self and moves the receiver into the operation.
        template <__decays_to<__t> _Self, receiver _Receiver>
            requires sender_to<__copy_cvref_t<_Self, _Constrained>,
                               __nest_receiver_t<_Receiver>>
        [[nodiscard]] friend __nest_operation_t<_Receiver>
            tag_invoke(connect_t, _Self&& __self, _Receiver __rcvr)
        {
            using __operation_t = __nest_operation_t<_Receiver>;
            return __operation_t{__self.__scope_,
                                 static_cast<_Self&&>(__self).__c_,
                                 static_cast<_Receiver&&>(__rcvr)};
        }

        // Completion signatures are those of the wrapped sender evaluated in
        // an environment augmented with an in_place_stop_token.
        template <__decays_to<__t> _Self, class _Env>
        friend auto tag_invoke(get_completion_signatures_t, _Self&&, _Env&&)
            -> completion_signatures_of_t<__copy_cvref_t<_Self, _Constrained>,
                                          __env_t<_Env>>
        {
            return {};
        }

        // This sender exposes no attributes of its own.
        friend empty_env tag_invoke(get_env_t, const __t&) noexcept
        {
            return {};
        }
    };
};

// Concrete nest sender type for a (possibly cv/ref-qualified) sender type:
// the argument is decayed before it is wrapped.
template <class _Constrained>
using __nest_sender_t =
    stdexec::__t<__nest_sender<__id<__decay_t<_Constrained>>>>;

////////////////////////////////////////////////////////////////////////////
// async_scope::spawn_future implementation
// Lifecycle marker stored in __future_state_base::__step_. Transitions are
// made with __future_state_base::__step_from_to_, which asserts the expected
// previous value.
enum class __future_step
{
    // Zero value; not assigned anywhere in this file.
    __invalid = 0,
    // Initial value of a freshly constructed state.
    __created,
    // Set by the __future::__t constructor once a future owns the state.
    __future,
    // Set when the future (or the operation connected from it) is destroyed
    // before a result was stored; the state then owns itself via __no_future_.
    __no_future,
    // Set just before / while the state is being destroyed.
    __deleted
};

// Forward declaration; the definition follows __future_rcvr. __future_op
// needs the name to declare its std::unique_ptr member.
template <class _Sender, class _Env>
struct __future_state;

struct __forward_stopped
{
    in_place_stop_source* __stop_source_;

    void operator()() noexcept
    {
        __stop_source_->request_stop();
    }
};

// Intrusive queue node for an operation waiting on a spawned future's result.
// __future_op::__t derives from this type and installs a callback that
// completes its receiver.
// NOTE: member order matters; __future_op::__t aggregate-initialises this
// struct positionally.
struct __subscription : __immovable
{
    // Type-erased completion callback; receives the node it is stored in.
    void (*__complete_)(__subscription*) noexcept = nullptr;

    // Runs the installed completion callback for this node.
    void __complete() noexcept
    {
        (*__complete_)(this);
    }

    // Link used by __intrusive_queue<&__subscription::__next_>.
    __subscription* __next_ = nullptr;
};

// Operation state produced by connecting a __future sender to _Receiver.
//
// It takes over ownership of the shared __future_state from the future. On
// start() it either delivers an already stored result or queues itself (as a
// __subscription) on the state so that the spawned work's receiver can call
// it back once a result has been stored.
template <class _SenderId, class _EnvId, class _ReceiverId>
struct __future_op
{
    using _Sender = stdexec::__t<_SenderId>;
    using _Env = stdexec::__t<_EnvId>;
    using _Receiver = stdexec::__t<_ReceiverId>;

    class __t : __subscription
    {
        // Stop-callback type for the receiver's stop token; it invokes a
        // __forward_stopped functor.
        using __forward_consumer = typename stop_token_of_t<
            env_of_t<_Receiver>>::template callback_type<__forward_stopped>;

        friend void tag_invoke(start_t, __t& __self) noexcept
        {
            __self.__start_();
        }

        // Delivers the stored result (or set_stopped) to the receiver.
        // Ownership of the shared state is moved into a local first, so the
        // state is released when this function returns.
        void __complete_() noexcept
        {
            try
            {
                // __state is declared before __guard so that the lock is
                // released before the state is destroyed on scope exit.
                auto __state = std::move(__state_);
                STDEXEC_ASSERT(__state != nullptr);
                std::unique_lock __guard{__state->__mutex_};
                // either the future is still in use or it has passed ownership
                // to __state->__no_future_
                if (__state->__no_future_.get() != nullptr ||
                    __state->__step_ != __future_step::__future)
                {
                    // invalid state - there is a code bug in the state machine
                    std::terminate();
                }
                else if (get_stop_token(get_env(__rcvr_)).stop_requested())
                {
                    // The consumer asked to stop: report stopped instead of
                    // the stored result. The receiver is completed with the
                    // state mutex released.
                    __guard.unlock();
                    set_stopped((_Receiver&&)__rcvr_);
                    __guard.lock();
                }
                else
                {
                    // Dispatch on whichever completion tuple was stored;
                    // std::monostate means no result was stored at all.
                    std::visit(
                        [this, &__guard]<class _Tup>(_Tup& __tup) {
                        if constexpr (same_as<_Tup, std::monostate>)
                        {
                            std::terminate();
                        }
                        else
                        {
                            // The first tuple element is the completion tag;
                            // the remaining elements are its arguments.
                            std::apply(
                                [this, &__guard]<class... _As>(auto tag,
                                                               _As&... __as) {
                                __guard.unlock();
                                tag((_Receiver&&)__rcvr_, (_As&&)__as...);
                                __guard.lock();
                            },
                                __tup);
                        }
                    },
                        __state->__data_);
                }
            }
            catch (...)
            {
                set_error((_Receiver&&)__rcvr_, std::current_exception());
            }
        }

        // start(): completes immediately when a result is already stored,
        // otherwise subscribes for notification. Does nothing when this
        // operation holds no state.
        void __start_() noexcept
        {
            try
            {
                if (!!__state_)
                {
                    std::unique_lock __guard{__state_->__mutex_};
                    // index 0 is std::monostate, i.e. "no result yet".
                    if (__state_->__data_.index() != 0)
                    {
                        __guard.unlock();
                        __complete_();
                    }
                    else
                    {
                        __state_->__subscribers_.push_back(this);
                    }
                }
            }
            catch (...)
            {
                set_error((_Receiver&&)__rcvr_, std::current_exception());
            }
        }

        // Downstream receiver.
        STDEXEC_ATTRIBUTE((no_unique_address)) _Receiver __rcvr_;
        // Shared state of the spawned work; owned by this operation until it
        // completes or hands the state over to itself in the destructor.
        std::unique_ptr<__future_state<_Sender, _Env>> __state_;
        // Forwards a stop request on the receiver's stop token to the shared
        // state's stop source.
        STDEXEC_ATTRIBUTE((no_unique_address))
        __forward_consumer __forward_consumer_;

      public:
        using __id = __future_op;

        // If the operation is destroyed while it still owns the state:
        // when a result is already stored the state is simply released;
        // otherwise ownership is handed to the state itself (__no_future_)
        // so the spawned work can free it when it completes.
        ~__t() noexcept
        {
            if (__state_ != nullptr)
            {
                auto __raw_state = __state_.get();
                std::unique_lock __guard{__raw_state->__mutex_};
                if (__raw_state->__data_.index() > 0)
                {
                    // completed given sender
                    // state is no longer needed
                    return;
                }
                __raw_state->__no_future_ = std::move(__state_);
                __raw_state->__step_from_to_(__guard, __future_step::__future,
                                             __future_step::__no_future);
            }
        }

        // Stores the receiver and the shared state, installs the callback
        // that routes __subscription::__complete() to __complete_(), and
        // registers the stop forwarding callback.
        // NOTE(review): the initialiser of __forward_consumer_ dereferences
        // __state_, so a null state is presumably not expected here - confirm.
        template <class _Receiver2>
        explicit __t(_Receiver2&& __rcvr,
                     std::unique_ptr<__future_state<_Sender, _Env>> __state) :
            __subscription{{},
                           [](__subscription* __self) noexcept -> void {
            static_cast<__t*>(__self)->__complete_();
        }},
            __rcvr_((_Receiver2&&)__rcvr), __state_(std::move(__state)),
            __forward_consumer_(get_stop_token(get_env(__rcvr_)),
                                __forward_stopped{&__state_->__stop_source_})
        {}
    };
};

// __completion_as_tuple_t<Tag(Ts&&...)> names std::tuple<Tag, Ts...>: the
// storage type for one completion signature. Two equivalent implementations
// are provided; the first is selected when STDEXEC_NVHPC() is true.
#if STDEXEC_NVHPC()
// Partial-specialisation based implementation.
template <class _Fn>
struct __completion_as_tuple2_;

template <class _Tag, class... _Ts>
struct __completion_as_tuple2_<_Tag(_Ts&&...)>
{
    using __t = std::tuple<_Tag, _Ts...>;
};
template <class _Fn>
using __completion_as_tuple_t = stdexec::__t<__completion_as_tuple2_<_Fn>>;

#else

// Deduction based implementation: the function below is declared only and is
// used solely inside decltype.
template <class _Tag, class... _Ts>
std::tuple<_Tag, _Ts...> __completion_as_tuple_(_Tag (*)(_Ts&&...));
template <class _Fn>
using __completion_as_tuple_t =
    decltype(__scope::__completion_as_tuple_((_Fn*)nullptr));
#endif

// Value-completion transform used by __future_completions_t: a set_value
// signature whose arguments are the decayed types passed as rvalues.
template <class... _Ts>
using __decay_values_t =
    completion_signatures<set_value_t(__decay_t<_Ts>&&...)>;

// Error-completion transform used by __future_completions_t: a set_error
// signature whose argument is the decayed type passed as an rvalue.
template <class _Ty>
using __decay_error_t = completion_signatures<set_error_t(__decay_t<_Ty>&&)>;

// Completion signatures of a spawned future: those of _Sender (queried in
// __env_t<_Env>) with value and error arguments decayed, plus set_stopped()
// and set_error(std::exception_ptr&&).
template <class _Sender, class _Env>
using __future_completions_t = //
    make_completion_signatures<
        _Sender, __env_t<_Env>,
        completion_signatures<set_stopped_t(),
                              set_error_t(std::exception_ptr&&)>,
        __decay_values_t, __decay_error_t>;

// Storage type for a future's result: a std::variant whose first alternative
// is std::monostate (no result yet) followed by one std::tuple<Tag, Args...>
// alternative per completion signature in _Completions.
template <class _Completions>
using __completions_as_variant = //
    __mapply<__transform<__q<__completion_as_tuple_t>,
                         __mbind_front_q<std::variant, std::monostate>>,
             _Completions>;

// Deleter for std::unique_ptr<_Ty, ...> that remembers, as a plain function
// pointer, which concrete type the pointee must be deleted as. This lets a
// pointer to a base without a virtual destructor delete the derived object
// it was converted from.
template <class _Ty>
struct __dynamic_delete
{
    // Default: delete the pointee as a _Ty.
    __dynamic_delete() : __delete_([](_Ty* __ptr) { delete __ptr; }) {}

    // Conversion from std::default_delete<_Uy>: delete the pointee as a _Uy.
    template <class _Uy>
        requires convertible_to<_Uy*, _Ty*>
    __dynamic_delete(std::default_delete<_Uy>) :
        __delete_([](_Ty* __ptr) { delete static_cast<_Uy*>(__ptr); })
    {}

    // Assignment from std::default_delete<_Uy>; this is what allows a
    // std::unique_ptr<_Uy> to be move-assigned into a
    // std::unique_ptr<_Ty, __dynamic_delete<_Ty>>.
    template <class _Uy>
        requires convertible_to<_Uy*, _Ty*>
    __dynamic_delete& operator=(std::default_delete<_Uy>)
    {
        __delete_ = [](_Ty* __ptr) { delete static_cast<_Uy*>(__ptr); };
        return *this;
    }

    // Deletes __p through the remembered function.
    void operator()(_Ty* __p)
    {
        (*__delete_)(__p);
    }

    // Type-erased delete function.
    void (*__delete_)(_Ty*);
};

template <class _Completions, class _Env>
struct __future_state_base
{
    __future_state_base(_Env __env, const __impl* __scope) :
        __forward_scope_{std::in_place, __scope->__stop_source_.get_token(),
                         __forward_stopped{&__stop_source_}},
        __env_(
            make_env((_Env&&)__env,
                     with(get_stop_token, __scope->__stop_source_.get_token())))
    {}

    ~__future_state_base()
    {
        std::unique_lock __guard{__mutex_};
        if (__step_ == __future_step::__created)
        {
            // exception during connect() will end up here
            __step_from_to_(__guard, __future_step::__created,
                            __future_step::__deleted);
        }
        else if (__step_ != __future_step::__deleted)
        {
            // completing the given sender before the future is dropped will end
            // here
            __step_from_to_(__guard, __future_step::__future,
                            __future_step::__deleted);
        }
    }

    void __step_from_to_(std::unique_lock<std::mutex>& __guard,
                         __future_step __from, __future_step __to)
    {
        STDEXEC_ASSERT(__guard.owns_lock());
        auto actual = std::exchange(__step_, __to);
        STDEXEC_ASSERT(actual == __from);
    }

    in_place_stop_source __stop_source_;
    std::optional<in_place_stop_callback<__forward_stopped>> __forward_scope_;
    std::mutex __mutex_;
    __future_step __step_ = __future_step::__created;
    std::unique_ptr<__future_state_base, __dynamic_delete<__future_state_base>>
        __no_future_;
    __completions_as_variant<_Completions> __data_;
    __intrusive_queue<&__subscription::__next_> __subscribers_;
    __env_t<_Env> __env_;
};

template <class _Completions, class _EnvId>
struct __future_rcvr
{
    using _Env = stdexec::__t<_EnvId>;

    struct __t
    {
        using __id = __future_rcvr;
        using receiver_concept = stdexec::receiver_t;
        __future_state_base<_Completions, _Env>* __state_;
        const __impl* __scope_;

        void __dispatch_result_() noexcept
        {
            auto& __state = *__state_;
            std::unique_lock __guard{__state.__mutex_};
            auto __local = std::move(__state.__subscribers_);
            __state.__forward_scope_ = std::nullopt;
            if (__state.__no_future_.get() != nullptr)
            {
                // nobody is waiting for the results
                // delete this and return
                __state.__step_from_to_(__guard, __future_step::__no_future,
                                        __future_step::__deleted);
                __guard.unlock();
                __state.__no_future_.reset();
                return;
            }
            __guard.unlock();
            while (!__local.empty())
            {
                auto* __sub = __local.pop_front();
                __sub->__complete();
            }
        }

        template <__completion_tag _Tag, __movable_value... _As>
        friend void tag_invoke(_Tag, __t&& __self, _As&&... __as) noexcept
        {
            auto& __state = *__self.__state_;
            try
            {
                std::unique_lock __guard{__state.__mutex_};
                using _Tuple = __decayed_tuple<_Tag, _As...>;
                __state.__data_.template emplace<_Tuple>(_Tag{},
                                                         (_As&&)__as...);
                __guard.unlock();
                __self.__dispatch_result_();
            }
            catch (...)
            {
                using _Tuple = std::tuple<set_error_t, std::exception_ptr>;
                __state.__data_.template emplace<_Tuple>(
                    set_error_t{}, std::current_exception());
            }
        }

        friend const __env_t<_Env>& tag_invoke(get_env_t,
                                               const __t& __self) noexcept
        {
            return __self.__state_->__env_;
        }
    };
};

// Receiver type that a spawned future's work (_Sender) is connected to, for
// the user-supplied environment type _Env.
template <class _Sender, class _Env>
using __future_receiver_t =
    __t<__future_rcvr<__future_completions_t<_Sender, _Env>, __id<_Env>>>;

// Complete shared state of a spawned future: the sender-independent base
// plus the operation state obtained by connecting the spawned sender to a
// receiver that points back at this state.
template <class _Sender, class _Env>
struct __future_state :
    __future_state_base<__future_completions_t<_Sender, _Env>, _Env>
{
    using _Completions = __future_completions_t<_Sender, _Env>;

    // Initialises the base first, then connects the sender. The receiver
    // keeps a pointer to this object as its state.
    __future_state(_Sender __sndr, _Env __env, const __impl* __scope) :
        __future_state_base<_Completions, _Env>(static_cast<_Env&&>(__env),
                                                __scope),
        __op_(stdexec::connect(
            static_cast<_Sender&&>(__sndr),
            __future_receiver_t<_Sender, _Env>{this, __scope}))
    {}

    // Operation state of the spawned work; started by
    // async_scope::spawn_future.
    connect_result_t<_Sender, __future_receiver_t<_Sender, _Env>> __op_;
};

// Sender returned by async_scope::spawn_future. It owns the shared state of
// the already running work; connecting it transfers that ownership to the
// resulting __future_op.
//
// Giving up the state must never simply delete it while the work may still be
// running: if no result has been stored yet, ownership is handed to the state
// itself (__no_future_) and the work's receiver frees it on completion. Both
// the destructor and move assignment go through __abandon_ for that reason.
template <class _SenderId, class _EnvId>
struct __future
{
    using _Sender = stdexec::__t<_SenderId>;
    using _Env = stdexec::__t<_EnvId>;

    struct __t
    {
        using __id = __future;
        using sender_concept = stdexec::sender_t;

        __t(__t&&) = default;

        // Releases the currently held state safely (see __abandon_) before
        // adopting the state of __that. Self-assignment is a no-op.
        __t& operator=(__t&& __that) noexcept
        {
            if (this != &__that)
            {
                __abandon_();
                __state_ = std::move(__that.__state_);
            }
            return *this;
        }

        ~__t() noexcept
        {
            __abandon_();
        }

      private:
        friend struct async_scope;
        template <class _Self>
        using __completions_t =
            __future_completions_t<__mfront<_Sender, _Self>, _Env>;

        template <class _Receiver>
        using __future_op_t = stdexec::__t<
            __future_op<_SenderId, _EnvId, stdexec::__id<_Receiver>>>;

        // Gives up ownership of the held state, if any, leaving __state_
        // null. When a result is already stored the state is destroyed
        // (after the mutex has been released); otherwise the state is made
        // to own itself and marked __no_future.
        void __abandon_() noexcept
        {
            if (__state_ == nullptr)
            {
                return;
            }
            auto* const __raw_state = __state_.get();
            {
                std::unique_lock __guard{__raw_state->__mutex_};
                // index 0 is std::monostate, i.e. "no result yet".
                if (__raw_state->__data_.index() == 0)
                {
                    __raw_state->__no_future_ = std::move(__state_);
                    __raw_state->__step_from_to_(__guard,
                                                 __future_step::__future,
                                                 __future_step::__no_future);
                    return;
                }
            }
            // completed given sender
            // state is no longer needed
            __state_.reset();
        }

        // Takes ownership of the state and marks it as owned by a future.
        explicit __t(
            std::unique_ptr<__future_state<_Sender, _Env>> __state) noexcept :
            __state_(std::move(__state))
        {
            std::unique_lock __guard{__state_->__mutex_};
            __state_->__step_from_to_(__guard, __future_step::__created,
                                      __future_step::__future);
        }

        // connect customisation: moves the receiver and the shared state
        // into the operation; the future holds no state afterwards.
        template <__decays_to<__t> _Self, receiver _Receiver>
            requires receiver_of<_Receiver, __completions_t<_Self>>
        friend __future_op_t<_Receiver> tag_invoke(connect_t, _Self&& __self,
                                                   _Receiver __rcvr)
        {
            return __future_op_t<_Receiver>{(_Receiver&&)__rcvr,
                                            std::move(__self.__state_)};
        }

        template <__decays_to<__t> _Self, class _OtherEnv>
        friend auto tag_invoke(get_completion_signatures_t, _Self&&,
                               _OtherEnv&&) -> __completions_t<_Self>
        {
            return {};
        }

        // This sender exposes no attributes of its own.
        friend empty_env tag_invoke(get_env_t, const __t&) noexcept
        {
            return {};
        }

        std::unique_ptr<__future_state<_Sender, _Env>> __state_;
    };
};

// Future sender type returned by async_scope::spawn_future for a given
// sender and environment: the sender is first wrapped as a nest sender and
// the environment type is decayed.
template <class _Sender, class _Env>
using __future_t = stdexec::__t<
    __future<__id<__nest_sender_t<_Sender>>, __id<__decay_t<_Env>>>>;

////////////////////////////////////////////////////////////////////////////
// async_scope::spawn implementation
// Environment type seen by work started with async_scope::spawn: _Env joined
// with an in_place_stop_token entry for get_stop_token and an
// __inln::__scheduler entry for get_scheduler.
template <class _Env>
using __spawn_env_t =
    __env::__join_t<_Env, __env::__with<in_place_stop_token, get_stop_token_t>,
                    __env::__with<__inln::__scheduler, get_scheduler_t>>;

// Sender-independent part of a spawned operation; __spawn_rcvr::__t holds a
// pointer to it.
// NOTE: member order matters; __spawn_op::__t aggregate-initialises this base
// positionally.
template <class _EnvId>
struct __spawn_op_base
{
    using _Env = stdexec::__t<_EnvId>;
    // Environment returned by the spawn receiver's get_env.
    __spawn_env_t<_Env> __env_;
    // Type-erased function that destroys the complete operation; called by
    // the spawn receiver on set_value / set_stopped.
    void (*__delete_)(__spawn_op_base*);
};

// Receiver connected to work started by async_scope::spawn. The spawned
// operation owns itself, so completing it means destroying it.
template <class _EnvId>
struct __spawn_rcvr
{
    using _Env = stdexec::__t<_EnvId>;

    struct __t
    {
        using __id = __spawn_rcvr;
        using receiver_concept = stdexec::receiver_t;
        __spawn_op_base<_EnvId>* __op_;

        // set_value() / set_stopped(): destroy the operation through its
        // type-erased delete function. The receiver lives inside that
        // operation, so nothing may be touched afterwards.
        template <__one_of<set_value_t, set_stopped_t> _Tag>
        friend void tag_invoke(_Tag, __t&& __self) noexcept
        {
            auto* const __op = __self.__op_;
            __op->__delete_(__op);
        }

        // BUGBUG NOT TO SPEC spawn shouldn't accept senders that can fail.
        // set_error(exception_ptr): rethrows the exception from this
        // noexcept function.
        template <same_as<set_error_t> _Tag>
        [[noreturn]] friend void tag_invoke(_Tag, __t&&,
                                            std::exception_ptr __eptr) noexcept
        {
            std::rethrow_exception(
                static_cast<std::exception_ptr&&>(__eptr));
        }

        // Environment stored in the operation's base.
        friend const __spawn_env_t<_Env>& tag_invoke(get_env_t,
                                                     const __t& __self) noexcept
        {
            return __self.__op_->__env_;
        }
    };
};

// Receiver type used by async_scope::spawn for the environment type _Env.
template <class _Env>
using __spawn_receiver_t = stdexec::__t<__spawn_rcvr<__id<_Env>>>;

// Heap-allocated, self-owning operation state created by async_scope::spawn.
// The base carries the environment and a type-erased function that deletes
// the complete object; the spawn receiver calls it on completion.
template <class _SenderId, class _EnvId>
struct __spawn_op
{
    using _Env = stdexec::__t<_EnvId>;
    using _Sender = stdexec::__t<_SenderId>;

    struct __t : __spawn_op_base<_EnvId>
    {
        // Deletes the complete operation given a pointer to its base.
        static void __destroy_(__spawn_op_base<_EnvId>* __op)
        {
            delete static_cast<__t*>(__op);
        }

        // Builds the environment (user environment + the scope's stop token
        // + an inline scheduler), installs the delete function and connects
        // the sender to a receiver that points at this operation.
        template <__decays_to<_Sender> _Sndr>
        __t(_Sndr&& __sndr, _Env __env, const __impl* __scope) :
            __spawn_op_base<_EnvId>{
                __env::__join(
                    static_cast<_Env&&>(__env),
                    __env::__with(__scope->__stop_source_.get_token(),
                                  get_stop_token),
                    __env::__with(__inln::__scheduler{}, get_scheduler)),
                &__destroy_},
            __op_(stdexec::connect(static_cast<_Sndr&&>(__sndr),
                                   __spawn_receiver_t<_Env>{this}))
        {}

        // Starts the connected operation.
        void __start_() noexcept
        {
            start(__op_);
        }

        friend void tag_invoke(start_t, __t& __self) noexcept
        {
            return __self.__start_();
        }

        // Operation state of the spawned sender.
        connect_result_t<_Sender, __spawn_receiver_t<_Env>> __op_;
    };
};

// Operation type allocated by async_scope::spawn for a given sender and
// environment type.
template <class _Sender, class _Env>
using __spawn_operation_t = stdexec::__t<__spawn_op<__id<_Sender>, __id<_Env>>>;

////////////////////////////////////////////////////////////////////////////
// async_scope
// A scope that tracks the work started through it.
//
//  - nest(sndr)         wraps a sender so that its operation is counted as
//                       active on this scope from start until completion.
//  - spawn(sndr, env)   eagerly starts a nested sender in a heap-allocated,
//                       self-deleting operation.
//  - spawn_future(...)  eagerly starts a nested sender and returns a sender
//                       that completes with its result.
//  - when_empty(sndr)   returns a sender that runs `sndr` once no nested
//                       operation is active; on_empty() is when_empty(just()).
//
// The scope also owns a stop source whose token is given to nested work.
struct async_scope : __immovable
{
    async_scope() = default;

    // Returns a sender that defers `__c` until the scope has no active
    // nested operations.
    template <sender _Constrained>
    [[nodiscard]] __when_empty_sender_t<_Constrained>
        when_empty(_Constrained&& __c) const
    {
        using __result_t = __when_empty_sender_t<_Constrained>;
        return __result_t{&__impl_, static_cast<_Constrained&&>(__c)};
    }

    // Returns a sender that completes once the scope has no active nested
    // operations.
    [[nodiscard]] auto on_empty() const
    {
        return when_empty(just());
    }

    template <sender _Constrained>
    using nest_result_t = __nest_sender_t<_Constrained>;

    // Wraps `__c` in a sender whose operation is tracked by this scope.
    template <sender _Constrained>
    [[nodiscard]] nest_result_t<_Constrained> nest(_Constrained&& __c)
    {
        using __result_t = nest_result_t<_Constrained>;
        return __result_t{&__impl_, static_cast<_Constrained&&>(__c)};
    }

    // Starts `__sndr` (nested in this scope) immediately; its result is
    // discarded.
    template <__movable_value _Env = empty_env,
              sender_in<__spawn_env_t<_Env>> _Sender>
        requires sender_to<nest_result_t<_Sender>, __spawn_receiver_t<_Env>>
    void spawn(_Sender&& __sndr, _Env __env = {})
    {
        using __op_t = __spawn_operation_t<nest_result_t<_Sender>, _Env>;
        // start is noexcept so we can assume that the operation will complete
        // after this, which means we can rely on its self-ownership to ensure
        // that it is eventually deleted
        auto* const __op = new __op_t{nest(static_cast<_Sender&&>(__sndr)),
                                      static_cast<_Env&&>(__env), &__impl_};
        stdexec::start(*__op);
    }

    // Starts `__sndr` (nested in this scope) immediately and returns a
    // sender that owns the shared state holding its eventual result.
    template <__movable_value _Env = empty_env,
              sender_in<__env_t<_Env>> _Sender>
    __future_t<_Sender, _Env> spawn_future(_Sender&& __sndr, _Env __env = {})
    {
        using __state_t = __future_state<nest_result_t<_Sender>, _Env>;
        using __result_t = __future_t<_Sender, _Env>;
        auto __shared = std::make_unique<__state_t>(
            nest(static_cast<_Sender&&>(__sndr)), static_cast<_Env&&>(__env),
            &__impl_);
        stdexec::start(__shared->__op_);
        return __result_t{std::move(__shared)};
    }

    // Access to the scope's stop source.
    in_place_stop_source& get_stop_source() noexcept
    {
        return __impl_.__stop_source_;
    }

    // Token associated with the scope's stop source.
    in_place_stop_token get_stop_token() const noexcept
    {
        return __impl_.__stop_source_.get_token();
    }

    // Requests stop on the scope's stop source and returns its result.
    bool request_stop() noexcept
    {
        return __impl_.__stop_source_.request_stop();
    }

  private:
    __impl __impl_;
};
} // namespace __scope

using __scope::async_scope;
} // namespace exec
