event: add a simple wrapper around sd-event

In order to facilitate stop-conditions on the async::context,
we need to be able to escape the wait on the dbus fd.  sd-event
will allow us to do this and also build other co-routine primitives
in the future (such as `co_await async::delay(1s)` backed by sd-event
timers).  Define a simple wrapper around sd-event here which can
be leveraged by the async framework.

Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I945416cbce3c98d98135c42cb7ca249a57cb9a60
diff --git a/include/sdbusplus/event.hpp b/include/sdbusplus/event.hpp
new file mode 100644
index 0000000..e1e99ec
--- /dev/null
+++ b/include/sdbusplus/event.hpp
@@ -0,0 +1,141 @@
+#pragma once
+
+#include <systemd/sd-event.h>
+
+#include <chrono>
+#include <mutex>
+#include <utility>
+
+namespace sdbusplus
+{
+namespace event
+{
+class event;
+
+/** RAII holder for sd_event_sources */
+class source
+{
+  public:
+    friend event;
+
+    source() = default;
+    explicit source(event& e) : ev(&e) {}
+
+    source(const source&) = delete;
+    source(source&&);
+    source& operator=(const source&) = delete;
+    source& operator=(source&&);
+    ~source();
+
+  private:
+    source(event& e, sd_event_source*& s) : ev(&e)
+    {
+        sourcep = std::exchange(s, nullptr);
+    }
+
+    event* ev = nullptr;
+    sd_event_source* sourcep = nullptr;
+};
+
+/** sd-event wrapper for eventfd
+ *
+ *  This can be used to create something similar to a std::condition_variable
+ *  but backed by sd-event.
+ */
+class condition
+{
+  public:
+    friend event;
+
+    condition() = delete;
+    explicit condition(event& e) : condition_source(e){};
+    condition(const condition&) = delete;
+    condition(condition&&);
+
+    condition& operator=(const condition&) = delete;
+    condition& operator=(condition&&);
+
+    ~condition()
+    {
+        if (fd >= 0)
+        {
+            close(fd);
+        }
+    }
+
+    /** Increment the signal count on the eventfd. */
+    void signal();
+    /** Acknowledge all pending signals on the eventfd. */
+    void ack();
+
+  private:
+    condition(source&& s, int&& f) :
+        condition_source(std::move(s)), fd(std::exchange(f, -1))
+    {}
+
+    source condition_source;
+    int fd = -1;
+};
+
+/** sd-event based run-loop implementation.
+ *
+ *  This is sd-event is thread-safe in the sense that one thread may be
+ *  executing 'run_one' while other threads create (or destruct) additional
+ *  sources.  This might result in the 'run_one' exiting having done no
+ *  work, but the state of the underlying sd-event structures is kept
+ *  thread-safe.
+ */
+class event
+{
+  public:
+    event();
+    event(const event&) = delete;
+    event(event&& e) = delete;
+
+    ~event()
+    {
+        sd_event_unref(eventp);
+    }
+
+    /** Execute a single iteration of the run-loop (see sd_event_run). */
+    void run_one(
+        std::chrono::microseconds timeout = std::chrono::microseconds::max());
+    /** Force a pending `run_one` to exit. */
+    void break_run();
+
+    /** Add a file-descriptor source to the sd-event (see sd_event_add_io). */
+    source add_io(int fd, uint32_t events, sd_event_io_handler_t handler,
+                  void* data);
+
+    /** Add a eventfd-based sdbusplus::event::condition to the run-loop. */
+    condition add_condition(sd_event_io_handler_t handler, void* data);
+
+    friend source;
+
+  private:
+    static int run_wakeup(sd_event_source*, int, uint32_t, void*);
+
+    sd_event* eventp = nullptr;
+
+    // Condition to allow 'break_run' to exit the run-loop.
+    condition run_condition{*this};
+
+    // Lock for the sd_event.
+    //
+    // There are cases where we need to lock the mutex from inside the context
+    // of a sd-event callback, while the lock is already held.  Use a
+    // recursive_mutex to allow this.
+    std::recursive_mutex lock{};
+
+    // Safely get the lock, possibly signaling the running 'run_one' to exit.
+    template <bool Signal = true>
+    std::unique_lock<std::recursive_mutex> obtain_lock();
+};
+
+} // namespace event
+
+using event_t = event::event;
+using event_source_t = event::source;
+using event_cond_t = event::condition;
+
+} // namespace sdbusplus
diff --git a/meson.build b/meson.build
index 6239a93..af8cdf3 100644
--- a/meson.build
+++ b/meson.build
@@ -27,6 +27,7 @@
     'src/async/context.cpp',
     'src/async/match.cpp',
     'src/bus.cpp',
+    'src/event.cpp',
     'src/exception.cpp',
     'src/message/native_types.cpp',
     'src/sdbus.cpp',
diff --git a/src/event.cpp b/src/event.cpp
new file mode 100644
index 0000000..e9c11af
--- /dev/null
+++ b/src/event.cpp
@@ -0,0 +1,176 @@
+#include <sys/eventfd.h>
+
+#include <sdbusplus/event.hpp>
+#include <sdbusplus/exception.hpp>
+
+namespace sdbusplus::event
+{
+
+source::source(source&& s)
+{
+    if (&s == this)
+    {
+        return;
+    }
+    ev = std::exchange(s.ev, nullptr);
+    sourcep = std::exchange(s.sourcep, nullptr);
+}
+
+source& source::operator=(source&& s)
+{
+    if (nullptr != sourcep)
+    {
+        auto l = ev->obtain_lock();
+        sd_event_source_unref(sourcep);
+    }
+    ev = std::exchange(s.ev, nullptr);
+    sourcep = std::exchange(s.sourcep, nullptr);
+
+    return *this;
+}
+
+source::~source()
+{
+    if (nullptr != sourcep)
+    {
+        auto l = ev->obtain_lock();
+        sd_event_source_unref(sourcep);
+    }
+}
+
+condition::condition(condition&& c)
+{
+    if (&c == this)
+    {
+        return;
+    }
+
+    condition_source = std::move(c.condition_source);
+    fd = std::exchange(c.fd, -1);
+}
+
+condition& condition::operator=(condition&& c)
+{
+    condition_source = std::move(c.condition_source);
+    if (fd >= 0)
+    {
+        close(fd);
+    }
+    fd = std::exchange(c.fd, -1);
+
+    return *this;
+}
+
+void condition::signal()
+{
+    uint64_t value = 1;
+    auto rc = write(fd, &value, sizeof(value));
+    if (rc < static_cast<decltype(rc)>(sizeof(value)))
+    {
+        throw exception::SdBusError(errno, __func__);
+    }
+}
+
+void condition::ack()
+{
+    uint64_t value = 0;
+    auto rc = read(fd, &value, sizeof(value));
+    if (rc < static_cast<decltype(rc)>(sizeof(value)))
+    {
+        throw exception::SdBusError(errno, __func__);
+    }
+}
+
+event::event()
+{
+    if (auto rc = sd_event_new(&eventp); rc < 0)
+    {
+        throw exception::SdBusError(-rc, __func__);
+    }
+    run_condition = add_condition(run_wakeup, this);
+}
+
+void event::run_one(std::chrono::microseconds timeout)
+{
+    auto l = obtain_lock<false>();
+
+    auto rc = sd_event_run(eventp, static_cast<uint64_t>(timeout.count()));
+    if (rc < 0)
+    {
+        throw exception::SdBusError(-rc, __func__);
+    }
+}
+
+void event::break_run()
+{
+    run_condition.signal();
+}
+
+source event::add_io(int fd, uint32_t events, sd_event_io_handler_t handler,
+                     void* data)
+{
+    auto l = obtain_lock();
+
+    source s{*this};
+
+    auto rc = sd_event_add_io(eventp, &s.sourcep, fd, events, handler, data);
+    if (rc < 0)
+    {
+        throw exception::SdBusError(-rc, __func__);
+    }
+
+    return s;
+}
+
+condition event::add_condition(sd_event_io_handler_t handler, void* data)
+{
+    // We don't need any locks here because we only touch the sd_event
+    // indirectly through `add_io` which handles its own locking.
+
+    auto fd = eventfd(0, 0);
+    if (fd < 0)
+    {
+        throw exception::SdBusError(errno, __func__);
+    }
+
+    try
+    {
+        auto io = add_io(fd, EPOLLIN, handler, data);
+        return {std::move(io), std::move(fd)};
+    }
+    catch (...)
+    {
+        close(fd);
+        throw;
+    }
+}
+
+int event::run_wakeup(sd_event_source*, int, uint32_t, void* data)
+{
+    auto self = static_cast<event*>(data);
+    self->run_condition.ack();
+
+    return 0;
+}
+
+template <bool Signal>
+std::unique_lock<std::recursive_mutex> event::obtain_lock()
+{
+    std::unique_lock<std::recursive_mutex> l{this->lock, std::defer_lock_t()};
+    if constexpr (Signal)
+    {
+        if (!l.try_lock())
+        {
+            run_condition.signal();
+            l.lock();
+        }
+    }
+    else
+    {
+        l.lock();
+    }
+
+    return l;
+}
+
+} // namespace sdbusplus::event
diff --git a/test/event/event.cpp b/test/event/event.cpp
new file mode 100644
index 0000000..dbe470c
--- /dev/null
+++ b/test/event/event.cpp
@@ -0,0 +1,58 @@
+#include <sdbusplus/event.hpp>
+
+#include <chrono>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+using namespace std::literals::chrono_literals;
+
+struct Event : public testing::Test
+{
+    sdbusplus::event_t ev{};
+};
+
+TEST_F(Event, TimeoutWorks)
+{
+    static constexpr auto timeout = 250ms;
+
+    auto start = std::chrono::steady_clock::now();
+    ev.run_one(timeout);
+    auto stop = std::chrono::steady_clock::now();
+
+    EXPECT_TRUE(stop - start > timeout);
+    EXPECT_TRUE(stop - start < timeout * 2);
+}
+
+TEST_F(Event, Runnable)
+{
+    static constexpr auto timeout = 10s;
+
+    std::jthread j{[&]() { ev.break_run(); }};
+
+    auto start = std::chrono::steady_clock::now();
+    ev.run_one(timeout);
+    auto stop = std::chrono::steady_clock::now();
+
+    EXPECT_TRUE(stop - start < timeout);
+}
+
+TEST_F(Event, ConditionSignals)
+{
+    struct run
+    {
+        static int _(sd_event_source*, int, uint32_t, void* data)
+        {
+            *static_cast<bool*>(data) = true;
+            return 0;
+        }
+    };
+    bool ran = false;
+
+    auto c = ev.add_condition(run::_, &ran);
+    std::jthread j{[&]() { c.signal(); }};
+
+    ev.run_one();
+    EXPECT_TRUE(ran);
+    c.ack();
+}
diff --git a/test/meson.build b/test/meson.build
index 4d3114c..e62c1e7 100644
--- a/test/meson.build
+++ b/test/meson.build
@@ -23,6 +23,7 @@
     'async/task',
     'bus/list_names',
     'bus/match',
+    'event/event',
     'exception/sdbus_error',
     'message/append',
     'message/call',