async: scope: add completion sender

Add a mechanism to await the completion of an async::scope,
equivalent to a 'join' on the spawned tasks it contains.

Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I669680441704cdb59aa982d97bed610178c498b9
diff --git a/include/sdbusplus/async/scope.hpp b/include/sdbusplus/async/scope.hpp
index c217a01..7962bea 100644
--- a/include/sdbusplus/async/scope.hpp
+++ b/include/sdbusplus/async/scope.hpp
@@ -2,7 +2,7 @@
 
 #include <sdbusplus/async/execution.hpp>
 
-#include <atomic>
+#include <mutex>
 
 namespace sdbusplus::async
 {
@@ -11,7 +11,11 @@
 {
 template <execution::sender Sender>
 struct scope_receiver;
-}
+
+struct scope_sender;
+
+struct scope_completion;
+} // namespace scope_ns
 
 /** A collection of tasks.
  *
@@ -41,14 +45,21 @@
     template <execution::sender_of<execution::set_value_t()> Sender>
     void spawn(Sender&& sender);
 
+    /** Get a Sender that awaits for all tasks to complete. */
+    scope_ns::scope_sender empty() noexcept;
+
     template <execution::sender>
     friend struct scope_ns::scope_receiver;
 
+    friend scope_ns::scope_completion;
+
   private:
     void started_task() noexcept;
     void ended_task() noexcept;
 
-    std::atomic<size_t> count = 0;
+    std::mutex lock{};
+    size_t pending_count = 0;
+    scope_ns::scope_completion* pending = nullptr;
 };
 
 namespace scope_ns
@@ -119,6 +130,67 @@
     owning_scope->ended_task();
 }
 
+// Virtual class to handle the scope completions.
+struct scope_completion
+{
+    scope_completion() = delete;
+    scope_completion(scope_completion&&) = delete;
+
+    explicit scope_completion(scope& s) : s(s){};
+    virtual ~scope_completion() = default;
+
+    friend scope;
+
+    friend void tag_invoke(execution::start_t, scope_completion& self) noexcept
+    {
+        self.arm();
+    }
+
+  private:
+    virtual void complete() noexcept = 0;
+    void arm() noexcept;
+
+    scope& s;
+};
+
+// Implementation (templated based on Reciever) of scope_completion.
+template <execution::receiver Reciever>
+struct scope_operation : scope_completion
+{
+    scope_operation(scope& s, Reciever r) :
+        scope_completion(s), receiver(std::move(r))
+    {}
+
+  private:
+    void complete() noexcept override final
+    {
+        execution::set_value(std::move(receiver));
+    }
+
+    Reciever receiver;
+};
+
+// Scope completion Sender implementation.
+struct scope_sender
+{
+    scope_sender() = delete;
+    explicit scope_sender(scope& m) noexcept : m(m){};
+
+    friend auto tag_invoke(execution::get_completion_signatures_t,
+                           const scope_sender&, auto)
+        -> execution::completion_signatures<execution::set_value_t()>;
+
+    template <execution::receiver R>
+    friend auto tag_invoke(execution::connect_t, scope_sender&& self, R r)
+        -> scope_operation<R>
+    {
+        return {self.m, std::move(r)};
+    }
+
+  private:
+    scope& m;
+};
+
 // Most (non-movable) receivers cannot be emplaced without this template magic.
 // Ex. `spawn(std::execution::just())` doesnt' work without this.
 template <typename Fn>
diff --git a/src/async/context.cpp b/src/async/context.cpp
index 38d3a89..60d9e56 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -150,6 +150,9 @@
         details::wait_process_completion::wait_once(*this);
     }
 
+    // Wait for all the tasks to complete.
+    std::this_thread::sync_wait(pending_tasks.empty());
+
     // Stop has been requested, so finish up the loop.
     loop.finish();
     if (worker_thread.joinable())
diff --git a/src/async/scope.cpp b/src/async/scope.cpp
index efdc5f6..908def3 100644
--- a/src/async/scope.cpp
+++ b/src/async/scope.cpp
@@ -6,7 +6,7 @@
 {
 scope::~scope() noexcept(false)
 {
-    if (count != 0)
+    if (pending_count != 0)
     {
         throw std::logic_error(
             "sdbusplus::async::scope destructed while tasks are pending.");
@@ -15,12 +15,58 @@
 
 void scope::started_task() noexcept
 {
-    ++count;
+    std::lock_guard l{lock};
+    ++pending_count;
 }
 
 void scope::ended_task() noexcept
 {
-    --count;
+    scope_ns::scope_completion* p = nullptr;
+
+    {
+        std::lock_guard l{lock};
+        --pending_count;
+
+        if (pending_count == 0)
+        {
+            p = std::exchange(pending, nullptr);
+        }
+    }
+
+    if (p)
+    {
+        p->complete();
+    }
 }
 
+scope_ns::scope_sender scope::empty() noexcept
+{
+    return scope_ns::scope_sender(*this);
+}
+
+namespace scope_ns
+{
+void scope_completion::arm() noexcept
+{
+    bool done = false;
+
+    {
+        std::lock_guard l{s.lock};
+        if (s.pending_count == 0)
+        {
+            done = true;
+        }
+        else
+        {
+            s.pending = this;
+        }
+    }
+
+    if (done)
+    {
+        this->complete();
+    }
+}
+} // namespace scope_ns
+
 } // namespace sdbusplus::async