blob: cee5c24b928e11f2e4e5a8472243306812c5006f [file] [log] [blame]
#include <sdbusplus/async/match.hpp>

#include <string>
2
3namespace sdbusplus::async
4{
5
6match::match(context& ctx, const std::string_view& pattern)
7{
8 // C-style callback to redirect into this::handle_match.
9 static auto match_cb = [](message::msgp_t msg, void* ctx,
10 sd_bus_error*) noexcept {
11 static_cast<match*>(ctx)->handle_match(message_t{msg});
12 return 0;
13 };
14
15 sd_bus_slot* s = nullptr;
16 auto r = sd_bus_add_match(get_busp(ctx.get_bus()), &s, pattern.data(),
17 match_cb, this);
18 if (r < 0)
19 {
20 throw exception::SdBusError(-r, "sd_bus_add_match (async::match)");
21 }
22
23 slot = std::move(s);
24}
25
26void match_ns::match_completion::arm() noexcept
27{
28 // Set ourselves as the awaiting Receiver and see if there is a message
29 // to immediately complete on.
30
31 std::unique_lock lock{m.lock};
32
33 if (std::exchange(m.complete, this) != nullptr)
34 {
35 // We do not support two awaiters; throw exception. Since we are in
36 // a noexcept context this will std::terminate anyhow, which is
37 // approximately the same as 'assert' but with better information.
38 try
39 {
40 throw std::logic_error(
41 "match_completion started with another await already pending!");
42 }
43 catch (...)
44 {
45 std::terminate();
46 }
47 }
48
49 m.handle_completion(std::move(lock));
50}
51
52void match::handle_match(message_t&& msg) noexcept
53{
54 // Insert the message into the queue and see if there is a pair ready for
55 // completion (Receiver + message).
56 std::unique_lock l{lock};
57 queue.emplace(std::move(msg));
58 handle_completion(std::move(l));
59}
60
61void match::handle_completion(std::unique_lock<std::mutex>&& l) noexcept
62{
63 auto lock = std::move(l);
64
65 // If there is no match_completion, there is no awaiting Receiver.
66 // If the queue is empty, there is no message waiting, so the waiting
67 // Receiver isn't complete.
68 if ((complete == nullptr) || queue.empty())
69 {
70 return;
71 }
72
73 // Get the waiting completion and message.
74 auto c = std::exchange(complete, nullptr);
75 auto msg = std::move(queue.front());
76 queue.pop();
77
78 // Unlock before calling complete because the completed task may run and
79 // attempt to complete on the next event (and thus deadlock).
80 lock.unlock();
81
82 // Signal completion.
83 c->complete(std::move(msg));
84}
85
86} // namespace sdbusplus::async