event: add a simple wrapper around sd-event
In order to facilitate stop-conditions on the async::context,
we need to be able to escape the wait on the dbus fd. sd-event
will allow us to do this and also build other co-routine primitives
in the future (such as `co_await async::delay(1s)` backed by sd-event
timers). Define a simple wrapper around sd-event here which can
be leveraged by the async framework.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I945416cbce3c98d98135c42cb7ca249a57cb9a60
diff --git a/include/sdbusplus/event.hpp b/include/sdbusplus/event.hpp
new file mode 100644
index 0000000..e1e99ec
--- /dev/null
+++ b/include/sdbusplus/event.hpp
@@ -0,0 +1,141 @@
+#pragma once
+
+#include <systemd/sd-event.h>
+
+#include <chrono>
+#include <mutex>
+#include <utility>
+
+namespace sdbusplus
+{
+namespace event
+{
+class event;
+
+/** RAII holder for sd_event_sources */
+class source
+{
+ public:
+ friend event;
+
+ source() = default;
+ explicit source(event& e) : ev(&e) {}
+
+ source(const source&) = delete;
+ source(source&&);
+ source& operator=(const source&) = delete;
+ source& operator=(source&&);
+ ~source();
+
+ private:
+ source(event& e, sd_event_source*& s) : ev(&e)
+ {
+ sourcep = std::exchange(s, nullptr);
+ }
+
+ event* ev = nullptr;
+ sd_event_source* sourcep = nullptr;
+};
+
+/** sd-event wrapper for eventfd
+ *
+ * This can be used to create something similar to a std::condition_variable
+ * but backed by sd-event.
+ */
+class condition
+{
+ public:
+ friend event;
+
+ condition() = delete;
+ explicit condition(event& e) : condition_source(e){};
+ condition(const condition&) = delete;
+ condition(condition&&);
+
+ condition& operator=(const condition&) = delete;
+ condition& operator=(condition&&);
+
+ ~condition()
+ {
+ if (fd >= 0)
+ {
+ close(fd);
+ }
+ }
+
+ /** Increment the signal count on the eventfd. */
+ void signal();
+ /** Acknowledge all pending signals on the eventfd. */
+ void ack();
+
+ private:
+ condition(source&& s, int&& f) :
+ condition_source(std::move(s)), fd(std::exchange(f, -1))
+ {}
+
+ source condition_source;
+ int fd = -1;
+};
+
+/** sd-event based run-loop implementation.
+ *
+ * This is sd-event is thread-safe in the sense that one thread may be
+ * executing 'run_one' while other threads create (or destruct) additional
+ * sources. This might result in the 'run_one' exiting having done no
+ * work, but the state of the underlying sd-event structures is kept
+ * thread-safe.
+ */
+class event
+{
+ public:
+ event();
+ event(const event&) = delete;
+ event(event&& e) = delete;
+
+ ~event()
+ {
+ sd_event_unref(eventp);
+ }
+
+ /** Execute a single iteration of the run-loop (see sd_event_run). */
+ void run_one(
+ std::chrono::microseconds timeout = std::chrono::microseconds::max());
+ /** Force a pending `run_one` to exit. */
+ void break_run();
+
+ /** Add a file-descriptor source to the sd-event (see sd_event_add_io). */
+ source add_io(int fd, uint32_t events, sd_event_io_handler_t handler,
+ void* data);
+
+ /** Add a eventfd-based sdbusplus::event::condition to the run-loop. */
+ condition add_condition(sd_event_io_handler_t handler, void* data);
+
+ friend source;
+
+ private:
+ static int run_wakeup(sd_event_source*, int, uint32_t, void*);
+
+ sd_event* eventp = nullptr;
+
+ // Condition to allow 'break_run' to exit the run-loop.
+ condition run_condition{*this};
+
+ // Lock for the sd_event.
+ //
+ // There are cases where we need to lock the mutex from inside the context
+ // of a sd-event callback, while the lock is already held. Use a
+ // recursive_mutex to allow this.
+ std::recursive_mutex lock{};
+
+ // Safely get the lock, possibly signaling the running 'run_one' to exit.
+ template <bool Signal = true>
+ std::unique_lock<std::recursive_mutex> obtain_lock();
+};
+
+} // namespace event
+
+using event_t = event::event;
+using event_source_t = event::source;
+using event_cond_t = event::condition;
+
+} // namespace sdbusplus
diff --git a/meson.build b/meson.build
index 6239a93..af8cdf3 100644
--- a/meson.build
+++ b/meson.build
@@ -27,6 +27,7 @@
'src/async/context.cpp',
'src/async/match.cpp',
'src/bus.cpp',
+ 'src/event.cpp',
'src/exception.cpp',
'src/message/native_types.cpp',
'src/sdbus.cpp',
diff --git a/src/event.cpp b/src/event.cpp
new file mode 100644
index 0000000..e9c11af
--- /dev/null
+++ b/src/event.cpp
@@ -0,0 +1,176 @@
+#include <sys/eventfd.h>
+
+#include <sdbusplus/event.hpp>
+#include <sdbusplus/exception.hpp>
+
+namespace sdbusplus::event
+{
+
+source::source(source&& s)
+{
+ if (&s == this)
+ {
+ return;
+ }
+ ev = std::exchange(s.ev, nullptr);
+ sourcep = std::exchange(s.sourcep, nullptr);
+}
+
+source& source::operator=(source&& s)
+{
+ if (nullptr != sourcep)
+ {
+ auto l = ev->obtain_lock();
+ sd_event_source_unref(sourcep);
+ }
+ ev = std::exchange(s.ev, nullptr);
+ sourcep = std::exchange(s.sourcep, nullptr);
+
+ return *this;
+}
+
+source::~source()
+{
+ if (nullptr != sourcep)
+ {
+ auto l = ev->obtain_lock();
+ sd_event_source_unref(sourcep);
+ }
+}
+
+condition::condition(condition&& c)
+{
+ if (&c == this)
+ {
+ return;
+ }
+
+ condition_source = std::move(c.condition_source);
+ fd = std::exchange(c.fd, -1);
+}
+
+condition& condition::operator=(condition&& c)
+{
+ condition_source = std::move(c.condition_source);
+ if (fd >= 0)
+ {
+ close(fd);
+ }
+ fd = std::exchange(c.fd, -1);
+
+ return *this;
+}
+
+void condition::signal()
+{
+ uint64_t value = 1;
+ auto rc = write(fd, &value, sizeof(value));
+ if (rc < static_cast<decltype(rc)>(sizeof(value)))
+ {
+ throw exception::SdBusError(errno, __func__);
+ }
+}
+
+void condition::ack()
+{
+ uint64_t value = 0;
+ auto rc = read(fd, &value, sizeof(value));
+ if (rc < static_cast<decltype(rc)>(sizeof(value)))
+ {
+ throw exception::SdBusError(errno, __func__);
+ }
+}
+
+event::event()
+{
+ if (auto rc = sd_event_new(&eventp); rc < 0)
+ {
+ throw exception::SdBusError(-rc, __func__);
+ }
+ run_condition = add_condition(run_wakeup, this);
+}
+
+void event::run_one(std::chrono::microseconds timeout)
+{
+ auto l = obtain_lock<false>();
+
+ auto rc = sd_event_run(eventp, static_cast<uint64_t>(timeout.count()));
+ if (rc < 0)
+ {
+ throw exception::SdBusError(-rc, __func__);
+ }
+}
+
+void event::break_run()
+{
+ run_condition.signal();
+}
+
+source event::add_io(int fd, uint32_t events, sd_event_io_handler_t handler,
+ void* data)
+{
+ auto l = obtain_lock();
+
+ source s{*this};
+
+ auto rc = sd_event_add_io(eventp, &s.sourcep, fd, events, handler, data);
+ if (rc < 0)
+ {
+ throw exception::SdBusError(-rc, __func__);
+ }
+
+ return s;
+}
+
+condition event::add_condition(sd_event_io_handler_t handler, void* data)
+{
+ // We don't need any locks here because we only touch the sd_event
+ // indirectly through `add_io` which handles its own locking.
+
+ auto fd = eventfd(0, 0);
+ if (fd < 0)
+ {
+ throw exception::SdBusError(errno, __func__);
+ }
+
+ try
+ {
+ auto io = add_io(fd, EPOLLIN, handler, data);
+ return {std::move(io), std::move(fd)};
+ }
+ catch (...)
+ {
+ close(fd);
+ throw;
+ }
+}
+
+int event::run_wakeup(sd_event_source*, int, uint32_t, void* data)
+{
+ auto self = static_cast<event*>(data);
+ self->run_condition.ack();
+
+ return 0;
+}
+
+template <bool Signal>
+std::unique_lock<std::recursive_mutex> event::obtain_lock()
+{
+ std::unique_lock<std::recursive_mutex> l{this->lock, std::defer_lock_t()};
+ if constexpr (Signal)
+ {
+ if (!l.try_lock())
+ {
+ run_condition.signal();
+ l.lock();
+ }
+ }
+ else
+ {
+ l.lock();
+ }
+
+ return l;
+}
+
+} // namespace sdbusplus::event
diff --git a/test/event/event.cpp b/test/event/event.cpp
new file mode 100644
index 0000000..dbe470c
--- /dev/null
+++ b/test/event/event.cpp
@@ -0,0 +1,58 @@
+#include <sdbusplus/event.hpp>
+
+#include <chrono>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+using namespace std::literals::chrono_literals;
+
+struct Event : public testing::Test
+{
+ sdbusplus::event_t ev{};
+};
+
+TEST_F(Event, TimeoutWorks)
+{
+ static constexpr auto timeout = 250ms;
+
+ auto start = std::chrono::steady_clock::now();
+ ev.run_one(timeout);
+ auto stop = std::chrono::steady_clock::now();
+
+ EXPECT_TRUE(stop - start > timeout);
+ EXPECT_TRUE(stop - start < timeout * 2);
+}
+
+TEST_F(Event, Runnable)
+{
+ static constexpr auto timeout = 10s;
+
+ std::jthread j{[&]() { ev.break_run(); }};
+
+ auto start = std::chrono::steady_clock::now();
+ ev.run_one(timeout);
+ auto stop = std::chrono::steady_clock::now();
+
+ EXPECT_TRUE(stop - start < timeout);
+}
+
+TEST_F(Event, ConditionSignals)
+{
+ struct run
+ {
+ static int _(sd_event_source*, int, uint32_t, void* data)
+ {
+ *static_cast<bool*>(data) = true;
+ return 0;
+ }
+ };
+ bool ran = false;
+
+ auto c = ev.add_condition(run::_, &ran);
+ std::jthread j{[&]() { c.signal(); }};
+
+ ev.run_one();
+ EXPECT_TRUE(ran);
+ c.ack();
+}
diff --git a/test/meson.build b/test/meson.build
index 4d3114c..e62c1e7 100644
--- a/test/meson.build
+++ b/test/meson.build
@@ -23,6 +23,7 @@
'async/task',
'bus/list_names',
'bus/match',
+ 'event/event',
'exception/sdbus_error',
'message/append',
'message/call',