async: context: handle shutdown better
When the context has been requested to stop we don't actually want
to stop it until all work has completed. Work that is pending
could require the dbus/event to function properly so we need to keep
that running until that time. If we shut down those too early, we
can end up with a spawned task which never completes because it is
relying on an event which is never processed.
In order to keep all of these straight, split the scope/stop_tokens
into two: pending and internal. Pending tasks are those 'spawned'
due to user requests and internal tasks are those maintained by the
context itself to aid in processing. Defer shutting down the
dbus/event handlers until all 'pending' work is completed.
Add a test case which used to hang and now completes.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I399d065ae15b24ad3971c526c27b73757af14b34
diff --git a/src/async/context.cpp b/src/async/context.cpp
index 60d9e56..a5ed091 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -101,7 +101,7 @@
task<> wait_process_completion::loop(context& ctx)
{
- while (!ctx.stop_requested())
+ while (!ctx.final_stop.stop_requested())
{
// Handle the next sdbus event.
co_await wait_process_sender(ctx);
@@ -125,12 +125,17 @@
bool context::request_stop() noexcept
{
- auto first_stop = stop.request_stop();
+ auto first_stop = initial_stop.request_stop();
if (first_stop)
{
- caller_wait.notify_one();
- event_loop.break_run();
+ // Now that the workers have been requested to stop, we need to wait
+ // until they all drain and then stop the internal tasks.
+ internal_tasks.spawn(pending_tasks.empty() | execution::then([this]() {
+ final_stop.request_stop();
+ caller_wait.notify_one();
+ event_loop.break_run();
+ }));
}
return first_stop;
@@ -144,14 +149,14 @@
}};
// Run until the context requested to stop.
- while (!stop_requested())
+ while (!final_stop.stop_requested())
{
// Handle waiting on all the sd-events.
details::wait_process_completion::wait_once(*this);
}
- // Wait for all the tasks to complete.
- std::this_thread::sync_wait(pending_tasks.empty());
+ // Wait for all the internal tasks to complete.
+ std::this_thread::sync_wait(internal_tasks.empty());
// Stop has been requested, so finish up the loop.
loop.finish();
@@ -169,8 +174,8 @@
// implemented yet, so we don't have a lot of other options.
spawn(std::move(startup));
- // Also start up the sdbus 'wait/process' loop.
- spawn(details::wait_process_completion::loop(*this));
+ // Also start the sdbus 'wait/process' loop; treat it as an internal task.
+ internal_tasks.spawn(details::wait_process_completion::loop(*this));
// Run the execution::run_loop to handle all the tasks.
loop.run();
@@ -222,7 +227,7 @@
// don't have the required parameters).
ctx.caller_wait.wait(lock, [&] {
return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
- (ctx.stop_requested());
+ (ctx.final_stop.stop_requested());
});
// Save the waiter as pending.
@@ -234,7 +239,7 @@
// If the context has been requested to be stopped, exit now instead of
// running the context event loop.
- if (ctx.stop_requested())
+ if (ctx.final_stop.stop_requested())
{
return;
}
@@ -243,7 +248,7 @@
ctx.event_loop.run_one(ctx.pending->timeout);
// If there is a stop requested, we need to stop the pending operation.
- if (ctx.stop_requested())
+ if (ctx.final_stop.stop_requested())
{
decltype(ctx.pending) pending = nullptr;
@@ -278,7 +283,7 @@
// deadlocks.
if (pending != nullptr)
{
- if (self->stop_requested())
+ if (self->final_stop.stop_requested())
{
pending->stop();
}