async: add scope

In order to be able to create multiple subtasks as a collection
and in order to make sure we keep good track of any tasks that
are created by the context, add a `scope` as a container of subtasks.
This is partially modelled after the C++ P2519 proposal.

Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I6e99b2fa2829d80c320491991f7e038f122963a9
diff --git a/include/sdbusplus/async.hpp b/include/sdbusplus/async.hpp
index 70b8abf..6c9e834 100644
--- a/include/sdbusplus/async.hpp
+++ b/include/sdbusplus/async.hpp
@@ -4,5 +4,6 @@
 #include <sdbusplus/async/execution.hpp>
 #include <sdbusplus/async/match.hpp>
 #include <sdbusplus/async/proxy.hpp>
+#include <sdbusplus/async/scope.hpp>
 #include <sdbusplus/async/task.hpp>
 #include <sdbusplus/async/timer.hpp>
diff --git a/include/sdbusplus/async/context.hpp b/include/sdbusplus/async/context.hpp
index a59dc72..02235d3 100644
--- a/include/sdbusplus/async/context.hpp
+++ b/include/sdbusplus/async/context.hpp
@@ -1,6 +1,7 @@
 #pragma once
 
 #include <sdbusplus/async/execution.hpp>
+#include <sdbusplus/async/scope.hpp>
 #include <sdbusplus/async/task.hpp>
 #include <sdbusplus/bus.hpp>
 #include <sdbusplus/event.hpp>
@@ -58,6 +59,16 @@
     template <execution::sender_of<execution::set_value_t()> Snd>
     void run(Snd&& startup);
 
+    /** Spawn a Sender to run on the context.
+     *
+     * @param[in] sender - The Sender to run.
+     */
+    template <execution::sender_of<execution::set_value_t()> Snd>
+    void spawn(Snd&& sender)
+    {
+        pending_tasks.spawn(std::forward<Snd>(sender));
+    }
+
     bus_t& get_bus() noexcept
     {
         return bus;
@@ -76,6 +87,7 @@
     bus_t bus;
     event_source_t dbus_source;
     event_t event_loop{};
+    scope pending_tasks{};
 
     /** The async run-loop from std::execution. */
     execution::run_loop loop{};
diff --git a/include/sdbusplus/async/scope.hpp b/include/sdbusplus/async/scope.hpp
new file mode 100644
index 0000000..c217a01
--- /dev/null
+++ b/include/sdbusplus/async/scope.hpp
@@ -0,0 +1,161 @@
+#pragma once
+
+#include <sdbusplus/async/execution.hpp>
+
+#include <atomic>
+
+namespace sdbusplus::async
+{
+
+namespace scope_ns
+{
+template <execution::sender Sender>
+struct scope_receiver;
+}
+
+/** A collection of tasks.
+ *
+ *  It is sometimes useful to run a group of tasks, possibly even detached
+ *  from an execution context, but the only option in std::execution is
+ *  start_detached, which does not have any mechanism to deal with errors
+ *  from those tasks and can potentially leak tasks.  Alternatively, you
+ *  can save the result of `execution::connect`, but you need to maintain a
+ *  lifetime of it until the connected-task completes.
+ *
+ *  The scope maintains the lifetime of the tasks it `spawns` so that the
+ *  operational-state created by `connect` is preserved (and deleted) as
+ *  appropriate.
+ */
+struct scope
+{
+    scope() = default;
+
+    // The scope destructor can throw if it was destructed while there are
+    // outstanding tasks.
+    ~scope() noexcept(false);
+
+    /** Spawn a Sender to run on the scope.
+     *
+     *  @param[in] sender - The sender to run.
+     */
+    template <execution::sender_of<execution::set_value_t()> Sender>
+    void spawn(Sender&& sender);
+
+    template <execution::sender>
+    friend struct scope_ns::scope_receiver;
+
+  private:
+    void started_task() noexcept;
+    void ended_task() noexcept;
+
+    std::atomic<size_t> count = 0;
+};
+
+namespace scope_ns
+{
+
+/** The Sender-completion Reciever. */
+template <execution::sender Sender>
+struct scope_receiver
+{
+    template <typename OpState>
+    explicit scope_receiver(OpState* os, scope* s) : op_state(os), s(s)
+    {}
+
+    friend void tag_invoke(execution::set_value_t, scope_receiver&& self,
+                           auto&&...) noexcept
+    {
+        self.complete();
+    }
+
+    friend void tag_invoke(execution::set_error_t, scope_receiver&& self,
+                           auto&&) noexcept
+    {
+        std::terminate(); // TODO: save the exception back into the scope.
+        self.complete();
+    }
+
+    friend void tag_invoke(execution::set_stopped_t,
+                           scope_receiver&& self) noexcept
+    {
+        // std::terminate(); // TODO: this implies a child had an
+        // unhandled_stop.  Need to turn it into an exception.
+        self.complete();
+    }
+
+    friend decltype(auto) tag_invoke(execution::get_env_t,
+                                     const scope_receiver& self) noexcept
+    {
+        return self;
+    }
+
+    void complete() noexcept;
+
+    void* op_state;
+    scope* s = nullptr;
+};
+
+/** The holder of the connect operational-state. */
+template <execution::sender Sender>
+struct scope_operation_state
+{
+    using state_t = execution::connect_result_t<Sender, scope_receiver<Sender>>;
+    std::optional<state_t> op_state;
+};
+
+template <execution::sender Sender>
+void scope_receiver<Sender>::complete() noexcept
+{
+    // The Sender is complete, so we need to clean up the saved operational
+    // state.
+
+    // Save the scope (since we're going to delete `this`).
+    auto owning_scope = s;
+
+    // Delete the operational state, which triggers deleting this.
+    delete static_cast<scope_ns::scope_operation_state<Sender>*>(op_state);
+
+    // Inform the scope that a task has completed.
+    owning_scope->ended_task();
+}
+
+// Most (non-movable) receivers cannot be emplaced without this template magic.
+// Ex. `spawn(std::execution::just())` doesnt' work without this.
+template <typename Fn>
+struct in_place_construct
+{
+    Fn fn;
+    using result_t = decltype(std::declval<Fn>()());
+
+    operator result_t()
+    {
+        return ((Fn &&) fn)();
+    }
+};
+
+} // namespace scope_ns
+
+template <execution::sender_of<execution::set_value_t()> Sender>
+void scope::spawn(Sender&& sender)
+{
+    // Create a holder of the operational state.  Keep it in a unique_ptr
+    // so it is cleaned up if there are any exceptions in this function.
+    auto s = std::make_unique<
+        scope_ns::scope_operation_state<std::decay_t<Sender>>>();
+
+    // Associate the state and scope with the receiver and connect to the
+    // Sender.
+    s->op_state.emplace(scope_ns::in_place_construct{[&] {
+        return execution::connect(
+            std::forward<Sender>(sender),
+            scope_ns::scope_receiver<std::decay_t<Sender>>{s.get(), this});
+    }});
+
+    started_task();
+
+    // Start is noexcept, so it is safe to release the pointer which is now
+    // contained in the receiver.
+    execution::start(s.release()->op_state.value());
+}
+
+} // namespace sdbusplus::async
diff --git a/meson.build b/meson.build
index af8cdf3..448d167 100644
--- a/meson.build
+++ b/meson.build
@@ -26,6 +26,7 @@
 libsdbusplus_src = files(
     'src/async/context.cpp',
     'src/async/match.cpp',
+    'src/async/scope.cpp',
     'src/bus.cpp',
     'src/event.cpp',
     'src/exception.cpp',
diff --git a/src/async/context.cpp b/src/async/context.cpp
index 007a266..38d3a89 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -164,10 +164,10 @@
     // This shouldn't start detached because we want to be able to forward
     // failures back to the 'run'.  execution::ensure_started isn't
     // implemented yet, so we don't have a lot of other options.
-    execution::start_detached(std::move(startup));
+    spawn(std::move(startup));
 
     // Also start up the sdbus 'wait/process' loop.
-    execution::start_detached(details::wait_process_completion::loop(*this));
+    spawn(details::wait_process_completion::loop(*this));
 
     // Run the execution::run_loop to handle all the tasks.
     loop.run();
diff --git a/src/async/scope.cpp b/src/async/scope.cpp
new file mode 100644
index 0000000..efdc5f6
--- /dev/null
+++ b/src/async/scope.cpp
@@ -0,0 +1,26 @@
+#include <sdbusplus/async/scope.hpp>
+
+#include <exception>
+
+namespace sdbusplus::async
+{
+scope::~scope() noexcept(false)
+{
+    if (count != 0)
+    {
+        throw std::logic_error(
+            "sdbusplus::async::scope destructed while tasks are pending.");
+    }
+}
+
+void scope::started_task() noexcept
+{
+    ++count;
+}
+
+void scope::ended_task() noexcept
+{
+    --count;
+}
+
+} // namespace sdbusplus::async
diff --git a/test/async/context.cpp b/test/async/context.cpp
index 60d3d8d..83cfde4 100644
--- a/test/async/context.cpp
+++ b/test/async/context.cpp
@@ -8,3 +8,13 @@
     ctx.run(std::execution::just() |
             std::execution::then([&ctx]() { ctx.request_stop(); }));
 }
+
+TEST(Context, SpawnedTask)
+{
+    sdbusplus::async::context ctx;
+
+    ctx.spawn(std::execution::just());
+
+    ctx.run(std::execution::just() |
+            std::execution::then([&ctx]() { ctx.request_stop(); }));
+}