blob: 9565025350aeb305d5f2dc760f1b89865bd03e6f [file] [log] [blame]
#include <poll.h>
#include <systemd/sd-bus.h>
#include <time.h>

#include <sdbusplus/async/context.hpp>

#include <limits>
5
6namespace sdbusplus::async
7{
8
9namespace details
10{
11
12/* The sd_bus_wait/process completion event.
13 *
14 * The wait/process handshake is modelled as a Sender with the the worker
15 * task `co_await`ing Senders over and over. This class is the completion
16 * handling for the Sender (to get it back to the Receiver, ie. the worker).
17 */
18struct wait_process_completion : bus::details::bus_friend
19{
20 explicit wait_process_completion(context& ctx) : ctx(ctx) {}
21 virtual ~wait_process_completion() = default;
22
23 // Called by the `caller` to indicate the Sender is completed.
24 virtual void complete() noexcept = 0;
25
26 // Arm the completion event.
27 void arm() noexcept;
28
29 // Data to share with the worker.
30 context& ctx;
31 pollfd fd{};
32 int timeout = 0;
33
34 static task<> loop(context& ctx);
35 static void wait_once(context& ctx);
36};
37
38/* The completion template based on receiver type.
39 *
40 * The type of the receivers (typically the co_awaiter) is only known by
41 * a template, so we need a sub-class of completion to hold the receiver.
42 */
43template <execution::receiver R>
44struct wait_process_operation : public wait_process_completion
45{
46 wait_process_operation(context& ctx, R r) :
47 wait_process_completion(ctx), receiver(std::move(r))
48 {}
49
50 wait_process_operation(wait_process_operation&&) = delete;
51
52 void complete() noexcept override final
53 {
54 execution::set_value(std::move(this->receiver));
55 }
56
57 friend void tag_invoke(execution::start_t,
58 wait_process_operation& self) noexcept
59 {
60 self.arm();
61 }
62
63 R receiver;
64};
65
66/* The sender for the wait/process event. */
67struct wait_process_sender
68{
69 explicit wait_process_sender(context& ctx) : ctx(ctx){};
70
71 friend auto tag_invoke(execution::get_completion_signatures_t,
72 const wait_process_sender&, auto)
73 -> execution::completion_signatures<execution::set_value_t()>;
74
75 template <execution::receiver R>
76 friend auto tag_invoke(execution::connect_t, wait_process_sender&& self,
77 R r) -> wait_process_operation<R>
78 {
79 // Create the completion for the wait.
80 return {self.ctx, std::move(r)};
81 }
82
83 private:
84 context& ctx;
85};
86
87task<> wait_process_completion::loop(context& ctx)
88{
89 while (1)
90 {
91 // Handle the next sdbus event.
92 co_await wait_process_sender(ctx);
93
94 // Completion likely happened on the context 'caller' thread, so
95 // we need to switch to the worker thread.
96 co_await execution::schedule(ctx.loop.get_scheduler());
97 }
98}
99
100} // namespace details
101
102context::~context() noexcept(false)
103{
104 if (worker_thread.joinable())
105 {
106 throw std::logic_error(
107 "sdbusplus::async::context destructed without completion.");
108 }
109}
110
111void context::caller_run(task<> startup)
112{
113 // Start up the worker thread.
114 worker_thread = std::thread{[this, startup = std::move(startup)]() mutable {
115 worker_run(std::move(startup));
116 }};
117
118 while (1)
119 {
120 // Handle 'sd_bus_wait's.
121 details::wait_process_completion::wait_once(*this);
122 }
123
124 // TODO: We can't actually get here. Need to deal with stop conditions and
125 // then we'll need this code.
126 loop.finish();
127 if (worker_thread.joinable())
128 {
129 worker_thread.join();
130 }
131}
132
133void context::worker_run(task<> startup)
134{
135 // Begin the 'startup' task.
136 // This shouldn't start detached because we want to be able to forward
137 // failures back to the 'run'. execution::ensure_started isn't
138 // implemented yet, so we don't have a lot of other options.
139 execution::start_detached(std::move(startup));
140
141 // Also start up the sdbus 'wait/process' loop.
142 execution::start_detached(details::wait_process_completion::loop(*this));
143
144 // Run the execution::run_loop to handle all the tasks.
145 loop.run();
146}
147
148void details::wait_process_completion::arm() noexcept
149{
150 // Call process. True indicates something was handled and we do not
151 // need to `wait`, because there might be yet another pending operation
152 // to process, so immediately signal the operation as complete.
153 if (ctx.get_bus().process_discard())
154 {
155 this->complete();
156 return;
157 }
158
159 // We need to call wait now, so formulate all the data that the 'caller'
160 // needs.
161
162 // Get the bus' pollfd data.
163 auto b = get_busp(ctx.get_bus());
164 fd = pollfd{sd_bus_get_fd(b), static_cast<short>(sd_bus_get_events(b)), 0};
165
166 // Get the bus' timeout.
167 uint64_t to_nsec = 0;
168 sd_bus_get_timeout(b, &to_nsec);
169
170 if (to_nsec == UINT64_MAX)
171 {
172 // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
173 // Turn this into a negative number for `poll`.
174 timeout = -1;
175 }
176 else
177 {
178 // Otherwise, convert usec from sd_bus to msec for poll.
179 // sd_bus manpage suggests you should round-up (ceil).
180 timeout = std::chrono::ceil<std::chrono::milliseconds>(
181 std::chrono::microseconds(to_nsec))
182 .count();
183 }
184
185 // Assign ourselves as the pending completion and release the caller.
186 std::lock_guard lock{ctx.lock};
187 ctx.complete = this;
188 ctx.caller_wait.notify_one();
189}
190
191void details::wait_process_completion::wait_once(context& ctx)
192{
193 details::wait_process_completion* c = nullptr;
194
195 // Scope for lock.
196 {
197 std::unique_lock lock{ctx.lock};
198
199 // If there isn't a completion waiting already, wait on the condition
200 // variable for one to show up (we can't call `poll` yet because we
201 // don't have the required parameters).
202 ctx.caller_wait.wait(lock, [&] { return ctx.complete != nullptr; });
203
204 // Save the waiter and call `poll`.
205 c = std::exchange(ctx.complete, nullptr);
206 poll(&c->fd, 1, c->timeout);
207 }
208
209 // Outside the lock complete the operation; this can cause the Receiver
210 // task (the worker) to start executing, hence why we do not want the
211 // lock held.
212 c->complete();
213}
214
215} // namespace sdbusplus::async