async: add sleep_for sender

Sometimes it is useful to do the equivalent of
`std::this_thread::sleep_for` in a co-routine context.  Add a
sender-based implementation to async.

Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I9991eb40b7a1b12e61511f1200bc99fdcdbccf0a
diff --git a/include/sdbusplus/async.hpp b/include/sdbusplus/async.hpp
index 52f1294..70b8abf 100644
--- a/include/sdbusplus/async.hpp
+++ b/include/sdbusplus/async.hpp
@@ -5,3 +5,4 @@
 #include <sdbusplus/async/match.hpp>
 #include <sdbusplus/async/proxy.hpp>
 #include <sdbusplus/async/task.hpp>
+#include <sdbusplus/async/timer.hpp>
diff --git a/include/sdbusplus/async/context.hpp b/include/sdbusplus/async/context.hpp
index 4bd2384..a59dc72 100644
--- a/include/sdbusplus/async/context.hpp
+++ b/include/sdbusplus/async/context.hpp
@@ -16,7 +16,8 @@
 namespace details
 {
 struct wait_process_completion;
-}
+struct context_friend;
+} // namespace details
 
 /** @brief A run-loop context for handling asynchronous dbus operations.
  *
@@ -68,7 +69,8 @@
         return stop.stop_requested();
     }
 
-    friend struct details::wait_process_completion;
+    friend details::wait_process_completion;
+    friend details::context_friend;
 
   private:
     bus_t bus;
@@ -117,4 +119,20 @@
     }
 }
 
+namespace details
+{
+struct context_friend
+{
+    static event_t& get_event_loop(context& ctx)
+    {
+        return ctx.event_loop;
+    }
+
+    static auto get_scheduler(context& ctx)
+    {
+        return ctx.loop.get_scheduler();
+    }
+};
+} // namespace details
+
 } // namespace sdbusplus::async
diff --git a/include/sdbusplus/async/timer.hpp b/include/sdbusplus/async/timer.hpp
new file mode 100644
index 0000000..23d8822
--- /dev/null
+++ b/include/sdbusplus/async/timer.hpp
@@ -0,0 +1,121 @@
+#pragma once
+
+#include <sdbusplus/async/context.hpp>
+#include <sdbusplus/async/execution.hpp>
+#include <sdbusplus/event.hpp>
+
+namespace sdbusplus::async
+{
+
+/** sleep_for Sender
+ *
+ *  This Sender performs the equivalent of `std::this_thread::sleep_for`,
+ *  in an async context.
+ *
+ *  @param[in] ctx The async context.
+ *  @param[in] time The length of time to delay.
+ *
+ *  @return A sender which completes after time.
+ */
+template <typename Rep, typename Period>
+auto sleep_for(context& ctx, std::chrono::duration<Rep, Period> time);
+
+namespace timer_ns
+{
+
+/* The sleep completion event.
+ *
+ * On start, creates the sd-event timer.
+ * On callback, completes the Receiver.
+ */
+template <execution::receiver R>
+struct sleep_operation : public details::context_friend
+{
+    sleep_operation() = delete;
+    sleep_operation(sleep_operation&&) = delete;
+
+    sleep_operation(context& ctx, event_t::time_resolution time, R&& r) :
+        ctx(ctx), time(time), receiver(std::move(r))
+    {}
+
+    static int handler(sd_event_source*, uint64_t, void* data) noexcept
+    {
+        auto self = static_cast<sleep_operation<R>*>(data);
+        execution::set_value(std::move(self->receiver));
+
+        return 0;
+    }
+
+    friend auto tag_invoke(execution::start_t, sleep_operation& self) noexcept
+    {
+        try
+        {
+            self.source = self.event_loop().add_oneshot_timer(handler, &self,
+                                                              self.time);
+        }
+        catch (...)
+        {
+            execution::set_error(std::move(self.receiver),
+                                 std::current_exception());
+        }
+    }
+
+  private:
+    event_t& event_loop()
+    {
+        return get_event_loop(ctx);
+    }
+
+    context& ctx;
+    event_t::time_resolution time;
+    event_source_t source;
+    R receiver;
+};
+
+/** The delay Sender.
+ *
+ *  On connect, instantiates the completion event.
+ */
+struct sleep_sender : public details::context_friend
+{
+    sleep_sender() = delete;
+
+    sleep_sender(context& ctx, event_t::time_resolution time) noexcept :
+        ctx(ctx), time(time)
+    {}
+
+    friend auto tag_invoke(execution::get_completion_signatures_t,
+                           const sleep_sender&, auto)
+        -> execution::completion_signatures<execution::set_value_t()>;
+
+    template <execution::receiver R>
+    friend auto tag_invoke(execution::connect_t, sleep_sender&& self, R r)
+        -> sleep_operation<R>
+    {
+        return {self.ctx, self.time, std::move(r)};
+    }
+
+    static auto sleep_for(context& ctx, event_t::time_resolution time)
+    {
+        // Run the delay sender and then switch back to the worker thread.
+        // The delay completion happens from the sd-event handler, which is
+        // ran on the 'caller' thread.
+
+        return execution::when_all(sleep_sender(ctx, time),
+                                   execution::schedule(get_scheduler(ctx)));
+    }
+
+  private:
+    context& ctx;
+    event_t::time_resolution time;
+};
+
+} // namespace timer_ns
+
+template <typename Rep, typename Period>
+auto sleep_for(context& ctx, std::chrono::duration<Rep, Period> time)
+{
+    return timer_ns::sleep_sender::sleep_for(
+        ctx, std::chrono::duration_cast<event_t::time_resolution>(time));
+}
+} // namespace sdbusplus::async
diff --git a/include/sdbusplus/event.hpp b/include/sdbusplus/event.hpp
index eede9ae..89784b3 100644
--- a/include/sdbusplus/event.hpp
+++ b/include/sdbusplus/event.hpp
@@ -88,6 +88,8 @@
 class event
 {
   public:
+    using time_resolution = std::chrono::microseconds;
+
     event();
     event(const event&) = delete;
     event(event&& e) = delete;
@@ -98,8 +100,7 @@
     }
 
     /** Execute a single iteration of the run-loop (see sd_event_run). */
-    void run_one(
-        std::chrono::microseconds timeout = std::chrono::microseconds::max());
+    void run_one(time_resolution timeout = time_resolution::max());
     /** Force a pending `run_one` to exit. */
     void break_run();
 
@@ -112,9 +113,8 @@
 
     /** Add a one shot timer source to the run-loop. */
     source add_oneshot_timer(
-        sd_event_time_handler_t handler, void* data,
-        std::chrono::microseconds time,
-        std::chrono::microseconds accuracy = std::chrono::milliseconds(1));
+        sd_event_time_handler_t handler, void* data, time_resolution time,
+        time_resolution accuracy = std::chrono::milliseconds(1));
 
     friend source;
 
diff --git a/src/async/context.cpp b/src/async/context.cpp
index 1dc9906..007a266 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -37,7 +37,7 @@
 
     // Data to share with the worker.
     context& ctx;
-    std::chrono::microseconds timeout{};
+    event_t::time_resolution timeout{};
 
     static task<> loop(context& ctx);
     static void wait_once(context& ctx);
diff --git a/src/event.cpp b/src/event.cpp
index 295191d..badd43a 100644
--- a/src/event.cpp
+++ b/src/event.cpp
@@ -90,7 +90,7 @@
     run_condition = add_condition(run_wakeup, this);
 }
 
-void event::run_one(std::chrono::microseconds timeout)
+void event::run_one(time_resolution timeout)
 {
     auto l = obtain_lock<false>();
 
@@ -146,8 +146,7 @@
 }
 
 source event::add_oneshot_timer(sd_event_time_handler_t handler, void* data,
-                                std::chrono::microseconds time,
-                                std::chrono::microseconds accuracy)
+                                time_resolution time, time_resolution accuracy)
 {
     auto l = obtain_lock();
 
diff --git a/test/async/timer.cpp b/test/async/timer.cpp
new file mode 100644
index 0000000..f377e83
--- /dev/null
+++ b/test/async/timer.cpp
@@ -0,0 +1,24 @@
+#include <sdbusplus/async.hpp>
+
+#include <chrono>
+
+#include <gtest/gtest.h>
+
+using namespace std::literals;
+
+TEST(Timer, DelaySome)
+{
+    static constexpr auto timeout = 500ms;
+
+    sdbusplus::async::context ctx;
+
+    auto start = std::chrono::steady_clock::now();
+
+    ctx.run(sdbusplus::async::sleep_for(ctx, timeout) |
+            std::execution::then([&ctx]() { ctx.request_stop(); }));
+
+    auto stop = std::chrono::steady_clock::now();
+
+    EXPECT_GT(stop - start, timeout);
+    EXPECT_LT(stop - start, timeout * 2);
+}
diff --git a/test/meson.build b/test/meson.build
index c2e937a..0181766 100644
--- a/test/meson.build
+++ b/test/meson.build
@@ -22,6 +22,7 @@
 tests = [
     'async/context',
     'async/task',
+    'async/timer',
     'bus/list_names',
     'bus/match',
     'event/event',