async: match: stop pending sender on dtor

If the match is destructed with a pending Sender completion,
we need to 'stop' that Sender.  We cannot 'complete' the Sender
because we do not have a message (dbus signal) associated with the
match, but if we do nothing then the Sender is hung (and leaked).

Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: Ia6e465d46f0f7da08a385cf7c875bbda88a5ab61
diff --git a/include/sdbusplus/async/match.hpp b/include/sdbusplus/async/match.hpp
index a368c01..132db08 100644
--- a/include/sdbusplus/async/match.hpp
+++ b/include/sdbusplus/async/match.hpp
@@ -30,7 +30,7 @@
     match& operator=(const match&) = delete;
     match(match&&) = delete;
     match& operator=(match&&) = delete;
-    ~match() = default;
+    ~match();
 
     /** Construct the match using the `pattern` string on the bus managed by the
      *  context. */
@@ -91,6 +91,7 @@
 
   private:
     virtual void complete(message_t&&) noexcept = 0;
+    virtual void stop() noexcept = 0;
     void arm() noexcept;
 
     match& m;
@@ -110,6 +111,11 @@
         execution::set_value(std::move(receiver), std::move(msg));
     }
 
+    void stop() noexcept override final
+    {
+        execution::set_stopped(std::move(receiver));
+    }
+
     Receiver receiver;
 };
 
diff --git a/src/async/match.cpp b/src/async/match.cpp
index cee5c24..410e840 100644
--- a/src/async/match.cpp
+++ b/src/async/match.cpp
@@ -23,6 +23,21 @@
     slot = std::move(s);
 }
 
+match::~match()
+{
+    match_ns::match_completion* c = nullptr;
+
+    {
+        std::lock_guard l{lock};
+        c = std::exchange(complete, nullptr);
+    }
+
+    if (c)
+    {
+        c->stop();
+    }
+}
+
 void match_ns::match_completion::arm() noexcept
 {
     // Set ourselves as the awaiting Receiver and see if there is a message
diff --git a/test/async/context.cpp b/test/async/context.cpp
index 4092de5..fe273d1 100644
--- a/test/async/context.cpp
+++ b/test/async/context.cpp
@@ -13,21 +13,24 @@
         ctx.reset();
     }
 
+    void runToStop()
+    {
+        ctx->run(std::execution::just() |
+                 std::execution::then([this]() { ctx->request_stop(); }));
+    }
+
     std::optional<sdbusplus::async::context> ctx{std::in_place};
 };
 
 TEST_F(Context, RunSimple)
 {
-    ctx->run(std::execution::just() |
-             std::execution::then([this]() { ctx->request_stop(); }));
+    runToStop();
 }
 
 TEST_F(Context, SpawnedTask)
 {
     ctx->spawn(std::execution::just());
-
-    ctx->run(std::execution::just() |
-             std::execution::then([this]() { ctx->request_stop(); }));
+    runToStop();
 }
 
 TEST_F(Context, SpawnDelayedTask)
@@ -41,8 +44,7 @@
     ctx->spawn(sdbusplus::async::sleep_for(*ctx, timeout) |
                std::execution::then([&ran]() { ran = true; }));
 
-    ctx->run(std::execution::just() |
-             std::execution::then([this]() { ctx->request_stop(); }));
+    runToStop();
 
     auto stop = std::chrono::steady_clock::now();
 
@@ -50,3 +52,53 @@
     EXPECT_GT(stop - start, timeout);
     EXPECT_LT(stop - start, timeout * 2);
 }
+
+TEST_F(Context, DestructMatcherWithPendingAwait)
+{
+    using namespace std::literals;
+
+    bool ran = false;
+    auto m = std::make_optional<sdbusplus::async::match>(
+        *ctx, sdbusplus::bus::match::rules::interfacesAdded(
+                  "/this/is/a/bogus/path/for/SpawnMatcher"));
+
+    // Await the match completion (which will never happen).
+    ctx->spawn(m->next() | std::execution::then([&ran](...) { ran = true; }));
+
+    // Destruct the match.
+    ctx->spawn(sdbusplus::async::sleep_for(*ctx, 1ms) |
+               std::execution::then([&m](...) { m.reset(); }));
+
+    runToStop();
+
+    EXPECT_FALSE(ran);
+}
+
+TEST_F(Context, DestructMatcherWithPendingAwaitAsTask)
+{
+    using namespace std::literals;
+
+    auto m = std::make_optional<sdbusplus::async::match>(
+        *ctx, sdbusplus::bus::match::rules::interfacesAdded(
+                  "/this/is/a/bogus/path/for/SpawnMatcher"));
+
+    struct _
+    {
+        static auto fn(decltype(m->next()) snd, bool& ran)
+            -> sdbusplus::async::task<>
+        {
+            co_await std::move(snd);
+            ran = true;
+            co_return;
+        }
+    };
+
+    bool ran = false;
+    ctx->spawn(_::fn(m->next(), ran));
+    ctx->spawn(sdbusplus::async::sleep_for(*ctx, 1ms) |
+               std::execution::then([&]() { m.reset(); }));
+
+    runToStop();
+
+    EXPECT_FALSE(ran);
+}