async: context: allow many exceptions

Allow many spawned tasks to throw exceptions and queue them up,
re-throwing them one-by-one from `context::run` calls.  Similarly
enhance `context::scope` so that many `context::empty` completions
can complete with error.

Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I8744e0a3b8b0663ebd5ca26db0f5943688afe62a
diff --git a/include/sdbusplus/async/context.hpp b/include/sdbusplus/async/context.hpp
index c2d6b81..552d74a 100644
--- a/include/sdbusplus/async/context.hpp
+++ b/include/sdbusplus/async/context.hpp
@@ -7,6 +7,7 @@
 #include <sdbusplus/event.hpp>
 
 #include <condition_variable>
+#include <deque>
 #include <mutex>
 #include <stop_token>
 #include <thread>
@@ -108,7 +109,7 @@
     std::mutex lock{};
     std::condition_variable caller_wait{};
 
-    std::exception_ptr pending_exception{};
+    std::deque<std::exception_ptr> pending_exceptions = {};
     bool spawn_watcher_running = false;
 
     /** Completion object to signal the worker that 'sd_bus_wait' is done. */
diff --git a/include/sdbusplus/async/scope.hpp b/include/sdbusplus/async/scope.hpp
index e220cfb..4fc5bdb 100644
--- a/include/sdbusplus/async/scope.hpp
+++ b/include/sdbusplus/async/scope.hpp
@@ -3,6 +3,7 @@
 #include <sdbusplus/async/execution.hpp>
 #include <sdbusplus/exception.hpp>
 
+#include <deque>
 #include <mutex>
 
 namespace sdbusplus::async
@@ -61,7 +62,7 @@
     std::mutex lock{};
     bool started = false;
     size_t pending_count = 0;
-    std::exception_ptr pending_exception = {};
+    std::deque<std::exception_ptr> pending_exceptions = {};
     scope_ns::scope_completion* pending = nullptr;
 };
 
diff --git a/src/async/context.cpp b/src/async/context.cpp
index b16cf3e..6d50301 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -142,10 +142,10 @@
     wait_for_wait_process_stopped();
 
     // Wait for all the internal tasks to complete.
-    std::this_thread::sync_wait(internal_tasks.empty() |
-                                execution::upon_error([&](auto&& e) {
-                                    pending_exception = std::move(e);
-                                }));
+    std::this_thread::sync_wait(
+        internal_tasks.empty() | execution::upon_error([&](auto&& e) {
+            pending_exceptions.emplace_back(std::move(e));
+        }));
 
     // Finish up the loop and join the thread.
     // (There shouldn't be anything going on by this point anyhow.)
@@ -177,7 +177,7 @@
 
         if (e)
         {
-            pending_exception = std::move(e);
+            pending_exceptions.emplace_back(std::move(e));
         }
     }
 
@@ -225,7 +225,7 @@
     // we get an exception.
     auto keep_running = [this]() {
         std::lock_guard l{lock};
-        return !final_stop.stop_requested() && !pending_exception;
+        return !final_stop.stop_requested() && pending_exceptions.empty();
     };
 
     // If we are suppose to keep running, start the run loop.
@@ -284,9 +284,11 @@
 {
     {
         std::lock_guard l{lock};
-        if (pending_exception)
+        if (!pending_exceptions.empty())
         {
-            std::rethrow_exception(std::exchange(pending_exception, {}));
+            auto e = pending_exceptions.front();
+            pending_exceptions.pop_front();
+            std::rethrow_exception(std::move(e));
         }
     }
 }
@@ -338,7 +340,7 @@
         ctx.caller_wait.wait(lock, [&] {
             return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
                    ctx.final_stop.stop_requested() ||
-                   (ctx.pending_exception != nullptr);
+                   !ctx.pending_exceptions.empty();
         });
 
         // Save the waiter as pending.
diff --git a/src/async/scope.cpp b/src/async/scope.cpp
index a990e41..d9eb8a7 100644
--- a/src/async/scope.cpp
+++ b/src/async/scope.cpp
@@ -28,36 +28,27 @@
         std::lock_guard l{lock};
         --pending_count; // decrement count.
 
-        if (e && pending_exception)
+        if (e)
         {
-            // Received a second exception without delivering the first
-            // to a pending completion.  Terminate using the first one.
-            try
-            {
-                std::rethrow_exception(std::exchange(pending_exception, {}));
-            }
-            catch (...)
-            {
-                std::terminate();
-            }
+            pending_exceptions.emplace_back(std::exchange(e, {}));
         }
 
         // If the scope is complete, get the pending completion, if it exists.
-        if (e || (pending_count == 0))
+        if (!pending_exceptions.empty() || (pending_count == 0))
         {
             p = std::exchange(pending, nullptr);
         }
-
-        // If we have an exception but no pending completion, save it away.
-        if (e && !p)
-        {
-            pending_exception = std::move(e);
-        }
     }
 
     if (p)
     {
-        p->complete(std::move(e));
+        std::exception_ptr fwd_exception = {};
+        if (!pending_exceptions.empty())
+        {
+            fwd_exception = pending_exceptions.front();
+            pending_exceptions.pop_front();
+        }
+        p->complete(std::move(fwd_exception));
     }
 }
 
@@ -79,13 +70,14 @@
         {
             done = false;
         }
-        else if (s.pending_count == 0)
+        else if (!s.pending_exceptions.empty())
         {
+            e = s.pending_exceptions.front();
+            s.pending_exceptions.pop_front();
             done = true;
         }
-        else if (s.pending_exception)
+        else if (s.pending_count == 0)
         {
-            e = std::exchange(s.pending_exception, {});
             done = true;
         }
         else
diff --git a/test/async/context.cpp b/test/async/context.cpp
index b60cc38..afae280 100644
--- a/test/async/context.cpp
+++ b/test/async/context.cpp
@@ -13,10 +13,15 @@
         ctx.reset();
     }
 
-    void runToStop()
+    void spawnStop()
     {
         ctx->spawn(std::execution::just() |
                    std::execution::then([this]() { ctx->request_stop(); }));
+    }
+
+    void runToStop()
+    {
+        spawnStop();
         ctx->run();
     }
 
@@ -52,6 +57,24 @@
     ctx->run();
 }
 
+TEST_F(Context, SpawnManyThrowingTasks)
+{
+    static constexpr size_t count = 100;
+    for (size_t i = 0; i < count; ++i)
+    {
+        ctx->spawn(std::execution::just() | std::execution::then([]() {
+                       throw std::logic_error("Oops");
+                   }));
+    }
+    spawnStop();
+
+    for (size_t i = 0; i < count; ++i)
+    {
+        EXPECT_THROW(ctx->run(), std::logic_error);
+    }
+    ctx->run();
+}
+
 TEST_F(Context, SpawnDelayedTask)
 {
     using namespace std::literals;