async: add match
Define a class which will register for a dbus signal match
and generate Senders that await new signals.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: Ib1d77708d74e6063bcaa08fc76fb98667ee7a2cb
diff --git a/example/coroutine-example.cpp b/example/coroutine-example.cpp
index 8cfa0f4..2013771 100644
--- a/example/coroutine-example.cpp
+++ b/example/coroutine-example.cpp
@@ -69,6 +69,26 @@
<< e.what() << std::endl;
}
+ // Create a match for the NameOwnerChanged signal.
+ namespace rules = sdbusplus::bus::match::rules;
+ auto match = sdbusplus::async::match(ctx, rules::nameOwnerChanged());
+
+ // Listen for the signal 4 times...
+ for (size_t i = 0; i < 4; ++i)
+ {
+ auto [service, old_name, new_name] =
+ co_await match.next<std::string, std::string, std::string>();
+
+ if (!new_name.empty())
+ {
+ std::cout << new_name << " owns " << service << std::endl;
+ }
+ else
+ {
+ std::cout << service << " released" << std::endl;
+ }
+ };
+
co_return;
}
diff --git a/include/sdbusplus/async.hpp b/include/sdbusplus/async.hpp
index 2ef1df6..52f1294 100644
--- a/include/sdbusplus/async.hpp
+++ b/include/sdbusplus/async.hpp
@@ -2,5 +2,6 @@
#include <sdbusplus/async/context.hpp>
#include <sdbusplus/async/execution.hpp>
+#include <sdbusplus/async/match.hpp>
#include <sdbusplus/async/proxy.hpp>
#include <sdbusplus/async/task.hpp>
diff --git a/include/sdbusplus/async/match.hpp b/include/sdbusplus/async/match.hpp
new file mode 100644
index 0000000..a368c01
--- /dev/null
+++ b/include/sdbusplus/async/match.hpp
@@ -0,0 +1,151 @@
+#pragma once
+#include <sdbusplus/async/context.hpp>
+#include <sdbusplus/async/execution.hpp>
+#include <sdbusplus/bus/match.hpp>
+#include <sdbusplus/message.hpp>
+#include <sdbusplus/slot.hpp>
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include <string_view>
+
+namespace sdbusplus::async
+{
+namespace match_ns
+{
+struct match_completion;
+}
+
+/** Generator of dbus match Senders.
+ *
+ * This class registers a signal match pattern with the dbus and generates
+ * Senders using `next` to await the next matching signal.
+ */
+class match : private bus::details::bus_friend
+{
+ public:
+ match() = delete;
+ match(const match&) = delete;
+ match& operator=(const match&) = delete;
+ match(match&&) = delete;
+ match& operator=(match&&) = delete;
+ ~match() = default;
+
+ /** Construct the match using the `pattern` string on the bus managed by the
+ * context. */
+ match(context& ctx, const std::string_view& pattern);
+
+ /** Get the Sender for the next event (as message).
+ *
+ * Note: the implementation only supports a single awaiting task. Two
+ * tasks should not share this object and both call `next`.
+ */
+ auto next() noexcept;
+
+ /** Get the Sender for the next event, which yields one of:
+ * void, Rs, tuple<Rs...>
+ *
+ * Note: the implementation only supports a single awaiting task. Two
+ * tasks should not share this object and both call `next`.
+ */
+ template <typename... Rs>
+ auto next() noexcept;
+
+ friend match_ns::match_completion;
+
+ private:
+ sdbusplus::slot_t slot{};
+
+ std::mutex lock{};
+ std::queue<sdbusplus::message_t> queue{};
+ match_ns::match_completion* complete = nullptr;
+
+ /** Handle an incoming match event. */
+ void handle_match(message_t&&) noexcept;
+
+ /** Signal completion if there is an awaiting Receiver.
+ *
+ * This must be called with `lock` held (and ownership transfers).
+ */
+ void handle_completion(std::unique_lock<std::mutex>&&) noexcept;
+};
+
+namespace match_ns
+{
+// Virtual class to handle the match Receiver completions.
+struct match_completion
+{
+ match_completion() = delete;
+ match_completion(match_completion&&) = delete;
+
+ explicit match_completion(match& m) : m(m){};
+ virtual ~match_completion() = default;
+
+ friend match;
+
+ friend void tag_invoke(execution::start_t, match_completion& self) noexcept
+ {
+ self.arm();
+ }
+
+ private:
+ virtual void complete(message_t&&) noexcept = 0;
+ void arm() noexcept;
+
+ match& m;
+};
+
+// Implementation (templated based on Receiver) of match_completion.
+template <execution::receiver Receiver>
+struct match_operation : match_completion
+{
+ match_operation(match& m, Receiver r) :
+ match_completion(m), receiver(std::move(r))
+ {}
+
+ private:
+ void complete(message_t&& msg) noexcept override final
+ {
+ execution::set_value(std::move(receiver), std::move(msg));
+ }
+
+ Receiver receiver;
+};
+
+// match Sender implementation.
+struct match_sender
+{
+ match_sender() = delete;
+ explicit match_sender(match& m) noexcept : m(m){};
+
+ friend auto tag_invoke(execution::get_completion_signatures_t,
+ const match_sender&, auto)
+ -> execution::completion_signatures<execution::set_value_t(message_t)>;
+
+ template <execution::receiver R>
+ friend auto tag_invoke(execution::connect_t, match_sender&& self, R r)
+ -> match_operation<R>
+ {
+ return {self.m, std::move(r)};
+ }
+
+ private:
+ match& m;
+};
+
+}; // namespace match_ns
+
+auto match::next() noexcept
+{
+ return match_ns::match_sender(*this);
+}
+
+template <typename... Rs>
+auto match::next() noexcept
+{
+ return match_ns::match_sender(*this) |
+ execution::then([](message_t&& m) { return m.unpack<Rs...>(); });
+}
+
+} // namespace sdbusplus::async
diff --git a/meson.build b/meson.build
index 640059b..6239a93 100644
--- a/meson.build
+++ b/meson.build
@@ -25,6 +25,7 @@
libsdbusplus_src = files(
'src/async/context.cpp',
+ 'src/async/match.cpp',
'src/bus.cpp',
'src/exception.cpp',
'src/message/native_types.cpp',
diff --git a/src/async/match.cpp b/src/async/match.cpp
new file mode 100644
index 0000000..cee5c24
--- /dev/null
+++ b/src/async/match.cpp
@@ -0,0 +1,86 @@
+#include <sdbusplus/async/match.hpp>
+
+namespace sdbusplus::async
+{
+
+match::match(context& ctx, const std::string_view& pattern)
+{
+ // C-style callback to redirect into this::handle_match.
+ static auto match_cb = [](message::msgp_t msg, void* ctx,
+ sd_bus_error*) noexcept {
+ static_cast<match*>(ctx)->handle_match(message_t{msg});
+ return 0;
+ };
+
+ sd_bus_slot* s = nullptr;
+ auto r = sd_bus_add_match(get_busp(ctx.get_bus()), &s, pattern.data(),
+ match_cb, this);
+ if (r < 0)
+ {
+ throw exception::SdBusError(-r, "sd_bus_add_match (async::match)");
+ }
+
+ slot = std::move(s);
+}
+
+void match_ns::match_completion::arm() noexcept
+{
+ // Set ourselves as the awaiting Receiver and see if there is a message
+ // to immediately complete on.
+
+ std::unique_lock lock{m.lock};
+
+ if (std::exchange(m.complete, this) != nullptr)
+ {
+ // We do not support two awaiters; throw exception. Since we are in
+ // a noexcept context this will std::terminate anyhow, which is
+ // approximately the same as 'assert' but with better information.
+ try
+ {
+ throw std::logic_error(
+ "match_completion started with another await already pending!");
+ }
+ catch (...)
+ {
+ std::terminate();
+ }
+ }
+
+ m.handle_completion(std::move(lock));
+}
+
+void match::handle_match(message_t&& msg) noexcept
+{
+ // Insert the message into the queue and see if there is a pair ready for
+ // completion (Receiver + message).
+ std::unique_lock l{lock};
+ queue.emplace(std::move(msg));
+ handle_completion(std::move(l));
+}
+
+void match::handle_completion(std::unique_lock<std::mutex>&& l) noexcept
+{
+ auto lock = std::move(l);
+
+ // If there is no match_completion, there is no awaiting Receiver.
+ // If the queue is empty, there is no message waiting, so the waiting
+ // Receiver isn't complete.
+ if ((complete == nullptr) || queue.empty())
+ {
+ return;
+ }
+
+ // Get the waiting completion and message.
+ auto c = std::exchange(complete, nullptr);
+ auto msg = std::move(queue.front());
+ queue.pop();
+
+ // Unlock before calling complete because the completed task may run and
+ // attempt to complete on the next event (and thus deadlock).
+ lock.unlock();
+
+ // Signal completion.
+ c->complete(std::move(msg));
+}
+
+} // namespace sdbusplus::async