async: handle exceptions and stops
Add support in `context` and `scope` to handle exceptions and stop
conditions:
* When an `unhandled_stop` occurs, turn it into an `UnhandledStop`
exception.
* When a `scope`-spawned task throws an exception, save it and deliver
it via `set_error` to the Receiver of `scope::empty()`.
* When anything in the `context` completes with `set_error`, propagate
that out to the caller of `context::run`.
* If a second exception occurs within a `scope` before the first has
been delivered to a pending completion, terminate.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I25285f7ece5c0675864489bbe1345fa8e7afa70c
diff --git a/src/async/context.cpp b/src/async/context.cpp
index a5ed091..4807a6a 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -131,10 +131,17 @@
{
// Now that the workers have been requested to stop, we need to wait
// until they all drain and then stop the internal tasks.
- internal_tasks.spawn(pending_tasks.empty() | execution::then([this]() {
- final_stop.request_stop();
- caller_wait.notify_one();
- event_loop.break_run();
+
+ auto complete = [this]() {
+ final_stop.request_stop();
+ caller_wait.notify_one();
+ event_loop.break_run();
+ };
+
+ internal_tasks.spawn(pending_tasks.empty() | execution::then(complete) |
+ execution::upon_error([=](auto&& e) {
+ complete();
+ std::rethrow_exception(e);
}));
}
@@ -155,8 +162,13 @@
details::wait_process_completion::wait_once(*this);
}
+ std::exception_ptr pending_exception{};
+
// Wait for all the internal tasks to complete.
- std::this_thread::sync_wait(internal_tasks.empty());
+ std::this_thread::sync_wait(internal_tasks.empty() |
+ execution::upon_error([&](auto&& e) {
+ pending_exception = std::move(e);
+ }));
// Stop has been requested, so finish up the loop.
loop.finish();
@@ -164,6 +176,12 @@
{
worker_thread.join();
}
+
+ // If there was an exception inside the context, rethrow it.
+ if (pending_exception)
+ {
+ std::rethrow_exception(std::move(pending_exception));
+ }
}
void context::worker_run(task<> startup)
diff --git a/src/async/scope.cpp b/src/async/scope.cpp
index 908def3..353c52b 100644
--- a/src/async/scope.cpp
+++ b/src/async/scope.cpp
@@ -19,23 +19,44 @@
++pending_count;
}
-void scope::ended_task() noexcept
+void scope::ended_task(std::exception_ptr&& e) noexcept
{
scope_ns::scope_completion* p = nullptr;
{
std::lock_guard l{lock};
- --pending_count;
+ --pending_count; // decrement count.
- if (pending_count == 0)
+ if (e && pending_exception)
+ {
+ // Received a second exception without delivering the first
+ // to a pending completion. Terminate using the first one.
+ try
+ {
+ std::rethrow_exception(std::exchange(pending_exception, {}));
+ }
+ catch (...)
+ {
+ std::terminate();
+ }
+ }
+
+ // If the scope is complete, get the pending completion, if it exists.
+ if (e || (pending_count == 0))
{
p = std::exchange(pending, nullptr);
}
+
+ // If we have an exception but no pending completion, save it away.
+ if (e && !p)
+ {
+ pending_exception = std::move(e);
+ }
}
if (p)
{
- p->complete();
+ p->complete(std::move(e));
}
}
@@ -49,6 +70,7 @@
void scope_completion::arm() noexcept
{
bool done = false;
+ std::exception_ptr e{};
{
std::lock_guard l{s.lock};
@@ -56,6 +78,11 @@
{
done = true;
}
+ else if (s.pending_exception)
+ {
+ e = std::exchange(s.pending_exception, {});
+ done = true;
+ }
else
{
s.pending = this;
@@ -64,7 +91,7 @@
if (done)
{
- this->complete();
+ this->complete(std::move(e));
}
}
} // namespace scope_ns