async: context: allow many exceptions

Allow many spawned tasks to throw exceptions and queue them up,
re-throwing them one-by-one from `context::run` calls.  Similarly
enhance `context::scope` so that many `context::empty` completions
can complete with error.

Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I8744e0a3b8b0663ebd5ca26db0f5943688afe62a
diff --git a/src/async/context.cpp b/src/async/context.cpp
index b16cf3e..6d50301 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -142,10 +142,10 @@
     wait_for_wait_process_stopped();
 
     // Wait for all the internal tasks to complete.
-    std::this_thread::sync_wait(internal_tasks.empty() |
-                                execution::upon_error([&](auto&& e) {
-                                    pending_exception = std::move(e);
-                                }));
+    std::this_thread::sync_wait(
+        internal_tasks.empty() | execution::upon_error([&](auto&& e) {
+            pending_exceptions.emplace_back(std::move(e));
+        }));
 
     // Finish up the loop and join the thread.
     // (There shouldn't be anything going on by this point anyhow.)
@@ -177,7 +177,7 @@
 
         if (e)
         {
-            pending_exception = std::move(e);
+            pending_exceptions.emplace_back(std::move(e));
         }
     }
 
@@ -225,7 +225,7 @@
     // we get an exception.
     auto keep_running = [this]() {
         std::lock_guard l{lock};
-        return !final_stop.stop_requested() && !pending_exception;
+        return !final_stop.stop_requested() && pending_exceptions.empty();
     };
 
     // If we are suppose to keep running, start the run loop.
@@ -284,9 +284,11 @@
 {
     {
         std::lock_guard l{lock};
-        if (pending_exception)
+        if (!pending_exceptions.empty())
         {
-            std::rethrow_exception(std::exchange(pending_exception, {}));
+            auto e = pending_exceptions.front();
+            pending_exceptions.pop_front();
+            std::rethrow_exception(std::move(e));
         }
     }
 }
@@ -338,7 +340,7 @@
         ctx.caller_wait.wait(lock, [&] {
             return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
                    ctx.final_stop.stop_requested() ||
-                   (ctx.pending_exception != nullptr);
+                   !ctx.pending_exceptions.empty();
         });
 
         // Save the waiter as pending.