async: context: handle shutdown better

When the context has been requested to stop we don't actually want
to stop it until all work has completed.  Work that is pending
could require the dbus/event to function properly so we need to keep
that running until that time.  If we shut those down too early, we
can end up with a spawned task which never completes because it is
relying on an event which is never processed.

In order to keep all of these straight, split the scope/stop_tokens
into two: pending and internal.  Pending tasks are those 'spawned'
by user requests and internal tasks are those maintained by the
context itself to aid in processing.  Defer shutting down the
dbus/event handlers until all 'pending' work is completed.

Add a test case which used to hang and now completes.

Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I399d065ae15b24ad3971c526c27b73757af14b34
diff --git a/include/sdbusplus/async/context.hpp b/include/sdbusplus/async/context.hpp
index 02235d3..093fe79 100644
--- a/include/sdbusplus/async/context.hpp
+++ b/include/sdbusplus/async/context.hpp
@@ -66,6 +66,11 @@
     template <execution::sender_of<execution::set_value_t()> Snd>
     void spawn(Snd&& sender)
     {
+        if (stop_requested())
+        {
+            throw std::logic_error(
+                "sdbusplus::async::context spawn called while already stopped.");
+        }
         pending_tasks.spawn(std::forward<Snd>(sender));
     }
 
@@ -77,7 +82,7 @@
     bool request_stop() noexcept;
     bool stop_requested() noexcept
     {
-        return stop.stop_requested();
+        return initial_stop.stop_requested();
     }
 
     friend details::wait_process_completion;
@@ -94,7 +99,13 @@
     /** The worker thread to handle async tasks. */
     std::thread worker_thread{};
     /** Stop source */
-    std::stop_source stop{};
+    std::stop_source initial_stop{};
+
+    // In order to coordinate final completion of work, we keep some tasks
+    // on a separate scope (the ones which maintain the sd-event/dbus state)
+    // and keep a final stop-source for them.
+    scope internal_tasks{};
+    std::stop_source final_stop{};
 
     // Lock and condition variable to signal `caller`.
     std::mutex lock{};
diff --git a/src/async/context.cpp b/src/async/context.cpp
index 60d9e56..a5ed091 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -101,7 +101,7 @@
 
 task<> wait_process_completion::loop(context& ctx)
 {
-    while (!ctx.stop_requested())
+    while (!ctx.final_stop.stop_requested())
     {
         // Handle the next sdbus event.
         co_await wait_process_sender(ctx);
@@ -125,12 +125,17 @@
 
 bool context::request_stop() noexcept
 {
-    auto first_stop = stop.request_stop();
+    auto first_stop = initial_stop.request_stop();
 
     if (first_stop)
     {
-        caller_wait.notify_one();
-        event_loop.break_run();
+        // Now that the workers have been requested to stop, we need to wait
+        // until they all drain and then stop the internal tasks.
+        internal_tasks.spawn(pending_tasks.empty() | execution::then([this]() {
+                                 final_stop.request_stop();
+                                 caller_wait.notify_one();
+                                 event_loop.break_run();
+                             }));
     }
 
     return first_stop;
@@ -144,14 +149,14 @@
     }};
 
     // Run until the context requested to stop.
-    while (!stop_requested())
+    while (!final_stop.stop_requested())
     {
         // Handle waiting on all the sd-events.
         details::wait_process_completion::wait_once(*this);
     }
 
-    // Wait for all the tasks to complete.
-    std::this_thread::sync_wait(pending_tasks.empty());
+    // Wait for all the internal tasks to complete.
+    std::this_thread::sync_wait(internal_tasks.empty());
 
     // Stop has been requested, so finish up the loop.
     loop.finish();
@@ -169,8 +174,8 @@
     // implemented yet, so we don't have a lot of other options.
     spawn(std::move(startup));
 
-    // Also start up the sdbus 'wait/process' loop.
-    spawn(details::wait_process_completion::loop(*this));
+    // Also start the sdbus 'wait/process' loop; treat it as an internal task.
+    internal_tasks.spawn(details::wait_process_completion::loop(*this));
 
     // Run the execution::run_loop to handle all the tasks.
     loop.run();
@@ -222,7 +227,7 @@
         // don't have the required parameters).
         ctx.caller_wait.wait(lock, [&] {
             return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
-                   (ctx.stop_requested());
+                   (ctx.final_stop.stop_requested());
         });
 
         // Save the waiter as pending.
@@ -234,7 +239,7 @@
 
     // If the context has been requested to be stopped, exit now instead of
     // running the context event loop.
-    if (ctx.stop_requested())
+    if (ctx.final_stop.stop_requested())
     {
         return;
     }
@@ -243,7 +248,7 @@
     ctx.event_loop.run_one(ctx.pending->timeout);
 
     // If there is a stop requested, we need to stop the pending operation.
-    if (ctx.stop_requested())
+    if (ctx.final_stop.stop_requested())
     {
         decltype(ctx.pending) pending = nullptr;
 
@@ -278,7 +283,7 @@
     // deadlocks.
     if (pending != nullptr)
     {
-        if (self->stop_requested())
+        if (self->final_stop.stop_requested())
         {
             pending->stop();
         }
diff --git a/test/async/context.cpp b/test/async/context.cpp
index 83cfde4..4092de5 100644
--- a/test/async/context.cpp
+++ b/test/async/context.cpp
@@ -2,19 +2,51 @@
 
 #include <gtest/gtest.h>
 
-TEST(Context, RunSimple)
+struct Context : public testing::Test
 {
-    sdbusplus::async::context ctx;
-    ctx.run(std::execution::just() |
-            std::execution::then([&ctx]() { ctx.request_stop(); }));
+    ~Context() noexcept = default;
+
+    void TearDown() override
+    {
+        // Destructing the context can throw, so we have to do it in
+        // the TearDown in order to make our destructor noexcept.
+        ctx.reset();
+    }
+
+    std::optional<sdbusplus::async::context> ctx{std::in_place};
+};
+
+TEST_F(Context, RunSimple)
+{
+    ctx->run(std::execution::just() |
+             std::execution::then([this]() { ctx->request_stop(); }));
 }
 
-TEST(Context, SpawnedTask)
+TEST_F(Context, SpawnedTask)
 {
-    sdbusplus::async::context ctx;
+    ctx->spawn(std::execution::just());
 
-    ctx.spawn(std::execution::just());
+    ctx->run(std::execution::just() |
+             std::execution::then([this]() { ctx->request_stop(); }));
+}
 
-    ctx.run(std::execution::just() |
-            std::execution::then([&ctx]() { ctx.request_stop(); }));
+TEST_F(Context, SpawnDelayedTask)
+{
+    using namespace std::literals;
+    static constexpr auto timeout = 500ms;
+
+    auto start = std::chrono::steady_clock::now();
+
+    bool ran = false;
+    ctx->spawn(sdbusplus::async::sleep_for(*ctx, timeout) |
+               std::execution::then([&ran]() { ran = true; }));
+
+    ctx->run(std::execution::just() |
+             std::execution::then([this]() { ctx->request_stop(); }));
+
+    auto stop = std::chrono::steady_clock::now();
+
+    EXPECT_TRUE(ran);
+    EXPECT_GT(stop - start, timeout);
+    EXPECT_LT(stop - start, timeout * 2);
 }