async: context: allow run re-entrance
Switch the context call so that it can be re-entrant. A single call
to `run` will either:
1. Process until the context is successfully stopped.
2. An exception is raised from a spawned task.
If an exception is raised, it could be caught outside the run call
and then run called again.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: Icd05df421a1f0592bd2390fbe8e21a16252eafed
diff --git a/include/sdbusplus/async/context.hpp b/include/sdbusplus/async/context.hpp
index 12fc98c..c2d6b81 100644
--- a/include/sdbusplus/async/context.hpp
+++ b/include/sdbusplus/async/context.hpp
@@ -60,13 +60,12 @@
template <execution::sender_of<execution::set_value_t()> Snd>
void spawn(Snd&& sender)
{
- if (stop_requested())
- {
- throw std::logic_error(
- "sdbusplus::async::context spawn called while already stopped.");
- }
+ check_stop_requested();
+
pending_tasks.spawn(
std::move(execution::on(loop.get_scheduler(), std::move(sender))));
+
+ spawn_watcher();
}
bus_t& get_bus() noexcept
@@ -74,7 +73,10 @@
return bus;
}
- bool request_stop() noexcept;
+ bool request_stop() noexcept
+ {
+ return initial_stop.request_stop();
+ }
bool stop_requested() noexcept
{
return initial_stop.stop_requested();
@@ -106,11 +108,22 @@
std::mutex lock{};
std::condition_variable caller_wait{};
+ std::exception_ptr pending_exception{};
+ bool spawn_watcher_running = false;
+
/** Completion object to signal the worker that 'sd_bus_wait' is done. */
details::wait_process_completion* staged = nullptr;
details::wait_process_completion* pending = nullptr;
+ bool wait_process_stopped = false;
void worker_run();
+ void spawn_complete(std::exception_ptr&& = {});
+ void check_stop_requested();
+ void spawn_watcher();
+
+ void caller_run();
+ void rethrow_pending_exception();
+ void wait_for_wait_process_stopped();
static int dbus_event_handle(sd_event_source*, int, uint32_t, void*);
};
diff --git a/src/async/context.cpp b/src/async/context.cpp
index 36f5552..b16cf3e 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -110,6 +110,11 @@
// we need to switch to the worker thread.
co_await execution::schedule(ctx.loop.get_scheduler());
}
+
+ {
+ std::lock_guard lock{ctx.lock};
+ ctx.wait_process_stopped = true;
+ }
}
} // namespace details
@@ -123,44 +128,18 @@
}
}
-bool context::request_stop() noexcept
-{
- auto first_stop = initial_stop.request_stop();
-
- if (first_stop)
- {
- // Now that the workers have been requested to stop, we need to wait
- // until they all drain and then stop the internal tasks.
-
- auto complete = [this]() {
- final_stop.request_stop();
- caller_wait.notify_one();
- event_loop.break_run();
- };
-
- internal_tasks.spawn(pending_tasks.empty() | execution::then(complete) |
- execution::upon_error([=](auto&& e) {
- complete();
- std::rethrow_exception(e);
- }));
- }
-
- return first_stop;
-}
-
void context::run()
{
- // Start up the worker thread.
- worker_thread = std::thread{[this]() { worker_run(); }};
+ // Run the primary portion of the run-loop.
+ caller_run();
- // Run until the context requested to stop.
- while (!final_stop.stop_requested())
- {
- // Handle waiting on all the sd-events.
- details::wait_process_completion::wait_once(*this);
- }
+ // Rethrow the pending exception (if it exists).
+ rethrow_pending_exception();
- std::exception_ptr pending_exception{};
+ // Otherwise this should be final_stop...
+
+ // We need to wait for the pending wait process and stop it.
+ wait_for_wait_process_stopped();
// Wait for all the internal tasks to complete.
std::this_thread::sync_wait(internal_tasks.empty() |
@@ -168,29 +147,150 @@
pending_exception = std::move(e);
}));
- // Stop has been requested, so finish up the loop.
+ // Finish up the loop and join the thread.
+ // (There shouldn't be anything going on by this point anyhow.)
loop.finish();
if (worker_thread.joinable())
{
worker_thread.join();
}
- // If there was an exception inside the context, rethrow it.
- if (pending_exception)
- {
- std::rethrow_exception(std::move(pending_exception));
- }
+ // Check for one last exception.
+ rethrow_pending_exception();
}
void context::worker_run()
{
// Start the sdbus 'wait/process' loop; treat it as an internal task.
- internal_tasks.spawn(details::wait_process_completion::loop(*this));
+ internal_tasks.spawn(details::wait_process_completion::loop(*this) |
+ execution::upon_stopped([]() {}));
// Run the execution::run_loop to handle all the tasks.
loop.run();
}
+void context::spawn_complete(std::exception_ptr&& e)
+{
+ {
+ std::lock_guard l{lock};
+ spawn_watcher_running = false;
+
+ if (e)
+ {
+ pending_exception = std::move(e);
+ }
+ }
+
+ if (stop_requested())
+ {
+ final_stop.request_stop();
+ }
+
+ caller_wait.notify_one();
+ event_loop.break_run();
+}
+
+void context::check_stop_requested()
+{
+ if (stop_requested())
+ {
+ throw std::logic_error(
+ "sdbusplus::async::context spawn called while already stopped.");
+ }
+}
+
+void context::spawn_watcher()
+{
+ {
+ std::lock_guard l{lock};
+ if (spawn_watcher_running)
+ {
+ return;
+ }
+
+ spawn_watcher_running = true;
+ }
+
+ // Spawn the watch for completion / exceptions.
+ internal_tasks.spawn(pending_tasks.empty() |
+ execution::then([this]() { spawn_complete(); }) |
+ execution::upon_error([this](auto&& e) {
+ spawn_complete(std::move(e));
+ }));
+}
+
+void context::caller_run()
+{
+ // We are able to run the loop until the context is requested to stop or
+ // we get an exception.
+ auto keep_running = [this]() {
+ std::lock_guard l{lock};
+ return !final_stop.stop_requested() && !pending_exception;
+ };
+
+ // If we are suppose to keep running, start the run loop.
+ if (keep_running())
+ {
+ // Start up the worker thread.
+ if (!worker_thread.joinable())
+ {
+ worker_thread = std::thread{[this]() { worker_run(); }};
+ }
+ else
+ {
+ // We've already been running and there might an exception or
+ // completion pending. Spawn a new watcher that checks for these.
+ spawn_watcher();
+ }
+
+ while (keep_running())
+ {
+ // Handle waiting on all the sd-events.
+ details::wait_process_completion::wait_once(*this);
+ }
+ }
+ else
+ {
+ // There might be pending exceptions still, so spawn a watcher for them.
+ spawn_watcher();
+ }
+}
+
+void context::wait_for_wait_process_stopped()
+{
+ auto worker = std::exchange(pending, nullptr);
+ while (worker == nullptr)
+ {
+ std::lock_guard l{lock};
+ if (wait_process_stopped)
+ {
+ break;
+ }
+
+ worker = std::exchange(staged, nullptr);
+ if (!worker)
+ {
+ std::this_thread::yield();
+ }
+ }
+ if (worker)
+ {
+ worker->stop();
+ wait_process_stopped = true;
+ }
+}
+
+void context::rethrow_pending_exception()
+{
+ {
+ std::lock_guard l{lock};
+ if (pending_exception)
+ {
+ std::rethrow_exception(std::exchange(pending_exception, {}));
+ }
+ }
+}
+
void details::wait_process_completion::arm() noexcept
{
// Call process. True indicates something was handled and we do not
@@ -237,7 +337,8 @@
// don't have the required parameters).
ctx.caller_wait.wait(lock, [&] {
return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
- (ctx.final_stop.stop_requested());
+ ctx.final_stop.stop_requested() ||
+ (ctx.pending_exception != nullptr);
});
// Save the waiter as pending.
@@ -247,32 +348,11 @@
}
}
- // If the context has been requested to be stopped, exit now instead of
- // running the context event loop.
- if (ctx.final_stop.stop_requested())
- {
- return;
- }
-
// Run the event loop to process one request.
- ctx.event_loop.run_one(ctx.pending->timeout);
-
- // If there is a stop requested, we need to stop the pending operation.
- if (ctx.final_stop.stop_requested())
+ // If the context has been requested to be stopped, skip the event loop.
+ if (!ctx.final_stop.stop_requested() && ctx.pending)
{
- decltype(ctx.pending) pending = nullptr;
-
- {
- std::lock_guard lock{ctx.lock};
- pending = std::exchange(ctx.pending, nullptr);
- }
-
- // Do the stop outside the lock to prevent potential deadlocks due to
- // the stop handler running.
- if (pending != nullptr)
- {
- pending->stop();
- }
+ ctx.event_loop.run_one(ctx.pending->timeout);
}
}
@@ -280,27 +360,10 @@
{
auto self = static_cast<context*>(data);
- decltype(self->pending) pending = nullptr;
- {
- std::lock_guard lock{self->lock};
- pending = std::exchange(self->pending, nullptr);
- }
-
- // Outside the lock complete the pending operation.
- //
- // This can cause the Receiver task (the worker) to start executing (on
- // this thread!), hence we do not want the lock held in order to avoid
- // deadlocks.
+ auto pending = std::exchange(self->pending, nullptr);
if (pending != nullptr)
{
- if (self->final_stop.stop_requested())
- {
- pending->stop();
- }
- else
- {
- pending->complete();
- }
+ pending->complete();
}
return 0;
diff --git a/test/async/context.cpp b/test/async/context.cpp
index 34e5bb0..b60cc38 100644
--- a/test/async/context.cpp
+++ b/test/async/context.cpp
@@ -34,6 +34,24 @@
runToStop();
}
+TEST_F(Context, ReentrantRun)
+{
+ runToStop();
+ for (int i = 0; i < 100; ++i)
+ {
+ ctx->run();
+ }
+}
+
+TEST_F(Context, SpawnThrowingTask)
+{
+ ctx->spawn(std::execution::just() |
+ std::execution::then([]() { throw std::logic_error("Oops"); }));
+
+ EXPECT_THROW(runToStop(), std::logic_error);
+ ctx->run();
+}
+
TEST_F(Context, SpawnDelayedTask)
{
using namespace std::literals;
@@ -99,6 +117,7 @@
std::execution::then([&m](...) { m.reset(); }));
EXPECT_THROW(runToStop(), sdbusplus::exception::UnhandledStop);
+ EXPECT_NO_THROW(ctx->run());
EXPECT_FALSE(ran);
}
@@ -127,5 +146,6 @@
std::execution::then([&]() { m.reset(); }));
EXPECT_THROW(runToStop(), sdbusplus::exception::UnhandledStop);
+ EXPECT_NO_THROW(ctx->run());
EXPECT_FALSE(ran);
}