async: add match

Define a class which will register for a dbus signal match
and generate Senders that await new signals.

Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: Ib1d77708d74e6063bcaa08fc76fb98667ee7a2cb
diff --git a/example/coroutine-example.cpp b/example/coroutine-example.cpp
index 8cfa0f4..2013771 100644
--- a/example/coroutine-example.cpp
+++ b/example/coroutine-example.cpp
@@ -69,6 +69,26 @@
                   << e.what() << std::endl;
     }
 
+    // Create a match for the NameOwnerChanged signal.
+    namespace rules = sdbusplus::bus::match::rules;
+    auto match = sdbusplus::async::match(ctx, rules::nameOwnerChanged());
+
+    // Listen for the signal 4 times...
+    for (size_t i = 0; i < 4; ++i)
+    {
+        auto [service, old_name, new_name] =
+            co_await match.next<std::string, std::string, std::string>();
+
+        if (!new_name.empty())
+        {
+            std::cout << new_name << " owns " << service << std::endl;
+        }
+        else
+        {
+            std::cout << service << " released" << std::endl;
+        }
+    };
+
     co_return;
 }
 
diff --git a/include/sdbusplus/async.hpp b/include/sdbusplus/async.hpp
index 2ef1df6..52f1294 100644
--- a/include/sdbusplus/async.hpp
+++ b/include/sdbusplus/async.hpp
@@ -2,5 +2,6 @@
 
 #include <sdbusplus/async/context.hpp>
 #include <sdbusplus/async/execution.hpp>
+#include <sdbusplus/async/match.hpp>
 #include <sdbusplus/async/proxy.hpp>
 #include <sdbusplus/async/task.hpp>
diff --git a/include/sdbusplus/async/match.hpp b/include/sdbusplus/async/match.hpp
new file mode 100644
index 0000000..a368c01
--- /dev/null
+++ b/include/sdbusplus/async/match.hpp
@@ -0,0 +1,151 @@
+#pragma once
+#include <sdbusplus/async/context.hpp>
+#include <sdbusplus/async/execution.hpp>
+#include <sdbusplus/bus/match.hpp>
+#include <sdbusplus/message.hpp>
+#include <sdbusplus/slot.hpp>
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include <string_view>
+
+namespace sdbusplus::async
+{
+namespace match_ns
+{
+struct match_completion;
+}
+
+/** Generator of dbus match Senders.
+ *
+ *  This class registers a signal match pattern with the dbus and generates
+ *  Senders using `next` to await the next matching signal.
+ */
+class match : private bus::details::bus_friend
+{
+  public:
+    match() = delete;
+    match(const match&) = delete;
+    match& operator=(const match&) = delete;
+    match(match&&) = delete;
+    match& operator=(match&&) = delete;
+    ~match() = default;
+
+    /** Construct the match using the `pattern` string on the bus managed by the
+     *  context. */
+    match(context& ctx, const std::string_view& pattern);
+
+    /** Get the Sender for the next event (as message).
+     *
+     *  Note: the implementation only supports a single awaiting task.  Two
+     *  tasks should not share this object and both call `next`.
+     */
+    auto next() noexcept;
+
+    /** Get the Sender for the next event, which yields one of:
+     *    void, Rs, tuple<Rs...>
+     *
+     *  Note: the implementation only supports a single awaiting task.  Two
+     *  tasks should not share this object and both call `next`.
+     */
+    template <typename... Rs>
+    auto next() noexcept;
+
+    friend match_ns::match_completion;
+
+  private:
+    sdbusplus::slot_t slot{};
+
+    std::mutex lock{};
+    std::queue<sdbusplus::message_t> queue{};
+    match_ns::match_completion* complete = nullptr;
+
+    /** Handle an incoming match event. */
+    void handle_match(message_t&&) noexcept;
+
+    /** Signal completion if there is an awaiting Receiver.
+     *
+     *  This must be called with `lock` held (and ownership transfers).
+     */
+    void handle_completion(std::unique_lock<std::mutex>&&) noexcept;
+};
+
+namespace match_ns
+{
+// Virtual class to handle the match Receiver completions.
+struct match_completion
+{
+    match_completion() = delete;
+    match_completion(match_completion&&) = delete;
+
+    explicit match_completion(match& m) : m(m){};
+    virtual ~match_completion() = default;
+
+    friend match;
+
+    friend void tag_invoke(execution::start_t, match_completion& self) noexcept
+    {
+        self.arm();
+    }
+
+  private:
+    virtual void complete(message_t&&) noexcept = 0;
+    void arm() noexcept;
+
+    match& m;
+};
+
+// Implementation (templated based on Receiver) of match_completion.
+template <execution::receiver Receiver>
+struct match_operation : match_completion
+{
+    match_operation(match& m, Receiver r) :
+        match_completion(m), receiver(std::move(r))
+    {}
+
+  private:
+    void complete(message_t&& msg) noexcept override final
+    {
+        execution::set_value(std::move(receiver), std::move(msg));
+    }
+
+    Receiver receiver;
+};
+
+// match Sender implementation.
+struct match_sender
+{
+    match_sender() = delete;
+    explicit match_sender(match& m) noexcept : m(m){};
+
+    friend auto tag_invoke(execution::get_completion_signatures_t,
+                           const match_sender&, auto)
+        -> execution::completion_signatures<execution::set_value_t(message_t)>;
+
+    template <execution::receiver R>
+    friend auto tag_invoke(execution::connect_t, match_sender&& self, R r)
+        -> match_operation<R>
+    {
+        return {self.m, std::move(r)};
+    }
+
+  private:
+    match& m;
+};
+
+}; // namespace match_ns
+
+auto match::next() noexcept
+{
+    return match_ns::match_sender(*this);
+}
+
+template <typename... Rs>
+auto match::next() noexcept
+{
+    return match_ns::match_sender(*this) |
+           execution::then([](message_t&& m) { return m.unpack<Rs...>(); });
+}
+
+} // namespace sdbusplus::async
diff --git a/meson.build b/meson.build
index 640059b..6239a93 100644
--- a/meson.build
+++ b/meson.build
@@ -25,6 +25,7 @@
 
 libsdbusplus_src = files(
     'src/async/context.cpp',
+    'src/async/match.cpp',
     'src/bus.cpp',
     'src/exception.cpp',
     'src/message/native_types.cpp',
diff --git a/src/async/match.cpp b/src/async/match.cpp
new file mode 100644
index 0000000..cee5c24
--- /dev/null
+++ b/src/async/match.cpp
@@ -0,0 +1,86 @@
+#include <sdbusplus/async/match.hpp>
+
+namespace sdbusplus::async
+{
+
+match::match(context& ctx, const std::string_view& pattern)
+{
+    // C-style callback to redirect into this::handle_match.
+    static auto match_cb = [](message::msgp_t msg, void* ctx,
+                              sd_bus_error*) noexcept {
+        static_cast<match*>(ctx)->handle_match(message_t{msg});
+        return 0;
+    };
+
+    sd_bus_slot* s = nullptr;
+    auto r = sd_bus_add_match(get_busp(ctx.get_bus()), &s, pattern.data(),
+                              match_cb, this);
+    if (r < 0)
+    {
+        throw exception::SdBusError(-r, "sd_bus_add_match (async::match)");
+    }
+
+    slot = std::move(s);
+}
+
+void match_ns::match_completion::arm() noexcept
+{
+    // Set ourselves as the awaiting Receiver and see if there is a message
+    // to immediately complete on.
+
+    std::unique_lock lock{m.lock};
+
+    if (std::exchange(m.complete, this) != nullptr)
+    {
+        // We do not support two awaiters; throw exception.  Since we are in
+        // a noexcept context this will std::terminate anyhow, which is
+        // approximately the same as 'assert' but with better information.
+        try
+        {
+            throw std::logic_error(
+                "match_completion started with another await already pending!");
+        }
+        catch (...)
+        {
+            std::terminate();
+        }
+    }
+
+    m.handle_completion(std::move(lock));
+}
+
+void match::handle_match(message_t&& msg) noexcept
+{
+    // Insert the message into the queue and see if there is a pair ready for
+    // completion (Receiver + message).
+    std::unique_lock l{lock};
+    queue.emplace(std::move(msg));
+    handle_completion(std::move(l));
+}
+
+void match::handle_completion(std::unique_lock<std::mutex>&& l) noexcept
+{
+    auto lock = std::move(l);
+
+    // If there is no match_completion, there is no awaiting Receiver.
+    // If the queue is empty, there is no message waiting, so the waiting
+    // Receiver isn't complete.
+    if ((complete == nullptr) || queue.empty())
+    {
+        return;
+    }
+
+    // Get the waiting completion and message.
+    auto c = std::exchange(complete, nullptr);
+    auto msg = std::move(queue.front());
+    queue.pop();
+
+    // Unlock before calling complete because the completed task may run and
+    // attempt to complete on the next event (and thus deadlock).
+    lock.unlock();
+
+    // Signal completion.
+    c->complete(std::move(msg));
+}
+
+} // namespace sdbusplus::async