async: add sleep_for sender
Sometimes it is useful to do the equivalent of
`std::this_thread::sleep_for` in a co-routine context. Add a
sender-based implementation to async.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I9991eb40b7a1b12e61511f1200bc99fdcdbccf0a
diff --git a/include/sdbusplus/async.hpp b/include/sdbusplus/async.hpp
index 52f1294..70b8abf 100644
--- a/include/sdbusplus/async.hpp
+++ b/include/sdbusplus/async.hpp
@@ -5,3 +5,4 @@
#include <sdbusplus/async/match.hpp>
#include <sdbusplus/async/proxy.hpp>
#include <sdbusplus/async/task.hpp>
+#include <sdbusplus/async/timer.hpp>
diff --git a/include/sdbusplus/async/context.hpp b/include/sdbusplus/async/context.hpp
index 4bd2384..a59dc72 100644
--- a/include/sdbusplus/async/context.hpp
+++ b/include/sdbusplus/async/context.hpp
@@ -16,7 +16,8 @@
namespace details
{
struct wait_process_completion;
-}
+struct context_friend;
+} // namespace details
/** @brief A run-loop context for handling asynchronous dbus operations.
*
@@ -68,7 +69,8 @@
return stop.stop_requested();
}
- friend struct details::wait_process_completion;
+ friend details::wait_process_completion;
+ friend details::context_friend;
private:
bus_t bus;
@@ -117,4 +119,20 @@
}
}
+namespace details
+{
+struct context_friend
+{
+ static event_t& get_event_loop(context& ctx)
+ {
+ return ctx.event_loop;
+ }
+
+ static auto get_scheduler(context& ctx)
+ {
+ return ctx.loop.get_scheduler();
+ }
+};
+} // namespace details
+
} // namespace sdbusplus::async
diff --git a/include/sdbusplus/async/timer.hpp b/include/sdbusplus/async/timer.hpp
new file mode 100644
index 0000000..23d8822
--- /dev/null
+++ b/include/sdbusplus/async/timer.hpp
@@ -0,0 +1,121 @@
+#pragma once
+
+#include <sdbusplus/async/context.hpp>
+#include <sdbusplus/async/execution.hpp>
+#include <sdbusplus/event.hpp>
+
+namespace sdbusplus::async
+{
+
+/** sleep_for Sender
+ *
+ * This Sender performs the equivalent of `std::this_thread::sleep_for`,
+ * in an async context.
+ *
+ * @param[in] ctx The async context.
+ * @param[in] time The length of time to delay.
+ *
+ * @return A sender which completes after time.
+ */
+template <typename Rep, typename Period>
+auto sleep_for(context& ctx, std::chrono::duration<Rep, Period> time);
+
+namespace timer_ns
+{
+
+/* The sleep completion event.
+ *
+ * On start, creates the sd-event timer.
+ * On callback, completes the Receiver.
+ */
+template <execution::receiver R>
+struct sleep_operation : public details::context_friend
+{
+ sleep_operation() = delete;
+ sleep_operation(sleep_operation&&) = delete;
+
+ sleep_operation(context& ctx, event_t::time_resolution time, R&& r) :
+ ctx(ctx), time(time), receiver(std::move(r))
+ {}
+
+ static int handler(sd_event_source*, uint64_t, void* data) noexcept
+ {
+ auto self = static_cast<sleep_operation<R>*>(data);
+ execution::set_value(std::move(self->receiver));
+
+ return 0;
+ }
+
+ friend auto tag_invoke(execution::start_t, sleep_operation& self) noexcept
+ {
+ try
+ {
+ self.source = self.event_loop().add_oneshot_timer(handler, &self,
+ self.time);
+ }
+ catch (...)
+ {
+ execution::set_error(std::move(self.receiver),
+ std::current_exception());
+ }
+ }
+
+ private:
+ event_t& event_loop()
+ {
+ return get_event_loop(ctx);
+ }
+
+ context& ctx;
+ event_t::time_resolution time;
+ event_source_t source;
+ R receiver;
+};
+
+/** The delay Sender.
+ *
+ * On connect, instantiates the completion event.
+ */
+struct sleep_sender : public details::context_friend
+{
+ sleep_sender() = delete;
+
+ sleep_sender(context& ctx, event_t::time_resolution time) noexcept :
+ ctx(ctx), time(time)
+ {}
+
+ friend auto tag_invoke(execution::get_completion_signatures_t,
+ const sleep_sender&, auto)
+ -> execution::completion_signatures<execution::set_value_t()>;
+
+ template <execution::receiver R>
+ friend auto tag_invoke(execution::connect_t, sleep_sender&& self, R r)
+ -> sleep_operation<R>
+ {
+ return {self.ctx, self.time, std::move(r)};
+ }
+
+ static auto sleep_for(context& ctx, event_t::time_resolution time)
+ {
+ // Run the delay sender and then switch back to the worker thread.
+ // The delay completion happens from the sd-event handler, which is
+ // ran on the 'caller' thread.
+
+ return execution::when_all(sleep_sender(ctx, time),
+ execution::schedule(get_scheduler(ctx)));
+ }
+
+ private:
+ context& ctx;
+ event_t::time_resolution time;
+};
+
+} // namespace timer_ns
+
+template <typename Rep, typename Period>
+auto sleep_for(context& ctx, std::chrono::duration<Rep, Period> time)
+{
+ return timer_ns::sleep_sender::sleep_for(
+ ctx, std::chrono::duration_cast<event_t::time_resolution>(time));
+}
+} // namespace sdbusplus::async
diff --git a/include/sdbusplus/event.hpp b/include/sdbusplus/event.hpp
index eede9ae..89784b3 100644
--- a/include/sdbusplus/event.hpp
+++ b/include/sdbusplus/event.hpp
@@ -88,6 +88,8 @@
class event
{
public:
+ using time_resolution = std::chrono::microseconds;
+
event();
event(const event&) = delete;
event(event&& e) = delete;
@@ -98,8 +100,7 @@
}
/** Execute a single iteration of the run-loop (see sd_event_run). */
- void run_one(
- std::chrono::microseconds timeout = std::chrono::microseconds::max());
+ void run_one(time_resolution timeout = time_resolution::max());
/** Force a pending `run_one` to exit. */
void break_run();
@@ -112,9 +113,8 @@
/** Add a one shot timer source to the run-loop. */
source add_oneshot_timer(
- sd_event_time_handler_t handler, void* data,
- std::chrono::microseconds time,
- std::chrono::microseconds accuracy = std::chrono::milliseconds(1));
+ sd_event_time_handler_t handler, void* data, time_resolution time,
+ time_resolution accuracy = std::chrono::milliseconds(1));
friend source;
diff --git a/src/async/context.cpp b/src/async/context.cpp
index 1dc9906..007a266 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -37,7 +37,7 @@
// Data to share with the worker.
context& ctx;
- std::chrono::microseconds timeout{};
+ event_t::time_resolution timeout{};
static task<> loop(context& ctx);
static void wait_once(context& ctx);
diff --git a/src/event.cpp b/src/event.cpp
index 295191d..badd43a 100644
--- a/src/event.cpp
+++ b/src/event.cpp
@@ -90,7 +90,7 @@
run_condition = add_condition(run_wakeup, this);
}
-void event::run_one(std::chrono::microseconds timeout)
+void event::run_one(time_resolution timeout)
{
auto l = obtain_lock<false>();
@@ -146,8 +146,7 @@
}
source event::add_oneshot_timer(sd_event_time_handler_t handler, void* data,
- std::chrono::microseconds time,
- std::chrono::microseconds accuracy)
+ time_resolution time, time_resolution accuracy)
{
auto l = obtain_lock();
diff --git a/test/async/timer.cpp b/test/async/timer.cpp
new file mode 100644
index 0000000..f377e83
--- /dev/null
+++ b/test/async/timer.cpp
@@ -0,0 +1,24 @@
+#include <sdbusplus/async.hpp>
+
+#include <chrono>
+
+#include <gtest/gtest.h>
+
+using namespace std::literals;
+
+TEST(Timer, DelaySome)
+{
+ static constexpr auto timeout = 500ms;
+
+ sdbusplus::async::context ctx;
+
+ auto start = std::chrono::steady_clock::now();
+
+ ctx.run(sdbusplus::async::sleep_for(ctx, timeout) |
+ std::execution::then([&ctx]() { ctx.request_stop(); }));
+
+ auto stop = std::chrono::steady_clock::now();
+
+ EXPECT_GT(stop - start, timeout);
+ EXPECT_LT(stop - start, timeout * 2);
+}
diff --git a/test/meson.build b/test/meson.build
index c2e937a..0181766 100644
--- a/test/meson.build
+++ b/test/meson.build
@@ -22,6 +22,7 @@
tests = [
'async/context',
'async/task',
+ 'async/timer',
'bus/list_names',
'bus/match',
'event/event',