async: context: allow many exceptions
Allow many spawned tasks to throw exceptions and queue them up,
re-throwing them one-by-one from `context::run` calls. Similarly
enhance `context::scope` so that many `context::empty` completions
can complete with error.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I8744e0a3b8b0663ebd5ca26db0f5943688afe62a
diff --git a/src/async/context.cpp b/src/async/context.cpp
index b16cf3e..6d50301 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -142,10 +142,10 @@
wait_for_wait_process_stopped();
// Wait for all the internal tasks to complete.
- std::this_thread::sync_wait(internal_tasks.empty() |
- execution::upon_error([&](auto&& e) {
- pending_exception = std::move(e);
- }));
+ std::this_thread::sync_wait(
+ internal_tasks.empty() | execution::upon_error([&](auto&& e) {
+ pending_exceptions.emplace_back(std::move(e));
+ }));
// Finish up the loop and join the thread.
// (There shouldn't be anything going on by this point anyhow.)
@@ -177,7 +177,7 @@
if (e)
{
- pending_exception = std::move(e);
+ pending_exceptions.emplace_back(std::move(e));
}
}
@@ -225,7 +225,7 @@
// we get an exception.
auto keep_running = [this]() {
std::lock_guard l{lock};
- return !final_stop.stop_requested() && !pending_exception;
+ return !final_stop.stop_requested() && pending_exceptions.empty();
};
// If we are suppose to keep running, start the run loop.
@@ -284,9 +284,11 @@
{
{
std::lock_guard l{lock};
- if (pending_exception)
+ if (!pending_exceptions.empty())
{
- std::rethrow_exception(std::exchange(pending_exception, {}));
+ auto e = pending_exceptions.front();
+ pending_exceptions.pop_front();
+ std::rethrow_exception(std::move(e));
}
}
}
@@ -338,7 +340,7 @@
ctx.caller_wait.wait(lock, [&] {
return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
ctx.final_stop.stop_requested() ||
- (ctx.pending_exception != nullptr);
+ !ctx.pending_exceptions.empty();
});
// Save the waiter as pending.
diff --git a/src/async/scope.cpp b/src/async/scope.cpp
index a990e41..d9eb8a7 100644
--- a/src/async/scope.cpp
+++ b/src/async/scope.cpp
@@ -28,36 +28,27 @@
std::lock_guard l{lock};
--pending_count; // decrement count.
- if (e && pending_exception)
+ if (e)
{
- // Received a second exception without delivering the first
- // to a pending completion. Terminate using the first one.
- try
- {
- std::rethrow_exception(std::exchange(pending_exception, {}));
- }
- catch (...)
- {
- std::terminate();
- }
+ pending_exceptions.emplace_back(std::exchange(e, {}));
}
// If the scope is complete, get the pending completion, if it exists.
- if (e || (pending_count == 0))
+ if (!pending_exceptions.empty() || (pending_count == 0))
{
p = std::exchange(pending, nullptr);
}
-
- // If we have an exception but no pending completion, save it away.
- if (e && !p)
- {
- pending_exception = std::move(e);
- }
}
if (p)
{
- p->complete(std::move(e));
+ std::exception_ptr fwd_exception = {};
+ if (!pending_exceptions.empty())
+ {
+ fwd_exception = pending_exceptions.front();
+ pending_exceptions.pop_front();
+ }
+ p->complete(std::move(fwd_exception));
}
}
@@ -79,13 +70,14 @@
{
done = false;
}
- else if (s.pending_count == 0)
+ else if (!s.pending_exceptions.empty())
{
+ e = s.pending_exceptions.front();
+ s.pending_exceptions.pop_front();
done = true;
}
- else if (s.pending_exception)
+ else if (s.pending_count == 0)
{
- e = std::exchange(s.pending_exception, {});
done = true;
}
else