async: add match

Define a class which will register for a dbus signal match
and generate Senders that await new signals.

Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: Ib1d77708d74e6063bcaa08fc76fb98667ee7a2cb
diff --git a/src/async/match.cpp b/src/async/match.cpp
new file mode 100644
index 0000000..cee5c24
--- /dev/null
+++ b/src/async/match.cpp
@@ -0,0 +1,88 @@
+#include <sdbusplus/async/match.hpp>
+
+#include <string>
+
+namespace sdbusplus::async
+{
+
+// Register `pattern` as a match on ctx's bus; throws SdBusError on failure.
+match::match(context& ctx, const std::string_view& pattern)
+{
+    // C-style callback to redirect into this::handle_match.
+    static auto match_cb = [](message::msgp_t msg, void* ctx,
+                              sd_bus_error*) noexcept {
+        static_cast<match*>(ctx)->handle_match(message_t{msg});
+        return 0;
+    };
+    // sd-bus needs a NUL-terminated rule; a string_view may not be, so copy.
+    sd_bus_slot* s = nullptr;
+    auto r = sd_bus_add_match(get_busp(ctx.get_bus()), &s,
+                              std::string{pattern}.c_str(), match_cb, this);
+    if (r < 0)
+    {
+        throw exception::SdBusError(-r, "sd_bus_add_match (async::match)");
+    }
+    slot = std::move(s);
+}
+
+void match_ns::match_completion::arm() noexcept
+{
+    // Publish this completion as the awaiting Receiver while holding the
+    // match's lock, then check for a message it can complete on right away.
+    std::unique_lock guard{m.lock};
+
+    const auto previous = std::exchange(m.complete, this);
+    if (previous != nullptr)
+    {
+        // Two simultaneous awaiters are unsupported.  We are noexcept, so a
+        // throw ends in std::terminate regardless; routing through an
+        // exception is roughly an 'assert' with better information.
+        try
+        {
+            throw std::logic_error(
+                "match_completion started with another await already pending!");
+        }
+        catch (...)
+        {
+            std::terminate();
+        }
+    }
+
+    m.handle_completion(std::move(guard));
+}
+
+void match::handle_match(message_t&& msg) noexcept
+{
+    // Queue the incoming message under the lock, then let handle_completion
+    // pair it with an awaiting Receiver if one is present.
+    std::unique_lock guard{lock};
+    queue.emplace(std::move(msg));
+    handle_completion(std::move(guard));
+}
+
+void match::handle_completion(std::unique_lock<std::mutex>&& l) noexcept
+{
+    // Take over the caller's lock so it is released on every exit path.
+    auto held = std::move(l);
+
+    // A completion can only fire when both halves of the pair exist: an
+    // awaiting Receiver and at least one queued message.
+    const bool ready = (complete != nullptr) && !queue.empty();
+    if (!ready)
+    {
+        return;
+    }
+
+    // Detach the Receiver and the front message while still locked.
+    auto receiver = std::exchange(complete, nullptr);
+    auto signal = std::move(queue.front());
+    queue.pop();
+
+    // Release the lock first: the completed task may run and attempt to
+    // complete on the next event, which would otherwise deadlock.
+    held.unlock();
+
+    receiver->complete(std::move(signal));
+}
+
+} // namespace sdbusplus::async