async: switch to stdexec async_scope

Use the `async_scope` implementation from stdexec instead of the
custom `scope` class here, which greatly reduces our own code and
also aligns better with the C++ STL direction.  The major changes are
around exception paths:

    - Spawned tasks which end with an exception will now std::terminate,
      so any task expected to throw should have an `upon_error` chain.

    - Spawned tasks which are stopped will be no longer throw an
      `UnexpectedStop` exception, so they should be chained with an
      `upon_stopped` if this is important.

Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I4e0c85712652efa5b296b898dcc2b0026ba4c625
diff --git a/include/sdbusplus/async.hpp b/include/sdbusplus/async.hpp
index 6c9e834..70b8abf 100644
--- a/include/sdbusplus/async.hpp
+++ b/include/sdbusplus/async.hpp
@@ -4,6 +4,5 @@
 #include <sdbusplus/async/execution.hpp>
 #include <sdbusplus/async/match.hpp>
 #include <sdbusplus/async/proxy.hpp>
-#include <sdbusplus/async/scope.hpp>
 #include <sdbusplus/async/task.hpp>
 #include <sdbusplus/async/timer.hpp>
diff --git a/include/sdbusplus/async/context.hpp b/include/sdbusplus/async/context.hpp
index 17a8a37..9101df3 100644
--- a/include/sdbusplus/async/context.hpp
+++ b/include/sdbusplus/async/context.hpp
@@ -1,7 +1,6 @@
 #pragma once
 
 #include <sdbusplus/async/execution.hpp>
-#include <sdbusplus/async/scope.hpp>
 #include <sdbusplus/async/task.hpp>
 #include <sdbusplus/bus.hpp>
 #include <sdbusplus/event.hpp>
@@ -98,11 +97,11 @@
     /** Stop source */
     std::stop_source initial_stop{};
 
-    scope pending_tasks{loop};
+    async_scope pending_tasks{};
     // In order to coordinate final completion of work, we keep some tasks
     // on a separate scope (the ones which maintain the sd-event/dbus state
     // and keep a final stop-source for them.
-    scope internal_tasks{loop};
+    async_scope internal_tasks{};
     std::stop_source final_stop{};
 
     // Lock and condition variable to signal `caller`.
diff --git a/include/sdbusplus/async/execution.hpp b/include/sdbusplus/async/execution.hpp
index 1de17e0..5fc34d5 100644
--- a/include/sdbusplus/async/execution.hpp
+++ b/include/sdbusplus/async/execution.hpp
@@ -8,6 +8,7 @@
 #pragma GCC diagnostic ignored "-Wnon-template-friend"
 #endif
 #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
+#include <sdbusplus/async/stdexec/async_scope.hpp>
 #include <sdbusplus/async/stdexec/coroutine.hpp>
 #include <sdbusplus/async/stdexec/execution.hpp>
 #pragma GCC diagnostic pop
@@ -17,4 +18,5 @@
 namespace sdbusplus::async
 {
 namespace execution = stdexec;
-}
+using async_scope = exec::async_scope;
+} // namespace sdbusplus::async
diff --git a/include/sdbusplus/async/scope.hpp b/include/sdbusplus/async/scope.hpp
deleted file mode 100644
index 8975194..0000000
--- a/include/sdbusplus/async/scope.hpp
+++ /dev/null
@@ -1,261 +0,0 @@
-#pragma once
-
-#include <sdbusplus/async/execution.hpp>
-#include <sdbusplus/exception.hpp>
-
-#include <deque>
-#include <mutex>
-
-namespace sdbusplus::async
-{
-
-namespace scope_ns
-{
-template <execution::sender Sender>
-struct scope_receiver;
-
-struct scope_sender;
-
-struct scope_completion;
-} // namespace scope_ns
-
-/** A collection of tasks.
- *
- *  It is sometimes useful to run a group of tasks, possibly even detached
- *  from an execution context, but the only option in std::execution is
- *  start_detached, which does not have any mechanism to deal with errors
- *  from those tasks and can potentially leak tasks.  Alternatively, you
- *  can save the result of `execution::connect`, but you need to maintain a
- *  lifetime of it until the connected-task completes.
- *
- *  The scope maintains the lifetime of the tasks it `spawns` so that the
- *  operational-state created by `connect` is preserved (and deleted) as
- *  appropriate.
- */
-struct scope
-{
-    scope() = delete;
-    explicit scope(execution::run_loop& loop) : loop(loop) {}
-
-    // The scope destructor can throw if it was destructed while there are
-    // outstanding tasks.
-    ~scope() noexcept(false);
-
-    /** Spawn a Sender to run on the scope.
-     *
-     *  @param[in] sender - The sender to run.
-     */
-    template <execution::sender_of<execution::set_value_t()> Sender>
-    void spawn(Sender&& sender);
-
-    /** Get a Sender that awaits for all tasks to complete. */
-    scope_ns::scope_sender empty() noexcept;
-
-    template <execution::sender>
-    friend struct scope_ns::scope_receiver;
-
-    friend scope_ns::scope_completion;
-
-  private:
-    void started_task() noexcept;
-    void ended_task(std::exception_ptr&&) noexcept;
-
-    std::mutex lock{};
-    bool started = false;
-    size_t pending_count = 0;
-    std::deque<std::exception_ptr> pending_exceptions = {};
-    scope_ns::scope_completion* pending = nullptr;
-
-    execution::run_loop& loop;
-};
-
-namespace scope_ns
-{
-
-/** The Sender-completion Reciever. */
-template <execution::sender Sender>
-struct scope_receiver
-{
-    template <typename OpState>
-    explicit scope_receiver(OpState* os, scope* s) : op_state(os), s(s)
-    {}
-
-    friend void tag_invoke(execution::set_value_t, scope_receiver&& self,
-                           auto&&...) noexcept
-    {
-        self.complete();
-    }
-
-    friend void tag_invoke(execution::set_error_t, scope_receiver&& self,
-                           std::exception_ptr&& e) noexcept
-    {
-        self.complete(std::move(e));
-    }
-
-    friend void tag_invoke(execution::set_stopped_t,
-                           scope_receiver&& self) noexcept
-    {
-        self.complete(std::make_exception_ptr(exception::UnhandledStop{}));
-    }
-
-    friend decltype(auto) tag_invoke(execution::get_env_t,
-                                     const scope_receiver& self) noexcept
-    {
-        return self;
-    }
-
-    friend decltype(auto) tag_invoke(execution::get_scheduler_t,
-                                     const scope_receiver& self) noexcept
-    {
-        return self.get_scheduler();
-    }
-
-    void complete(std::exception_ptr&& = {}) noexcept;
-
-    void* op_state;
-    scope* s = nullptr;
-
-  private:
-    decltype(auto) get_scheduler() const
-    {
-        return s->loop.get_scheduler();
-    }
-};
-
-/** The holder of the connect operational-state. */
-template <execution::sender Sender>
-struct scope_operation_state
-{
-    using state_t = execution::connect_result_t<Sender, scope_receiver<Sender>>;
-    std::optional<state_t> op_state;
-};
-
-template <execution::sender Sender>
-void scope_receiver<Sender>::complete(std::exception_ptr&& e) noexcept
-{
-    // The Sender is complete, so we need to clean up the saved operational
-    // state.
-
-    // Save the scope (since we're going to delete `this`).
-    auto owning_scope = s;
-    // We also need to save the exception, which is likely owned by `this`.
-    std::exception_ptr ex{std::move(e)};
-
-    // Delete the operational state, which triggers deleting this.
-    delete static_cast<scope_ns::scope_operation_state<Sender>*>(op_state);
-
-    // Inform the scope that a task has completed.
-    owning_scope->ended_task(std::move(ex));
-}
-
-// Virtual class to handle the scope completions.
-struct scope_completion
-{
-    scope_completion() = delete;
-    scope_completion(scope_completion&&) = delete;
-
-    explicit scope_completion(scope& s) : s(s){};
-    virtual ~scope_completion() = default;
-
-    friend scope;
-
-    friend void tag_invoke(execution::start_t, scope_completion& self) noexcept
-    {
-        self.arm();
-    }
-
-  private:
-    virtual void complete(std::exception_ptr&&) noexcept = 0;
-    void arm() noexcept;
-
-    scope& s;
-};
-
-// Implementation (templated based on Reciever) of scope_completion.
-template <execution::receiver Reciever>
-struct scope_operation : scope_completion
-{
-    scope_operation(scope& s, Reciever r) :
-        scope_completion(s), receiver(std::move(r))
-    {}
-
-  private:
-    void complete(std::exception_ptr&& e) noexcept override final
-    {
-        if (e)
-        {
-            execution::set_error(std::move(receiver), std::move(e));
-        }
-        else
-        {
-            execution::set_value(std::move(receiver));
-        }
-    }
-
-    Reciever receiver;
-};
-
-// Scope completion Sender implementation.
-struct scope_sender
-{
-    scope_sender() = delete;
-    explicit scope_sender(scope& m) noexcept : m(m){};
-
-    friend auto tag_invoke(execution::get_completion_signatures_t,
-                           const scope_sender&, auto)
-        -> execution::completion_signatures<execution::set_value_t(),
-                                            execution::set_stopped_t()>;
-
-    template <execution::receiver R>
-    friend auto tag_invoke(execution::connect_t, scope_sender&& self, R r)
-        -> scope_operation<R>
-    {
-        return {self.m, std::move(r)};
-    }
-
-  private:
-    scope& m;
-};
-
-// Most (non-movable) receivers cannot be emplaced without this template magic.
-// Ex. `spawn(std::execution::just())` doesnt' work without this.
-template <typename Fn>
-struct in_place_construct
-{
-    Fn fn;
-    using result_t = decltype(std::declval<Fn>()());
-
-    operator result_t()
-    {
-        return ((Fn&&)fn)();
-    }
-
-    explicit in_place_construct(Fn&& fn) : fn(std::move(fn)){};
-};
-
-} // namespace scope_ns
-
-template <execution::sender_of<execution::set_value_t()> Sender>
-void scope::spawn(Sender&& sender)
-{
-    // Create a holder of the operational state.  Keep it in a unique_ptr
-    // so it is cleaned up if there are any exceptions in this function.
-    auto s = std::make_unique<
-        scope_ns::scope_operation_state<std::decay_t<Sender>>>();
-
-    // Associate the state and scope with the receiver and connect to the
-    // Sender.
-    s->op_state.emplace(scope_ns::in_place_construct{[&] {
-        return execution::connect(
-            std::forward<Sender>(sender),
-            scope_ns::scope_receiver<std::decay_t<Sender>>{s.get(), this});
-    }});
-
-    started_task();
-
-    // Start is noexcept, so it is safe to release the pointer which is now
-    // contained in the receiver.
-    execution::start(s.release()->op_state.value());
-}
-
-} // namespace sdbusplus::async
diff --git a/meson.build b/meson.build
index 448d167..af8cdf3 100644
--- a/meson.build
+++ b/meson.build
@@ -26,7 +26,6 @@
 libsdbusplus_src = files(
     'src/async/context.cpp',
     'src/async/match.cpp',
-    'src/async/scope.cpp',
     'src/bus.cpp',
     'src/event.cpp',
     'src/exception.cpp',
diff --git a/src/async/context.cpp b/src/async/context.cpp
index 1da888c..85e0200 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -39,7 +39,10 @@
     context& ctx;
     event_t::time_resolution timeout{};
 
-    static task<> loop(context& ctx);
+    // TODO: It seems like we should be able to do a normal `task<>` here
+    //       but spawn on it compile fails.
+    static exec::basic_task<void, exec::__task::__raw_task_context<void>>
+        loop(context& ctx);
     static void wait_once(context& ctx);
 };
 
@@ -99,7 +102,8 @@
     context& ctx;
 };
 
-task<> wait_process_completion::loop(context& ctx)
+exec::basic_task<void, exec::__task::__raw_task_context<void>>
+    wait_process_completion::loop(context& ctx)
 {
     while (!ctx.final_stop.stop_requested())
     {
@@ -142,10 +146,7 @@
     wait_for_wait_process_stopped();
 
     // Wait for all the internal tasks to complete.
-    stdexec::sync_wait(internal_tasks.empty() |
-                       execution::upon_error([&](auto&& e) {
-                           pending_exceptions.emplace_back(std::move(e));
-                       }));
+    stdexec::sync_wait(internal_tasks.on_empty());
 
     // Finish up the loop and join the thread.
     // (There shouldn't be anything going on by this point anyhow.)
@@ -162,8 +163,7 @@
 void context::worker_run()
 {
     // Start the sdbus 'wait/process' loop; treat it as an internal task.
-    internal_tasks.spawn(details::wait_process_completion::loop(*this) |
-                         execution::upon_stopped([]() {}));
+    internal_tasks.spawn(details::wait_process_completion::loop(*this));
 
     // Run the execution::run_loop to handle all the tasks.
     loop.run();
@@ -212,11 +212,8 @@
     }
 
     // Spawn the watch for completion / exceptions.
-    internal_tasks.spawn(pending_tasks.empty() |
-                         execution::then([this]() { spawn_complete(); }) |
-                         execution::upon_error([this](auto&& e) {
-                             spawn_complete(std::move(e));
-                         }));
+    internal_tasks.spawn(pending_tasks.on_empty() |
+                         execution::then([this]() { spawn_complete(); }));
 }
 
 void context::caller_run()
diff --git a/src/async/scope.cpp b/src/async/scope.cpp
deleted file mode 100644
index d9eb8a7..0000000
--- a/src/async/scope.cpp
+++ /dev/null
@@ -1,96 +0,0 @@
-#include <sdbusplus/async/scope.hpp>
-
-#include <exception>
-
-namespace sdbusplus::async
-{
-scope::~scope() noexcept(false)
-{
-    if (pending_count != 0)
-    {
-        throw std::logic_error(
-            "sdbusplus::async::scope destructed while tasks are pending.");
-    }
-}
-
-void scope::started_task() noexcept
-{
-    std::lock_guard l{lock};
-    started = true;
-    ++pending_count;
-}
-
-void scope::ended_task(std::exception_ptr&& e) noexcept
-{
-    scope_ns::scope_completion* p = nullptr;
-
-    {
-        std::lock_guard l{lock};
-        --pending_count; // decrement count.
-
-        if (e)
-        {
-            pending_exceptions.emplace_back(std::exchange(e, {}));
-        }
-
-        // If the scope is complete, get the pending completion, if it exists.
-        if (!pending_exceptions.empty() || (pending_count == 0))
-        {
-            p = std::exchange(pending, nullptr);
-        }
-    }
-
-    if (p)
-    {
-        std::exception_ptr fwd_exception = {};
-        if (!pending_exceptions.empty())
-        {
-            fwd_exception = pending_exceptions.front();
-            pending_exceptions.pop_front();
-        }
-        p->complete(std::move(fwd_exception));
-    }
-}
-
-scope_ns::scope_sender scope::empty() noexcept
-{
-    return scope_ns::scope_sender(*this);
-}
-
-namespace scope_ns
-{
-void scope_completion::arm() noexcept
-{
-    bool done = false;
-    std::exception_ptr e{};
-
-    {
-        std::lock_guard l{s.lock};
-        if (!s.started)
-        {
-            done = false;
-        }
-        else if (!s.pending_exceptions.empty())
-        {
-            e = s.pending_exceptions.front();
-            s.pending_exceptions.pop_front();
-            done = true;
-        }
-        else if (s.pending_count == 0)
-        {
-            done = true;
-        }
-        else
-        {
-            s.pending = this;
-        }
-    }
-
-    if (done)
-    {
-        this->complete(std::move(e));
-    }
-}
-} // namespace scope_ns
-
-} // namespace sdbusplus::async
diff --git a/test/async/context.cpp b/test/async/context.cpp
index 7d7f6fa..8cdd93c 100644
--- a/test/async/context.cpp
+++ b/test/async/context.cpp
@@ -48,48 +48,6 @@
     }
 }
 
-TEST_F(Context, SpawnThrowingTask)
-{
-    ctx->spawn(stdexec::just() |
-               stdexec::then([]() { throw std::logic_error("Oops"); }));
-
-    EXPECT_THROW(runToStop(), std::logic_error);
-    ctx->run();
-}
-
-TEST_F(Context, SpawnThrowingCoroutine)
-{
-    struct _
-    {
-        static auto one() -> sdbusplus::async::task<>
-        {
-            throw std::logic_error("Oops");
-            co_return;
-        }
-    };
-
-    ctx->spawn(_::one());
-    EXPECT_THROW(runToStop(), std::logic_error);
-    ctx->run();
-};
-
-TEST_F(Context, SpawnManyThrowingTasks)
-{
-    static constexpr size_t count = 100;
-    for (size_t i = 0; i < count; ++i)
-    {
-        ctx->spawn(stdexec::just() |
-                   stdexec::then([]() { throw std::logic_error("Oops"); }));
-    }
-    spawnStop();
-
-    for (size_t i = 0; i < count; ++i)
-    {
-        EXPECT_THROW(ctx->run(), std::logic_error);
-    }
-    ctx->run();
-}
-
 TEST_F(Context, SpawnDelayedTask)
 {
     using namespace std::literals;
@@ -153,8 +111,7 @@
     ctx->spawn(sdbusplus::async::sleep_for(*ctx, 1ms) |
                stdexec::then([&m](...) { m.reset(); }));
 
-    EXPECT_THROW(runToStop(), sdbusplus::exception::UnhandledStop);
-    EXPECT_NO_THROW(ctx->run());
+    runToStop();
     EXPECT_FALSE(ran);
 }
 
@@ -182,7 +139,6 @@
     ctx->spawn(sdbusplus::async::sleep_for(*ctx, 1ms) |
                stdexec::then([&]() { m.reset(); }));
 
-    EXPECT_THROW(runToStop(), sdbusplus::exception::UnhandledStop);
-    EXPECT_NO_THROW(ctx->run());
+    runToStop();
     EXPECT_FALSE(ran);
 }