async: add context support
In order to handle async operations, we need some kind of
'run-loop' around the sd_bus_wait/sd_bus_process process.
Define a 'context' to hold the bus and manage the run-loop.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I16099dd2e15809776827dd85327d950af3f8f5b2
diff --git a/src/async/context.cpp b/src/async/context.cpp
new file mode 100644
index 0000000..9565025
--- /dev/null
+++ b/src/async/context.cpp
@@ -0,0 +1,215 @@
+#include <poll.h>
+#include <systemd/sd-bus.h>
+
+#include <sdbusplus/async/context.hpp>
+
+namespace sdbusplus::async
+{
+
+namespace details
+{
+
+/* The sd_bus_wait/process completion event.
+ *
+ * The wait/process handshake is modelled as a Sender with the the worker
+ * task `co_await`ing Senders over and over. This class is the completion
+ * handling for the Sender (to get it back to the Receiver, ie. the worker).
+ */
+struct wait_process_completion : bus::details::bus_friend
+{
+ explicit wait_process_completion(context& ctx) : ctx(ctx) {}
+ virtual ~wait_process_completion() = default;
+
+ // Called by the `caller` to indicate the Sender is completed.
+ virtual void complete() noexcept = 0;
+
+ // Arm the completion event.
+ void arm() noexcept;
+
+ // Data to share with the worker.
+ context& ctx;
+ pollfd fd{};
+ int timeout = 0;
+
+ static task<> loop(context& ctx);
+ static void wait_once(context& ctx);
+};
+
+/* The completion template based on receiver type.
+ *
+ * The type of the receivers (typically the co_awaiter) is only known by
+ * a template, so we need a sub-class of completion to hold the receiver.
+ */
+template <execution::receiver R>
+struct wait_process_operation : public wait_process_completion
+{
+ wait_process_operation(context& ctx, R r) :
+ wait_process_completion(ctx), receiver(std::move(r))
+ {}
+
+ wait_process_operation(wait_process_operation&&) = delete;
+
+ void complete() noexcept override final
+ {
+ execution::set_value(std::move(this->receiver));
+ }
+
+ friend void tag_invoke(execution::start_t,
+ wait_process_operation& self) noexcept
+ {
+ self.arm();
+ }
+
+ R receiver;
+};
+
+/* The sender for the wait/process event. */
+struct wait_process_sender
+{
+ explicit wait_process_sender(context& ctx) : ctx(ctx){};
+
+ friend auto tag_invoke(execution::get_completion_signatures_t,
+ const wait_process_sender&, auto)
+ -> execution::completion_signatures<execution::set_value_t()>;
+
+ template <execution::receiver R>
+ friend auto tag_invoke(execution::connect_t, wait_process_sender&& self,
+ R r) -> wait_process_operation<R>
+ {
+ // Create the completion for the wait.
+ return {self.ctx, std::move(r)};
+ }
+
+ private:
+ context& ctx;
+};
+
+task<> wait_process_completion::loop(context& ctx)
+{
+ while (1)
+ {
+ // Handle the next sdbus event.
+ co_await wait_process_sender(ctx);
+
+ // Completion likely happened on the context 'caller' thread, so
+ // we need to switch to the worker thread.
+ co_await execution::schedule(ctx.loop.get_scheduler());
+ }
+}
+
+} // namespace details
+
+context::~context() noexcept(false)
+{
+ if (worker_thread.joinable())
+ {
+ throw std::logic_error(
+ "sdbusplus::async::context destructed without completion.");
+ }
+}
+
+void context::caller_run(task<> startup)
+{
+ // Start up the worker thread.
+ worker_thread = std::thread{[this, startup = std::move(startup)]() mutable {
+ worker_run(std::move(startup));
+ }};
+
+ while (1)
+ {
+ // Handle 'sd_bus_wait's.
+ details::wait_process_completion::wait_once(*this);
+ }
+
+ // TODO: We can't actually get here. Need to deal with stop conditions and
+ // then we'll need this code.
+ loop.finish();
+ if (worker_thread.joinable())
+ {
+ worker_thread.join();
+ }
+}
+
+void context::worker_run(task<> startup)
+{
+ // Begin the 'startup' task.
+ // This shouldn't start detached because we want to be able to forward
+ // failures back to the 'run'. execution::ensure_started isn't
+ // implemented yet, so we don't have a lot of other options.
+ execution::start_detached(std::move(startup));
+
+ // Also start up the sdbus 'wait/process' loop.
+ execution::start_detached(details::wait_process_completion::loop(*this));
+
+ // Run the execution::run_loop to handle all the tasks.
+ loop.run();
+}
+
+void details::wait_process_completion::arm() noexcept
+{
+ // Call process. True indicates something was handled and we do not
+ // need to `wait`, because there might be yet another pending operation
+ // to process, so immediately signal the operation as complete.
+ if (ctx.get_bus().process_discard())
+ {
+ this->complete();
+ return;
+ }
+
+ // We need to call wait now, so formulate all the data that the 'caller'
+ // needs.
+
+ // Get the bus' pollfd data.
+ auto b = get_busp(ctx.get_bus());
+ fd = pollfd{sd_bus_get_fd(b), static_cast<short>(sd_bus_get_events(b)), 0};
+
+ // Get the bus' timeout.
+ uint64_t to_nsec = 0;
+ sd_bus_get_timeout(b, &to_nsec);
+
+ if (to_nsec == UINT64_MAX)
+ {
+ // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
+ // Turn this into a negative number for `poll`.
+ timeout = -1;
+ }
+ else
+ {
+ // Otherwise, convert usec from sd_bus to msec for poll.
+ // sd_bus manpage suggests you should round-up (ceil).
+ timeout = std::chrono::ceil<std::chrono::milliseconds>(
+ std::chrono::microseconds(to_nsec))
+ .count();
+ }
+
+ // Assign ourselves as the pending completion and release the caller.
+ std::lock_guard lock{ctx.lock};
+ ctx.complete = this;
+ ctx.caller_wait.notify_one();
+}
+
+void details::wait_process_completion::wait_once(context& ctx)
+{
+ details::wait_process_completion* c = nullptr;
+
+ // Scope for lock.
+ {
+ std::unique_lock lock{ctx.lock};
+
+ // If there isn't a completion waiting already, wait on the condition
+ // variable for one to show up (we can't call `poll` yet because we
+ // don't have the required parameters).
+ ctx.caller_wait.wait(lock, [&] { return ctx.complete != nullptr; });
+
+ // Save the waiter and call `poll`.
+ c = std::exchange(ctx.complete, nullptr);
+ poll(&c->fd, 1, c->timeout);
+ }
+
+ // Outside the lock complete the operation; this can cause the Receiver
+ // task (the worker) to start executing, hence why we do not want the
+ // lock held.
+ c->complete();
+}
+
+} // namespace sdbusplus::async