async: handle exceptions and stops

Add support in `context` and `scope` to handle exceptions and stop
conditions:

  * When an unhandled_stop occurs, turn it into an UnhandledStop
    exception.
  * When a `scope`-spawned task throws an exception, save it and cause
    `set_error` on the `scope::empty()`'s Receiver.
  * When anything in the `context` completes with `set_error` propagate
    that out to the caller of `context::run`.
  * If multiple exceptions occur within a `scope`, terminate.

Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I25285f7ece5c0675864489bbe1345fa8e7afa70c
diff --git a/include/sdbusplus/async/context.hpp b/include/sdbusplus/async/context.hpp
index 093fe79..36671d5 100644
--- a/include/sdbusplus/async/context.hpp
+++ b/include/sdbusplus/async/context.hpp
@@ -28,8 +28,6 @@
  *  both the startup routine and any further asynchronous operations until
  *  the context is stopped.
  *
- *  TODO: Stopping is not currently supported.
- *
  *  The context has two threads:
  *      - The thread which called `run`, often from `main`, and named the
  *        `caller`.
diff --git a/include/sdbusplus/async/scope.hpp b/include/sdbusplus/async/scope.hpp
index 7962bea..afd4fb0 100644
--- a/include/sdbusplus/async/scope.hpp
+++ b/include/sdbusplus/async/scope.hpp
@@ -1,6 +1,7 @@
 #pragma once
 
 #include <sdbusplus/async/execution.hpp>
+#include <sdbusplus/exception.hpp>
 
 #include <mutex>
 
@@ -55,10 +56,11 @@
 
   private:
     void started_task() noexcept;
-    void ended_task() noexcept;
+    void ended_task(std::exception_ptr&&) noexcept;
 
     std::mutex lock{};
     size_t pending_count = 0;
+    std::exception_ptr pending_exception = {};
     scope_ns::scope_completion* pending = nullptr;
 };
 
@@ -80,18 +82,15 @@
     }
 
     friend void tag_invoke(execution::set_error_t, scope_receiver&& self,
-                           auto&&) noexcept
+                           std::exception_ptr&& e) noexcept
     {
-        std::terminate(); // TODO: save the exception back into the scope.
-        self.complete();
+        self.complete(std::move(e));
     }
 
     friend void tag_invoke(execution::set_stopped_t,
                            scope_receiver&& self) noexcept
     {
-        // std::terminate(); // TODO: this implies a child had an
-        // unhandled_stop.  Need to turn it into an exception.
-        self.complete();
+        self.complete(std::make_exception_ptr(exception::UnhandledStop{}));
     }
 
     friend decltype(auto) tag_invoke(execution::get_env_t,
@@ -100,7 +99,7 @@
         return self;
     }
 
-    void complete() noexcept;
+    void complete(std::exception_ptr&& = {}) noexcept;
 
     void* op_state;
     scope* s = nullptr;
@@ -115,7 +114,7 @@
 };
 
 template <execution::sender Sender>
-void scope_receiver<Sender>::complete() noexcept
+void scope_receiver<Sender>::complete(std::exception_ptr&& e) noexcept
 {
     // The Sender is complete, so we need to clean up the saved operational
     // state.
@@ -127,7 +126,7 @@
     delete static_cast<scope_ns::scope_operation_state<Sender>*>(op_state);
 
     // Inform the scope that a task has completed.
-    owning_scope->ended_task();
+    owning_scope->ended_task(std::move(e));
 }
 
 // Virtual class to handle the scope completions.
@@ -147,7 +146,7 @@
     }
 
   private:
-    virtual void complete() noexcept = 0;
+    virtual void complete(std::exception_ptr&&) noexcept = 0;
     void arm() noexcept;
 
     scope& s;
@@ -162,9 +161,16 @@
     {}
 
   private:
-    void complete() noexcept override final
+    void complete(std::exception_ptr&& e) noexcept override final
     {
-        execution::set_value(std::move(receiver));
+        if (e)
+        {
+            execution::set_error(std::move(receiver), std::move(e));
+        }
+        else
+        {
+            execution::set_value(std::move(receiver));
+        }
     }
 
     Reciever receiver;
diff --git a/include/sdbusplus/exception.hpp b/include/sdbusplus/exception.hpp
index d7eb03f..b7d7d50 100644
--- a/include/sdbusplus/exception.hpp
+++ b/include/sdbusplus/exception.hpp
@@ -129,6 +129,23 @@
     const std::string errWhatDetailed;
 };
 
+class UnhandledStop final : public internal_exception
+{
+  public:
+    static constexpr auto errName =
+        "xyz.openbmc_project.sdbusplus.Error.UnhandledStop";
+    static constexpr auto errDesc =
+        "An async Sender failed to handle a stop condition.";
+    static constexpr auto errWhat =
+        "xyz.openbmc_project.sdbusplus.Error.UnhandledStop: "
+        "An async Sender failed to handle a stop condition.";
+
+    const char* name() const noexcept override;
+    const char* description() const noexcept override;
+    const char* what() const noexcept override;
+    int get_errno() const noexcept override;
+};
+
 } // namespace exception
 
 using exception_t = exception::exception;
diff --git a/src/async/context.cpp b/src/async/context.cpp
index a5ed091..4807a6a 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -131,10 +131,17 @@
     {
         // Now that the workers have been requested to stop, we need to wait
         // until they all drain and then stop the internal tasks.
-        internal_tasks.spawn(pending_tasks.empty() | execution::then([this]() {
-                                 final_stop.request_stop();
-                                 caller_wait.notify_one();
-                                 event_loop.break_run();
+
+        auto complete = [this]() {
+            final_stop.request_stop();
+            caller_wait.notify_one();
+            event_loop.break_run();
+        };
+
+        internal_tasks.spawn(pending_tasks.empty() | execution::then(complete) |
+                             execution::upon_error([=](auto&& e) {
+                                 complete();
+                                 std::rethrow_exception(e);
                              }));
     }
 
@@ -155,8 +162,13 @@
         details::wait_process_completion::wait_once(*this);
     }
 
+    std::exception_ptr pending_exception{};
+
     // Wait for all the internal tasks to complete.
-    std::this_thread::sync_wait(internal_tasks.empty());
+    std::this_thread::sync_wait(internal_tasks.empty() |
+                                execution::upon_error([&](auto&& e) {
+                                    pending_exception = std::move(e);
+                                }));
 
     // Stop has been requested, so finish up the loop.
     loop.finish();
@@ -164,6 +176,12 @@
     {
         worker_thread.join();
     }
+
+    // If there was an exception inside the context, rethrow it.
+    if (pending_exception)
+    {
+        std::rethrow_exception(std::move(pending_exception));
+    }
 }
 
 void context::worker_run(task<> startup)
diff --git a/src/async/scope.cpp b/src/async/scope.cpp
index 908def3..353c52b 100644
--- a/src/async/scope.cpp
+++ b/src/async/scope.cpp
@@ -19,23 +19,44 @@
     ++pending_count;
 }
 
-void scope::ended_task() noexcept
+void scope::ended_task(std::exception_ptr&& e) noexcept
 {
     scope_ns::scope_completion* p = nullptr;
 
     {
         std::lock_guard l{lock};
-        --pending_count;
+        --pending_count; // decrement count.
 
-        if (pending_count == 0)
+        if (e && pending_exception)
+        {
+            // Received a second exception without delivering the first
+            // to a pending completion.  Terminate using the first one.
+            try
+            {
+                std::rethrow_exception(std::exchange(pending_exception, {}));
+            }
+            catch (...)
+            {
+                std::terminate();
+            }
+        }
+
+        // If the scope is complete, get the pending completion, if it exists.
+        if (e || (pending_count == 0))
         {
             p = std::exchange(pending, nullptr);
         }
+
+        // If we have an exception but no pending completion, save it away.
+        if (e && !p)
+        {
+            pending_exception = std::move(e);
+        }
     }
 
     if (p)
     {
-        p->complete();
+        p->complete(std::move(e));
     }
 }
 
@@ -49,6 +70,7 @@
 void scope_completion::arm() noexcept
 {
     bool done = false;
+    std::exception_ptr e{};
 
     {
         std::lock_guard l{s.lock};
@@ -56,6 +78,11 @@
         {
             done = true;
         }
+        else if (s.pending_exception)
+        {
+            e = std::exchange(s.pending_exception, {});
+            done = true;
+        }
         else
         {
             s.pending = this;
@@ -64,7 +91,7 @@
 
     if (done)
     {
-        this->complete();
+        this->complete(std::move(e));
     }
 }
 } // namespace scope_ns
diff --git a/src/exception.cpp b/src/exception.cpp
index 1407147..05647aa 100644
--- a/src/exception.cpp
+++ b/src/exception.cpp
@@ -178,6 +178,26 @@
     return EINVAL;
 }
 
+const char* UnhandledStop::name() const noexcept
+{
+    return errName;
+}
+
+const char* UnhandledStop::description() const noexcept
+{
+    return errDesc;
+}
+
+const char* UnhandledStop::what() const noexcept
+{
+    return errWhat;
+}
+
+int UnhandledStop::get_errno() const noexcept
+{
+    return ECANCELED;
+}
+
 } // namespace exception
 } // namespace sdbusplus
 
diff --git a/test/async/context.cpp b/test/async/context.cpp
index fe273d1..9dea016 100644
--- a/test/async/context.cpp
+++ b/test/async/context.cpp
@@ -69,8 +69,7 @@
     ctx->spawn(sdbusplus::async::sleep_for(*ctx, 1ms) |
                std::execution::then([&m](...) { m.reset(); }));
 
-    runToStop();
-
+    EXPECT_THROW(runToStop(), sdbusplus::exception::UnhandledStop);
     EXPECT_FALSE(ran);
 }
 
@@ -98,7 +97,6 @@
     ctx->spawn(sdbusplus::async::sleep_for(*ctx, 1ms) |
                std::execution::then([&]() { m.reset(); }));
 
-    runToStop();
-
+    EXPECT_THROW(runToStop(), sdbusplus::exception::UnhandledStop);
     EXPECT_FALSE(ran);
 }