async: context: support stopping the context
Up until this point, the async::context would run forever and have
no way of exiting the process, which isn't very useful. Add some
support for stop conditions so that we can ask the context to stop
and cleanly exit.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: Ia5967162fb103d4b4b0490d8cbbef12bdb361bac
diff --git a/example/coroutine-example.cpp b/example/coroutine-example.cpp
index 2013771..e3e8bec 100644
--- a/example/coroutine-example.cpp
+++ b/example/coroutine-example.cpp
@@ -89,6 +89,9 @@
}
};
+ // We are all done, so shutdown the server.
+ ctx.request_stop();
+
co_return;
}
diff --git a/include/sdbusplus/async/context.hpp b/include/sdbusplus/async/context.hpp
index f588c70..4bd2384 100644
--- a/include/sdbusplus/async/context.hpp
+++ b/include/sdbusplus/async/context.hpp
@@ -3,9 +3,11 @@
#include <sdbusplus/async/execution.hpp>
#include <sdbusplus/async/task.hpp>
#include <sdbusplus/bus.hpp>
+#include <sdbusplus/event.hpp>
#include <condition_variable>
#include <mutex>
+#include <stop_token>
#include <thread>
namespace sdbusplus::async
@@ -39,7 +41,7 @@
class context : public bus::details::bus_friend
{
public:
- explicit context(bus_t&& bus = bus::new_default()) : bus(std::move(bus)) {}
+ explicit context(bus_t&& bus = bus::new_default());
context(context&&) = delete;
context(const context&) = delete;
@@ -60,25 +62,38 @@
return bus;
}
+ bool request_stop() noexcept;
+ bool stop_requested() noexcept
+ {
+ return stop.stop_requested();
+ }
+
friend struct details::wait_process_completion;
private:
bus_t bus;
+ event_source_t dbus_source;
+ event_t event_loop{};
/** The async run-loop from std::execution. */
execution::run_loop loop{};
/** The worker thread to handle async tasks. */
std::thread worker_thread{};
+ /** Stop source */
+ std::stop_source stop{};
// Lock and condition variable to signal `caller`.
std::mutex lock{};
std::condition_variable caller_wait{};
/** Completion object to signal the worker that 'sd_bus_wait' is done. */
- details::wait_process_completion* complete = nullptr;
+ details::wait_process_completion* staged = nullptr;
+ details::wait_process_completion* pending = nullptr;
void worker_run(task<> startup);
void caller_run(task<> startup);
+
+ static int dbus_event_handle(sd_event_source*, int, uint32_t, void*);
};
template <execution::sender_of<execution::set_value_t()> Snd>
diff --git a/src/async/context.cpp b/src/async/context.cpp
index 9565025..1dc9906 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -1,11 +1,18 @@
-#include <poll.h>
#include <systemd/sd-bus.h>
#include <sdbusplus/async/context.hpp>
+#include <chrono>
+
namespace sdbusplus::async
{
+context::context(bus_t&& b) : bus(std::move(b))
+{
+ dbus_source = event_loop.add_io(bus.get_fd(), EPOLLIN, dbus_event_handle,
+ this);
+}
+
namespace details
{
@@ -22,14 +29,15 @@
// Called by the `caller` to indicate the Sender is completed.
virtual void complete() noexcept = 0;
+ // Called by the `caller` to indicate the Sender should be stopped.
+ virtual void stop() noexcept = 0;
// Arm the completion event.
void arm() noexcept;
// Data to share with the worker.
context& ctx;
- pollfd fd{};
- int timeout = 0;
+ std::chrono::microseconds timeout{};
static task<> loop(context& ctx);
static void wait_once(context& ctx);
@@ -54,6 +62,13 @@
execution::set_value(std::move(this->receiver));
}
+ void stop() noexcept override final
+ {
+ // Stop can be called when the context is shutting down,
+ // so treat it as if the receiver completed.
+ execution::set_value(std::move(this->receiver));
+ }
+
friend void tag_invoke(execution::start_t,
wait_process_operation& self) noexcept
{
@@ -86,7 +101,7 @@
task<> wait_process_completion::loop(context& ctx)
{
- while (1)
+ while (!ctx.stop_requested())
{
// Handle the next sdbus event.
co_await wait_process_sender(ctx);
@@ -108,6 +123,19 @@
}
}
+bool context::request_stop() noexcept
+{
+ auto first_stop = stop.request_stop();
+
+ if (first_stop)
+ {
+ caller_wait.notify_one();
+ event_loop.break_run();
+ }
+
+ return first_stop;
+}
+
void context::caller_run(task<> startup)
{
// Start up the worker thread.
@@ -115,14 +143,14 @@
worker_run(std::move(startup));
}};
- while (1)
+ // Run until the context requested to stop.
+ while (!stop_requested())
{
- // Handle 'sd_bus_wait's.
+ // Handle waiting on all the sd-events.
details::wait_process_completion::wait_once(*this);
}
- // TODO: We can't actually get here. Need to deal with stop conditions and
- // then we'll need this code.
+ // Stop has been requested, so finish up the loop.
loop.finish();
if (worker_thread.joinable())
{
@@ -156,42 +184,32 @@
return;
}
- // We need to call wait now, so formulate all the data that the 'caller'
- // needs.
-
- // Get the bus' pollfd data.
- auto b = get_busp(ctx.get_bus());
- fd = pollfd{sd_bus_get_fd(b), static_cast<short>(sd_bus_get_events(b)), 0};
+ // We need to call wait now, get the current timeout and stage ourselves
+ // as the next completion.
// Get the bus' timeout.
- uint64_t to_nsec = 0;
- sd_bus_get_timeout(b, &to_nsec);
+ uint64_t to_usec = 0;
+ sd_bus_get_timeout(get_busp(ctx.get_bus()), &to_usec);
- if (to_nsec == UINT64_MAX)
+ if (to_usec == UINT64_MAX)
{
// sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
- // Turn this into a negative number for `poll`.
- timeout = -1;
+ // Turn this into -1 for sd-event.
+ timeout = std::chrono::microseconds{-1};
}
else
{
- // Otherwise, convert usec from sd_bus to msec for poll.
- // sd_bus manpage suggests you should round-up (ceil).
- timeout = std::chrono::ceil<std::chrono::milliseconds>(
- std::chrono::microseconds(to_nsec))
- .count();
+ timeout = std::chrono::microseconds(to_usec);
}
// Assign ourselves as the pending completion and release the caller.
std::lock_guard lock{ctx.lock};
- ctx.complete = this;
+ ctx.staged = this;
ctx.caller_wait.notify_one();
}
void details::wait_process_completion::wait_once(context& ctx)
{
- details::wait_process_completion* c = nullptr;
-
// Scope for lock.
{
std::unique_lock lock{ctx.lock};
@@ -199,17 +217,75 @@
// If there isn't a completion waiting already, wait on the condition
// variable for one to show up (we can't call `poll` yet because we
// don't have the required parameters).
- ctx.caller_wait.wait(lock, [&] { return ctx.complete != nullptr; });
+ ctx.caller_wait.wait(lock, [&] {
+ return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
+ (ctx.stop_requested());
+ });
- // Save the waiter and call `poll`.
- c = std::exchange(ctx.complete, nullptr);
- poll(&c->fd, 1, c->timeout);
+ // Save the waiter as pending.
+ if (ctx.pending == nullptr)
+ {
+ ctx.pending = std::exchange(ctx.staged, nullptr);
+ }
}
- // Outside the lock complete the operation; this can cause the Receiver
- // task (the worker) to start executing, hence why we do not want the
- // lock held.
- c->complete();
+ // If the context has been requested to be stopped, exit now instead of
+ // running the context event loop.
+ if (ctx.stop_requested())
+ {
+ return;
+ }
+
+ // Run the event loop to process one request.
+ ctx.event_loop.run_one(ctx.pending->timeout);
+
+ // If there is a stop requested, we need to stop the pending operation.
+ if (ctx.stop_requested())
+ {
+ decltype(ctx.pending) pending = nullptr;
+
+ {
+ std::lock_guard lock{ctx.lock};
+ pending = std::exchange(ctx.pending, nullptr);
+ }
+
+ // Do the stop outside the lock to prevent potential deadlocks due to
+ // the stop handler running.
+ if (pending != nullptr)
+ {
+ pending->stop();
+ }
+ }
+}
+
+int context::dbus_event_handle(sd_event_source*, int, uint32_t, void* data)
+{
+ auto self = static_cast<context*>(data);
+
+ decltype(self->pending) pending = nullptr;
+ {
+ std::lock_guard lock{self->lock};
+ pending = std::exchange(self->pending, nullptr);
+ }
+
+ // Outside the lock complete the pending operation.
+ //
+ // This can cause the Receiver task (the worker) to start executing (on
+ // this thread!), hence we do not want the lock held in order to avoid
+ // deadlocks.
+ if (pending != nullptr)
+ {
+ if (self->stop_requested())
+ {
+ pending->stop();
+ }
+ else
+ {
+ pending->complete();
+ }
+ }
+
+ return 0;
}
} // namespace sdbusplus::async
diff --git a/test/async/context.cpp b/test/async/context.cpp
new file mode 100644
index 0000000..60d3d8d
--- /dev/null
+++ b/test/async/context.cpp
@@ -0,0 +1,10 @@
+#include <sdbusplus/async.hpp>
+
+#include <gtest/gtest.h>
+
+TEST(Context, RunSimple)
+{
+ sdbusplus::async::context ctx;
+ ctx.run(std::execution::just() |
+ std::execution::then([&ctx]() { ctx.request_stop(); }));
+}
diff --git a/test/meson.build b/test/meson.build
index e62c1e7..c2e937a 100644
--- a/test/meson.build
+++ b/test/meson.build
@@ -20,6 +20,7 @@
endif
tests = [
+ 'async/context',
'async/task',
'bus/list_names',
'bus/match',