async: add match
Define a class which will register for a dbus signal match
and generate Senders that await new signals.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: Ib1d77708d74e6063bcaa08fc76fb98667ee7a2cb
diff --git a/src/async/match.cpp b/src/async/match.cpp
new file mode 100644
index 0000000..cee5c24
--- /dev/null
+++ b/src/async/match.cpp
@@ -0,0 +1,86 @@
+#include <sdbusplus/async/match.hpp>
+
+namespace sdbusplus::async
+{
+
+match::match(context& ctx, const std::string_view& pattern)
+{
+ // C-style callback to redirect into this::handle_match.
+ static auto match_cb = [](message::msgp_t msg, void* ctx,
+ sd_bus_error*) noexcept {
+ static_cast<match*>(ctx)->handle_match(message_t{msg});
+ return 0;
+ };
+
+ sd_bus_slot* s = nullptr;
+ auto r = sd_bus_add_match(get_busp(ctx.get_bus()), &s, pattern.data(),
+ match_cb, this);
+ if (r < 0)
+ {
+ throw exception::SdBusError(-r, "sd_bus_add_match (async::match)");
+ }
+
+ slot = std::move(s);
+}
+
+void match_ns::match_completion::arm() noexcept
+{
+ // Set ourselves as the awaiting Receiver and see if there is a message
+ // to immediately complete on.
+
+ std::unique_lock lock{m.lock};
+
+ if (std::exchange(m.complete, this) != nullptr)
+ {
+ // We do not support two awaiters; throw exception. Since we are in
+ // a noexcept context this will std::terminate anyhow, which is
+ // approximately the same as 'assert' but with better information.
+ try
+ {
+ throw std::logic_error(
+ "match_completion started with another await already pending!");
+ }
+ catch (...)
+ {
+ std::terminate();
+ }
+ }
+
+ m.handle_completion(std::move(lock));
+}
+
+void match::handle_match(message_t&& msg) noexcept
+{
+ // Insert the message into the queue and see if there is a pair ready for
+ // completion (Receiver + message).
+ std::unique_lock l{lock};
+ queue.emplace(std::move(msg));
+ handle_completion(std::move(l));
+}
+
+void match::handle_completion(std::unique_lock<std::mutex>&& l) noexcept
+{
+ auto lock = std::move(l);
+
+ // If there is no match_completion, there is no awaiting Receiver.
+ // If the queue is empty, there is no message waiting, so the waiting
+ // Receiver isn't complete.
+ if ((complete == nullptr) || queue.empty())
+ {
+ return;
+ }
+
+ // Get the waiting completion and message.
+ auto c = std::exchange(complete, nullptr);
+ auto msg = std::move(queue.front());
+ queue.pop();
+
+ // Unlock before calling complete because the completed task may run and
+ // attempt to complete on the next event (and thus deadlock).
+ lock.unlock();
+
+ // Signal completion.
+ c->complete(std::move(msg));
+}
+
+} // namespace sdbusplus::async