blob: 0b313916390595c419cb1254889e5294e71c7222 [file] [log] [blame]
Patrick Williams74187662022-08-26 19:28:24 -05001#include <systemd/sd-bus.h>
2
3#include <sdbusplus/async/context.hpp>
Patrick Williamsbc139972025-07-08 16:46:50 -04004#include <sdbusplus/async/task.hpp>
5#include <sdbusplus/async/timer.hpp>
Patrick Williams74187662022-08-26 19:28:24 -05006
Patrick Williams73e278b2022-09-16 08:31:36 -05007#include <chrono>
8
Patrick Williams74187662022-08-26 19:28:24 -05009namespace sdbusplus::async
10{
11
Patrick Williams73e278b2022-09-16 08:31:36 -050012context::context(bus_t&& b) : bus(std::move(b))
13{
Patrick Williams06f265f2024-08-16 15:19:49 -040014 dbus_source =
15 event_loop.add_io(bus.get_fd(), EPOLLIN, dbus_event_handle, this);
Patrick Williams73e278b2022-09-16 08:31:36 -050016}
17
Patrick Williams74187662022-08-26 19:28:24 -050018namespace details
19{
20
/* The sd_bus_wait/process completion event.
 *
 * The wait/process handshake is modelled as a Sender, with the worker
 * task `co_await`ing Senders over and over.  This class is the completion
 * handling for the Sender (to get it back to the Receiver, i.e. the worker).
 */
Patrick Williams4a9e4222023-08-18 09:57:28 -050027struct wait_process_completion : context_ref, bus::details::bus_friend
Patrick Williams74187662022-08-26 19:28:24 -050028{
Patrick Williams4a9e4222023-08-18 09:57:28 -050029 explicit wait_process_completion(context& ctx) : context_ref(ctx) {}
Patrick Williams74187662022-08-26 19:28:24 -050030 virtual ~wait_process_completion() = default;
31
32 // Called by the `caller` to indicate the Sender is completed.
33 virtual void complete() noexcept = 0;
Patrick Williams73e278b2022-09-16 08:31:36 -050034 // Called by the `caller` to indicate the Sender should be stopped.
35 virtual void stop() noexcept = 0;
Patrick Williams74187662022-08-26 19:28:24 -050036
Patrick Williams5e36f4a2025-11-06 17:00:08 -050037 void start() noexcept;
Patrick Williams74187662022-08-26 19:28:24 -050038
39 // Data to share with the worker.
Patrick Williams435eb1b2022-09-16 16:22:07 -050040 event_t::time_resolution timeout{};
Patrick Williams74187662022-08-26 19:28:24 -050041
Patrick Williamsbbc181e2023-11-16 16:29:53 -060042 static auto loop(context& ctx) -> task<>;
Patrick Williams74187662022-08-26 19:28:24 -050043 static void wait_once(context& ctx);
44};
45
46/* The completion template based on receiver type.
47 *
48 * The type of the receivers (typically the co_awaiter) is only known by
49 * a template, so we need a sub-class of completion to hold the receiver.
50 */
51template <execution::receiver R>
52struct wait_process_operation : public wait_process_completion
53{
54 wait_process_operation(context& ctx, R r) :
55 wait_process_completion(ctx), receiver(std::move(r))
56 {}
57
58 wait_process_operation(wait_process_operation&&) = delete;
59
60 void complete() noexcept override final
61 {
62 execution::set_value(std::move(this->receiver));
63 }
64
Patrick Williams73e278b2022-09-16 08:31:36 -050065 void stop() noexcept override final
66 {
67 // Stop can be called when the context is shutting down,
68 // so treat it as if the receiver completed.
69 execution::set_value(std::move(this->receiver));
70 }
71
Patrick Williams74187662022-08-26 19:28:24 -050072 R receiver;
73};
74
75/* The sender for the wait/process event. */
Patrick Williams4a9e4222023-08-18 09:57:28 -050076struct wait_process_sender : public context_ref
Patrick Williams74187662022-08-26 19:28:24 -050077{
Patrick Williams5e36f4a2025-11-06 17:00:08 -050078 using sender_concept = execution::sender_t;
Patrick Williams9c6ec9b2023-06-23 15:47:42 -050079
Patrick Williams4a9e4222023-08-18 09:57:28 -050080 explicit wait_process_sender(context& ctx) : context_ref(ctx) {}
Patrick Williams74187662022-08-26 19:28:24 -050081
Patrick Williams5e36f4a2025-11-06 17:00:08 -050082 template <typename Self, class... Env>
83 static constexpr auto get_completion_signatures(Self&&, Env&&...)
Patrick Williams36137e02024-12-18 11:20:12 -050084 -> execution::completion_signatures<execution::set_value_t()>;
Patrick Williams74187662022-08-26 19:28:24 -050085
86 template <execution::receiver R>
Patrick Williams5e36f4a2025-11-06 17:00:08 -050087 auto connect(R r) -> wait_process_operation<R>
Patrick Williams74187662022-08-26 19:28:24 -050088 {
89 // Create the completion for the wait.
Patrick Williams5e36f4a2025-11-06 17:00:08 -050090 return {ctx, std::move(r)};
Patrick Williams74187662022-08-26 19:28:24 -050091 }
Patrick Williams74187662022-08-26 19:28:24 -050092};
93
Patrick Williamsbbc181e2023-11-16 16:29:53 -060094auto wait_process_completion::loop(context& ctx) -> task<>
Patrick Williams74187662022-08-26 19:28:24 -050095{
Patrick Williams78e436f2022-09-21 10:06:20 -050096 while (!ctx.final_stop.stop_requested())
Patrick Williams74187662022-08-26 19:28:24 -050097 {
Patrick Williamse8e66312023-12-01 16:57:29 -060098 // Handle the next sdbus event. Completion likely happened on a
99 // different thread so we need to transfer back to the worker thread.
Patrick Williamsf083bc12024-12-16 16:13:28 -0500100 co_await execution::continues_on(wait_process_sender(ctx),
101 ctx.loop.get_scheduler());
Patrick Williams74187662022-08-26 19:28:24 -0500102 }
Patrick Williams10483c92022-09-23 11:33:39 -0500103
104 {
105 std::lock_guard lock{ctx.lock};
106 ctx.wait_process_stopped = true;
107 }
Patrick Williams74187662022-08-26 19:28:24 -0500108}
109
110} // namespace details
111
112context::~context() noexcept(false)
113{
114 if (worker_thread.joinable())
115 {
116 throw std::logic_error(
117 "sdbusplus::async::context destructed without completion.");
118 }
119}
120
Patrick Williams3c242ba2022-09-23 09:51:55 -0500121void context::run()
Patrick Williams74187662022-08-26 19:28:24 -0500122{
Patrick Williams10483c92022-09-23 11:33:39 -0500123 // Run the primary portion of the run-loop.
124 caller_run();
Patrick Williams74187662022-08-26 19:28:24 -0500125
Patrick Williams5d16a8e2023-05-25 12:13:24 -0500126 // This should be final_stop...
Patrick Williams10483c92022-09-23 11:33:39 -0500127
128 // We need to wait for the pending wait process and stop it.
129 wait_for_wait_process_stopped();
Patrick Williams4cfc2842022-09-22 09:53:33 -0500130
Patrick Williams78e436f2022-09-21 10:06:20 -0500131 // Wait for all the internal tasks to complete.
Patrick Williams97c31c82023-05-25 11:04:46 -0500132 stdexec::sync_wait(internal_tasks.on_empty());
Patrick Williamsc5b5ff52022-09-21 08:34:22 -0500133
Patrick Williams10483c92022-09-23 11:33:39 -0500134 // Finish up the loop and join the thread.
135 // (There shouldn't be anything going on by this point anyhow.)
Patrick Williams74187662022-08-26 19:28:24 -0500136 loop.finish();
137 if (worker_thread.joinable())
138 {
139 worker_thread.join();
140 }
141}
142
Patrick Williamsbc139972025-07-08 16:46:50 -0400143static auto watchdog_loop(sdbusplus::async::context& ctx) -> task<>
144{
145 auto watchdog_time =
146 std::chrono::microseconds(ctx.get_bus().watchdog_enabled());
147 if (watchdog_time.count() == 0)
148 {
149 co_return;
150 }
151
152 // Recommended interval is half of WATCHDOG_USEC
153 watchdog_time /= 2;
154
155 while (!ctx.stop_requested())
156 {
157 ctx.get_bus().watchdog_pet();
158 co_await sleep_for(ctx, watchdog_time);
159 }
160}
161
Patrick Williams3c242ba2022-09-23 09:51:55 -0500162void context::worker_run()
Patrick Williams74187662022-08-26 19:28:24 -0500163{
Patrick Williamsbc139972025-07-08 16:46:50 -0400164 internal_tasks.spawn(watchdog_loop(*this));
165
Patrick Williams3c242ba2022-09-23 09:51:55 -0500166 // Start the sdbus 'wait/process' loop; treat it as an internal task.
Patrick Williams97c31c82023-05-25 11:04:46 -0500167 internal_tasks.spawn(details::wait_process_completion::loop(*this));
Patrick Williams74187662022-08-26 19:28:24 -0500168
169 // Run the execution::run_loop to handle all the tasks.
170 loop.run();
171}
172
Patrick Williams5d16a8e2023-05-25 12:13:24 -0500173void context::spawn_complete()
Patrick Williams10483c92022-09-23 11:33:39 -0500174{
175 {
176 std::lock_guard l{lock};
177 spawn_watcher_running = false;
Patrick Williams10483c92022-09-23 11:33:39 -0500178 }
179
180 if (stop_requested())
181 {
182 final_stop.request_stop();
183 }
184
185 caller_wait.notify_one();
186 event_loop.break_run();
187}
188
189void context::check_stop_requested()
190{
191 if (stop_requested())
192 {
193 throw std::logic_error(
194 "sdbusplus::async::context spawn called while already stopped.");
195 }
196}
197
198void context::spawn_watcher()
199{
200 {
201 std::lock_guard l{lock};
202 if (spawn_watcher_running)
203 {
204 return;
205 }
206
207 spawn_watcher_running = true;
208 }
209
210 // Spawn the watch for completion / exceptions.
Patrick Williams97c31c82023-05-25 11:04:46 -0500211 internal_tasks.spawn(pending_tasks.on_empty() |
212 execution::then([this]() { spawn_complete(); }));
Patrick Williams10483c92022-09-23 11:33:39 -0500213}
214
215void context::caller_run()
216{
217 // We are able to run the loop until the context is requested to stop or
218 // we get an exception.
219 auto keep_running = [this]() {
220 std::lock_guard l{lock};
Patrick Williams5d16a8e2023-05-25 12:13:24 -0500221 return !final_stop.stop_requested();
Patrick Williams10483c92022-09-23 11:33:39 -0500222 };
223
224 // If we are suppose to keep running, start the run loop.
225 if (keep_running())
226 {
227 // Start up the worker thread.
228 if (!worker_thread.joinable())
229 {
230 worker_thread = std::thread{[this]() { worker_run(); }};
231 }
232 else
233 {
Patrick Williams5d16a8e2023-05-25 12:13:24 -0500234 // We've already been running and there might a completion pending.
235 // Spawn a new watcher that checks for these.
Patrick Williams10483c92022-09-23 11:33:39 -0500236 spawn_watcher();
237 }
238
239 while (keep_running())
240 {
241 // Handle waiting on all the sd-events.
242 details::wait_process_completion::wait_once(*this);
243 }
244 }
245 else
246 {
Patrick Williams5d16a8e2023-05-25 12:13:24 -0500247 // There might be pending completions still, so spawn a watcher for
248 // them.
Patrick Williams10483c92022-09-23 11:33:39 -0500249 spawn_watcher();
250 }
251}
252
253void context::wait_for_wait_process_stopped()
254{
255 auto worker = std::exchange(pending, nullptr);
256 while (worker == nullptr)
257 {
258 std::lock_guard l{lock};
259 if (wait_process_stopped)
260 {
261 break;
262 }
263
264 worker = std::exchange(staged, nullptr);
265 if (!worker)
266 {
267 std::this_thread::yield();
268 }
269 }
270 if (worker)
271 {
272 worker->stop();
273 wait_process_stopped = true;
274 }
275}
276
Patrick Williams5e36f4a2025-11-06 17:00:08 -0500277void details::wait_process_completion::start() noexcept
Patrick Williams74187662022-08-26 19:28:24 -0500278{
279 // Call process. True indicates something was handled and we do not
280 // need to `wait`, because there might be yet another pending operation
281 // to process, so immediately signal the operation as complete.
282 if (ctx.get_bus().process_discard())
283 {
284 this->complete();
285 return;
286 }
287
Patrick Williams73e278b2022-09-16 08:31:36 -0500288 // We need to call wait now, get the current timeout and stage ourselves
289 // as the next completion.
Patrick Williams74187662022-08-26 19:28:24 -0500290
291 // Get the bus' timeout.
Patrick Williams73e278b2022-09-16 08:31:36 -0500292 uint64_t to_usec = 0;
Patrick Williams1ee60d62023-08-18 12:59:05 -0500293 sd_bus_get_timeout(get_busp(ctx), &to_usec);
Patrick Williams74187662022-08-26 19:28:24 -0500294
Patrick Williams73e278b2022-09-16 08:31:36 -0500295 if (to_usec == UINT64_MAX)
Patrick Williams74187662022-08-26 19:28:24 -0500296 {
297 // sd_bus_get_timeout returns UINT64_MAX to indicate 'wait forever'.
Patrick Williams73e278b2022-09-16 08:31:36 -0500298 // Turn this into -1 for sd-event.
299 timeout = std::chrono::microseconds{-1};
Patrick Williams74187662022-08-26 19:28:24 -0500300 }
301 else
302 {
Patrick Williams73e278b2022-09-16 08:31:36 -0500303 timeout = std::chrono::microseconds(to_usec);
Patrick Williams74187662022-08-26 19:28:24 -0500304 }
305
306 // Assign ourselves as the pending completion and release the caller.
307 std::lock_guard lock{ctx.lock};
Patrick Williams73e278b2022-09-16 08:31:36 -0500308 ctx.staged = this;
Patrick Williams74187662022-08-26 19:28:24 -0500309 ctx.caller_wait.notify_one();
310}
311
312void details::wait_process_completion::wait_once(context& ctx)
313{
Patrick Williams74187662022-08-26 19:28:24 -0500314 // Scope for lock.
315 {
316 std::unique_lock lock{ctx.lock};
317
318 // If there isn't a completion waiting already, wait on the condition
319 // variable for one to show up (we can't call `poll` yet because we
320 // don't have the required parameters).
Patrick Williams73e278b2022-09-16 08:31:36 -0500321 ctx.caller_wait.wait(lock, [&] {
322 return (ctx.pending != nullptr) || (ctx.staged != nullptr) ||
Patrick Williams5d16a8e2023-05-25 12:13:24 -0500323 ctx.final_stop.stop_requested();
Patrick Williams73e278b2022-09-16 08:31:36 -0500324 });
Patrick Williams74187662022-08-26 19:28:24 -0500325
Patrick Williams73e278b2022-09-16 08:31:36 -0500326 // Save the waiter as pending.
327 if (ctx.pending == nullptr)
328 {
329 ctx.pending = std::exchange(ctx.staged, nullptr);
330 }
Patrick Williams74187662022-08-26 19:28:24 -0500331 }
332
Patrick Williams73e278b2022-09-16 08:31:36 -0500333 // Run the event loop to process one request.
Patrick Williams10483c92022-09-23 11:33:39 -0500334 // If the context has been requested to be stopped, skip the event loop.
335 if (!ctx.final_stop.stop_requested() && ctx.pending)
Patrick Williams73e278b2022-09-16 08:31:36 -0500336 {
Patrick Williams10483c92022-09-23 11:33:39 -0500337 ctx.event_loop.run_one(ctx.pending->timeout);
Patrick Williams73e278b2022-09-16 08:31:36 -0500338 }
339}
340
341int context::dbus_event_handle(sd_event_source*, int, uint32_t, void* data)
342{
343 auto self = static_cast<context*>(data);
344
Patrick Williams10483c92022-09-23 11:33:39 -0500345 auto pending = std::exchange(self->pending, nullptr);
Patrick Williams73e278b2022-09-16 08:31:36 -0500346 if (pending != nullptr)
347 {
Patrick Williams10483c92022-09-23 11:33:39 -0500348 pending->complete();
Patrick Williams73e278b2022-09-16 08:31:36 -0500349 }
350
351 return 0;
Patrick Williams74187662022-08-26 19:28:24 -0500352}
353
354} // namespace sdbusplus::async