async: context: allow run re-entrance

Switch the context call so that it can be re-entrant.  A single call
to `run` will either:

    1. Process until the context is successfully stopped.
    2. An exception is raised from a spawned task.

If an exception is raised, it could be caught outside the run call
and then run called again.

Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: Icd05df421a1f0592bd2390fbe8e21a16252eafed
diff --git a/include/sdbusplus/async/context.hpp b/include/sdbusplus/async/context.hpp
index 12fc98c..c2d6b81 100644
--- a/include/sdbusplus/async/context.hpp
+++ b/include/sdbusplus/async/context.hpp
@@ -60,13 +60,12 @@
     template <execution::sender_of<execution::set_value_t()> Snd>
     void spawn(Snd&& sender)
     {
-        if (stop_requested())
-        {
-            throw std::logic_error(
-                "sdbusplus::async::context spawn called while already stopped.");
-        }
+        check_stop_requested();
+
         pending_tasks.spawn(
             std::move(execution::on(loop.get_scheduler(), std::move(sender))));
+
+        spawn_watcher();
     }
 
     bus_t& get_bus() noexcept
@@ -74,7 +73,10 @@
         return bus;
     }
 
-    bool request_stop() noexcept;
+    bool request_stop() noexcept
+    {
+        return initial_stop.request_stop();
+    }
     bool stop_requested() noexcept
     {
         return initial_stop.stop_requested();
@@ -106,11 +108,22 @@
     std::mutex lock{};
     std::condition_variable caller_wait{};
 
+    std::exception_ptr pending_exception{};
+    bool spawn_watcher_running = false;
+
     /** Completion object to signal the worker that 'sd_bus_wait' is done. */
     details::wait_process_completion* staged = nullptr;
     details::wait_process_completion* pending = nullptr;
+    bool wait_process_stopped = false;
 
     void worker_run();
+    void spawn_complete(std::exception_ptr&& = {});
+    void check_stop_requested();
+    void spawn_watcher();
+
+    void caller_run();
+    void rethrow_pending_exception();
+    void wait_for_wait_process_stopped();
 
     static int dbus_event_handle(sd_event_source*, int, uint32_t, void*);
 };
diff --git a/src/async/context.cpp b/src/async/context.cpp
index 36f5552..b16cf3e 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -110,6 +110,11 @@
         // we need to switch to the worker thread.
         co_await execution::schedule(ctx.loop.get_scheduler());
     }
+
+    {
+        std::lock_guard lock{ctx.lock};
+        ctx.wait_process_stopped = true;
+    }
 }
 
 } // namespace details
@@ -123,44 +128,18 @@
     }
 }
 
-bool context::request_stop() noexcept
-{
-    auto first_stop = initial_stop.request_stop();
-
-    if (first_stop)
-    {
-        // Now that the workers have been requested to stop, we need to wait
-        // until they all drain and then stop the internal tasks.
-
-        auto complete = [this]() {
-            final_stop.request_stop();
-            caller_wait.notify_one();
-            event_loop.break_run();
-        };
-
-        internal_tasks.spawn(pending_tasks.empty() | execution::then(complete) |
-                             execution::upon_error([=](auto&& e) {
-                                 complete();
-                                 std::rethrow_exception(e);
-                             }));
-    }
-
-    return first_stop;
-}
-
 void context::run()
 {
-    // Start up the worker thread.
-    worker_thread = std::thread{[this]() { worker_run(); }};
+    // Run the primary portion of the run-loop.
+    caller_run();
 
-    // Run until the context requested to stop.
-    while (!final_stop.stop_requested())
-    {
-        // Handle waiting on all the sd-events.
-        details::wait_process_completion::wait_once(*this);
-    }
+    // Rethrow the pending exception (if it exists).
+    rethrow_pending_exception();
 
-    std::exception_ptr pending_exception{};
+    // Otherwise this should be final_stop...
+
+    // We need to wait for the pending wait process and stop it.
+    wait_for_wait_process_stopped();
 
     // Wait for all the internal tasks to complete.
     std::this_thread::sync_wait(internal_tasks.empty() |
@@ -168,29 +147,150 @@
                                     pending_exception = std::move(e);
                                 }));
 
-    // Stop has been requested, so finish up the loop.
+    // Finish up the loop and join the thread.
+    // (There shouldn't be anything going on by this point anyhow.)
     loop.finish();
     if (worker_thread.joinable())
     {
         worker_thread.join();
     }
 
-    // If there was an exception inside the context, rethrow it.
-    if (pending_exception)
-    {
-        std::rethrow_exception(std::move(pending_exception));
-    }
+    // Check for one last exception.
+    rethrow_pending_exception();
 }
 
 void context::worker_run()
 {
     // Start the sdbus 'wait/process' loop; treat it as an internal task.
-    internal_tasks.spawn(details::wait_process_completion::loop(*this));
+    internal_tasks.spawn(details::wait_process_completion::loop(*this) |
+                         execution::upon_stopped([]() {}));
 
     // Run the execution::run_loop to handle all the tasks.
     loop.run();
 }
 
+void context::spawn_complete(std::exception_ptr&& e)
+{
+    {
+        std::lock_guard l{lock};
+        spawn_watcher_running = false;
+
+        if (e)
+        {
+            pending_exception = std::move(e);
+        }
+    }
+
+    if (stop_requested())
+    {
+        final_stop.request_stop();
+    }
+
+    caller_wait.notify_one();
+    event_loop.break_run();
+}
+
+void context::check_stop_requested()
+{
+    if (stop_requested())
+    {
+        throw std::logic_error(
+            "sdbusplus::async::context spawn called while already stopped.");
+    }
+}
+
+void context::spawn_watcher()
+{
+    {
+        std::lock_guard l{lock};
+        if (spawn_watcher_running)
+        {
+            return;
+        }
+
+        spawn_watcher_running = true;
+    }
+
+    // Spawn the watch for completion / exceptions.
+    internal_tasks.spawn(pending_tasks.empty() |
+                         execution::then([this]() { spawn_complete(); }) |
+                         execution::upon_error([this](auto&& e) {
+                             spawn_complete(std::move(e));
+                         }));
+}
+
+void context::caller_run()
+{
+    // We are able to run the loop until the context is requested to stop or
+    // we get an exception.
+    auto keep_running = [this]() {
+        std::lock_guard l{lock};
+        return !final_stop.stop_requested() && !pending_exception;
+    };
+
+    // If we are suppose to keep running, start the run loop.
+    if (keep_running())
+    {
+        // Start up the worker thread.
+        if (!worker_thread.joinable())
+        {
+            worker_thread = std::thread{[this]() { worker_run(); }};
+        }
+        else
+        {
+            // We've already been running and there might an exception or
+            // completion pending.  Spawn a new watcher that checks for these.
+            spawn_watcher();
+        }
+
+        while (keep_running())
+        {
+            // Handle waiting on all the sd-events.
+            details::wait_process_completion::wait_once(*this);
+        }
+    }
+    else
+    {
+        // There might be pending exceptions still, so spawn a watcher for them.
+        spawn_watcher();
+    }
+}
+
+void context::wait_for_wait_process_stopped()
+{
+    auto worker = std::exchange(pending, nullptr);
+    while (worker == nullptr)
+    {
+        std::lock_guard l{lock};
+        if (wait_process_stopped)
+        {
+            break;
+        }
+
+        worker = std::exchange(staged, nullptr);
+        if (!worker)
+        {
+            std::this_thread::yield();
+        }
+    }
+    if (worker)
+    {
+        worker->stop();
+        wait_process_stopped = true;
+    }
+}
+
+void context::rethrow_pending_exception()
+{
+    {
+        std::lock_guard l{lock};
+        if (pending_exception)
+        {
+            std::rethrow_exception(std::exchange(pending_exception, {}));
+        }
+    }
+}
+
 void details::wait_process_completion::arm() noexcept
 {
     // Call process.  True indicates something was handled and we do not
@@ -237,7 +337,8 @@
         // don't have the required parameters).
         ctx.caller_wait.wait(lock, [&] {
             return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
-                   (ctx.final_stop.stop_requested());
+                   ctx.final_stop.stop_requested() ||
+                   (ctx.pending_exception != nullptr);
         });
 
         // Save the waiter as pending.
@@ -247,32 +348,11 @@
         }
     }
 
-    // If the context has been requested to be stopped, exit now instead of
-    // running the context event loop.
-    if (ctx.final_stop.stop_requested())
-    {
-        return;
-    }
-
     // Run the event loop to process one request.
-    ctx.event_loop.run_one(ctx.pending->timeout);
-
-    // If there is a stop requested, we need to stop the pending operation.
-    if (ctx.final_stop.stop_requested())
+    // If the context has been requested to be stopped, skip the event loop.
+    if (!ctx.final_stop.stop_requested() && ctx.pending)
     {
-        decltype(ctx.pending) pending = nullptr;
-
-        {
-            std::lock_guard lock{ctx.lock};
-            pending = std::exchange(ctx.pending, nullptr);
-        }
-
-        // Do the stop outside the lock to prevent potential deadlocks due to
-        // the stop handler running.
-        if (pending != nullptr)
-        {
-            pending->stop();
-        }
+        ctx.event_loop.run_one(ctx.pending->timeout);
     }
 }
 
@@ -280,27 +360,10 @@
 {
     auto self = static_cast<context*>(data);
 
-    decltype(self->pending) pending = nullptr;
-    {
-        std::lock_guard lock{self->lock};
-        pending = std::exchange(self->pending, nullptr);
-    }
-
-    // Outside the lock complete the pending operation.
-    //
-    // This can cause the Receiver task (the worker) to start executing (on
-    // this thread!), hence we do not want the lock held in order to avoid
-    // deadlocks.
+    auto pending = std::exchange(self->pending, nullptr);
     if (pending != nullptr)
     {
-        if (self->final_stop.stop_requested())
-        {
-            pending->stop();
-        }
-        else
-        {
-            pending->complete();
-        }
+        pending->complete();
     }
 
     return 0;
diff --git a/test/async/context.cpp b/test/async/context.cpp
index 34e5bb0..b60cc38 100644
--- a/test/async/context.cpp
+++ b/test/async/context.cpp
@@ -34,6 +34,24 @@
     runToStop();
 }
 
+TEST_F(Context, ReentrantRun)
+{
+    runToStop();
+    for (int i = 0; i < 100; ++i)
+    {
+        ctx->run();
+    }
+}
+
+TEST_F(Context, SpawnThrowingTask)
+{
+    ctx->spawn(std::execution::just() |
+               std::execution::then([]() { throw std::logic_error("Oops"); }));
+
+    EXPECT_THROW(runToStop(), std::logic_error);
+    ctx->run();
+}
+
 TEST_F(Context, SpawnDelayedTask)
 {
     using namespace std::literals;
@@ -99,6 +117,7 @@
                std::execution::then([&m](...) { m.reset(); }));
 
     EXPECT_THROW(runToStop(), sdbusplus::exception::UnhandledStop);
+    EXPECT_NO_THROW(ctx->run());
     EXPECT_FALSE(ran);
 }
 
@@ -127,5 +146,6 @@
                std::execution::then([&]() { m.reset(); }));
 
     EXPECT_THROW(runToStop(), sdbusplus::exception::UnhandledStop);
+    EXPECT_NO_THROW(ctx->run());
     EXPECT_FALSE(ran);
 }