add async fd sender receiver for coroutines
Add async sender receiver for file descriptor based events. The user of
the async file descriptor needs to initialize a fdio instance and then
call next() to get each new event for the fd.
Tested:
```
> meson test -C builddir test_async_fdio
ninja: Entering directory `/host/repos/sdbusplus/builddir'
ninja: no work to do.
1/1 test_async_fdio OK 6.01s
Ok: 1
Expected Fail: 0
Fail: 0
Unexpected Pass: 0
Skipped: 0
Timeout: 0
Full log written to /host/repos/sdbusplus/builddir/meson-logs/testlog.txt
```
Change-Id: I1b4f16963e6096f30484c4a6df471e64ed24448b
Signed-off-by: Jagpal Singh Gill <paligill@gmail.com>
diff --git a/include/sdbusplus/async.hpp b/include/sdbusplus/async.hpp
index 70b8abf..31b3a99 100644
--- a/include/sdbusplus/async.hpp
+++ b/include/sdbusplus/async.hpp
@@ -2,6 +2,7 @@
#include <sdbusplus/async/context.hpp>
#include <sdbusplus/async/execution.hpp>
+#include <sdbusplus/async/fdio.hpp>
#include <sdbusplus/async/match.hpp>
#include <sdbusplus/async/proxy.hpp>
#include <sdbusplus/async/task.hpp>
diff --git a/include/sdbusplus/async/fdio.hpp b/include/sdbusplus/async/fdio.hpp
new file mode 100644
index 0000000..d6f7673
--- /dev/null
+++ b/include/sdbusplus/async/fdio.hpp
@@ -0,0 +1,133 @@
+#pragma once
+
+#include <sdbusplus/async/context.hpp>
+#include <sdbusplus/async/execution.hpp>
+#include <sdbusplus/event.hpp>
+
+namespace sdbusplus::async
+{
+
+namespace fdio_ns
+{
+struct fdio_completion;
+}
+
+class fdio : private context_ref, details::context_friend
+{
+ public:
+ fdio() = delete;
+ fdio(const fdio&) = delete;
+ fdio& operator=(const fdio&) = delete;
+ fdio(fdio&&) = delete;
+ fdio& operator=(fdio&&) = delete;
+ ~fdio() = default;
+
+ /** Construct a new fdio object from a context and a file descriptor. */
+ fdio(context& ctx, int fd);
+
+ /** Get the next fd event.
+ * Note: the implementation only supports a single awaiting task. Two
+ * tasks should not share this object and both call `next`.
+ */
+ auto next() noexcept;
+
+ friend fdio_ns::fdio_completion;
+
+ private:
+ int fd;
+ event_source_t source;
+ std::mutex lock{};
+ fdio_ns::fdio_completion* complete{nullptr};
+
+ event_t& event_loop()
+ {
+ return get_event_loop(ctx);
+ }
+
+ void handleEvent() noexcept;
+};
+
+namespace fdio_ns
+{
+
+struct fdio_completion
+{
+ fdio_completion() = delete;
+ fdio_completion(const fdio_completion&) = delete;
+ fdio_completion& operator=(const fdio_completion&) = delete;
+ fdio_completion(fdio_completion&&) = delete;
+
+ explicit fdio_completion(fdio& fdioInstance) noexcept :
+ fdioInstance(fdioInstance) {};
+ ~fdio_completion();
+
+ friend fdio;
+
+ friend void tag_invoke(execution::start_t, fdio_completion& self) noexcept
+ {
+ self.arm();
+ }
+
+ private:
+ virtual void complete() noexcept = 0;
+ virtual void stop() noexcept = 0;
+ void arm() noexcept;
+
+ fdio& fdioInstance;
+};
+
+// Implementation (templated based on Receiver) of fdio_completion.
+template <execution::receiver Receiver>
+struct fdio_operation : fdio_completion
+{
+ fdio_operation(fdio& fdioInstance, Receiver r) :
+ fdio_completion(fdioInstance), receiver(std::move(r))
+ {}
+
+ private:
+ void complete() noexcept override final
+ {
+ execution::set_value(std::move(receiver));
+ }
+
+ void stop() noexcept override final
+ {
+ execution::set_stopped(std::move(receiver));
+ }
+
+ Receiver receiver;
+};
+
+// fdio Sender implementation.
+struct fdio_sender
+{
+ using is_sender = void;
+
+ fdio_sender() = delete;
+ explicit fdio_sender(fdio& fdioInstance) noexcept :
+ fdioInstance(fdioInstance) {};
+
+ friend auto tag_invoke(
+ execution::get_completion_signatures_t, const fdio_sender&,
+ auto) -> execution::completion_signatures<execution::set_value_t(),
+ execution::set_stopped_t()>;
+
+ template <execution::receiver R>
+ friend auto tag_invoke(execution::connect_t, fdio_sender&& self,
+ R r) -> fdio_operation<R>
+ {
+ return {self.fdioInstance, std::move(r)};
+ }
+
+ private:
+ fdio& fdioInstance;
+};
+
+} // namespace fdio_ns
+
+inline auto fdio::next() noexcept
+{
+ return fdio_ns::fdio_sender{*this};
+}
+
+} // namespace sdbusplus::async
diff --git a/meson.build b/meson.build
index 598ad64..613f9ca 100644
--- a/meson.build
+++ b/meson.build
@@ -27,6 +27,7 @@
libsdbusplus_src = files(
'src/async/context.cpp',
'src/async/match.cpp',
+ 'src/async/fdio.cpp',
'src/bus.cpp',
'src/bus/match.cpp',
'src/event.cpp',
diff --git a/src/async/fdio.cpp b/src/async/fdio.cpp
new file mode 100644
index 0000000..641d6a6
--- /dev/null
+++ b/src/async/fdio.cpp
@@ -0,0 +1,72 @@
+#include <sdbusplus/async/fdio.hpp>
+
+namespace sdbusplus::async
+{
+fdio::fdio(context& ctx, int fd) : context_ref(ctx), fd(fd)
+{
+ static auto eventHandler =
+ [](sd_event_source*, int, uint32_t, void* data) noexcept {
+ static_cast<fdio*>(data)->handleEvent();
+ return 0;
+ };
+
+ try
+ {
+ source = event_loop().add_io(fd, EPOLLIN, eventHandler, this);
+ }
+ catch (...)
+ {
+ throw std::runtime_error("Failed to add fd to event loop");
+ }
+}
+
+void fdio::handleEvent() noexcept
+{
+ std::unique_lock l{lock};
+ if (complete == nullptr)
+ {
+ return;
+ }
+ auto c = std::exchange(complete, nullptr);
+ l.unlock();
+ c->complete();
+}
+
+namespace fdio_ns
+{
+
+fdio_completion::~fdio_completion()
+{
+ std::unique_lock l{fdioInstance.lock};
+
+ if (fdioInstance.complete == this)
+ {
+ std::exchange(fdioInstance.complete, nullptr);
+ }
+}
+
+void fdio_completion::arm() noexcept
+{
+ // Set ourselves as the awaiting Receiver
+ std::unique_lock l{fdioInstance.lock};
+
+ if (std::exchange(fdioInstance.complete, this) != nullptr)
+ {
+ // We do not support two awaiters; throw exception. Since we are in
+ // a noexcept context this will std::terminate anyhow, which is
+ // approximately the same as 'assert' but with better information.
+ try
+ {
+ throw std::logic_error(
+ "fdio_completion started with another await already pending!");
+ }
+ catch (...)
+ {
+ std::terminate();
+ }
+ }
+}
+
+} // namespace fdio_ns
+
+} // namespace sdbusplus::async
diff --git a/test/async/fdio.cpp b/test/async/fdio.cpp
new file mode 100644
index 0000000..62f084d
--- /dev/null
+++ b/test/async/fdio.cpp
@@ -0,0 +1,101 @@
+#include <sdbusplus/async.hpp>
+
+#include <filesystem>
+#include <fstream>
+
+#include <gtest/gtest.h>
+
+using namespace std::literals;
+
+namespace fs = std::filesystem;
+
+class FdioTest : public ::testing::Test
+{
+ protected:
+ const fs::path path = "/tmp";
+ constexpr static auto testIterations = 5;
+
+ FdioTest()
+ {
+ auto fd = inotify_init1(IN_NONBLOCK);
+ EXPECT_NE(fd, -1) << "Error occurred during the inotify_init1, error: "
+ << errno;
+
+ auto wd = inotify_add_watch(fd, path.c_str(), IN_CLOSE_WRITE);
+ EXPECT_NE(wd, -1)
+ << "Error occurred during the inotify_add_watch, error: " << errno;
+ fdioInstance = std::make_unique<sdbusplus::async::fdio>(*ctx, fd);
+ }
+
+ ~FdioTest() override
+ {
+ if (fd != -1)
+ {
+ if (wd != -1)
+ {
+ inotify_rm_watch(fd, wd);
+ }
+ close(fd);
+ }
+ ctx.reset();
+ }
+
+ auto writeToFile() -> sdbusplus::async::task<>
+ {
+ std::ofstream outfile((path / "test_fdio.txt").native());
+ EXPECT_TRUE(outfile.is_open())
+ << "Error occurred during file open, error: " << errno;
+ outfile << "Test fdio!" << std::endl;
+ outfile.close();
+ co_return;
+ }
+
+ auto testFdEvents(bool& ran,
+ bool sleepBeforeWrite) -> sdbusplus::async::task<>
+ {
+ for (int i = 0; i < testIterations; i++)
+ {
+ if (sleepBeforeWrite)
+ {
+ ctx->spawn(sdbusplus::async::sleep_for(*ctx, 1s) |
+ stdexec::then([&]() { ctx->spawn(writeToFile()); }));
+ }
+ else
+ {
+ co_await writeToFile();
+ }
+ co_await fdioInstance->next();
+ }
+ ran = true;
+ co_return;
+ }
+
+ std::unique_ptr<sdbusplus::async::fdio> fdioInstance;
+ std::optional<sdbusplus::async::context> ctx{std::in_place};
+
+ private:
+ int fd = -1;
+ int wd = -1;
+};
+
+TEST_F(FdioTest, TestFdEvents)
+{
+ bool ran = false;
+ ctx->spawn(testFdEvents(ran, false));
+ ctx->spawn(
+ sdbusplus::async::sleep_for(*ctx, 1s) |
+ sdbusplus::async::execution::then([&]() { ctx->request_stop(); }));
+ ctx->run();
+ EXPECT_TRUE(ran);
+}
+
+TEST_F(FdioTest, TestFdEventsWithSleep)
+{
+ bool ran = false;
+ ctx->spawn(testFdEvents(ran, true));
+ ctx->spawn(
+ sdbusplus::async::sleep_for(*ctx, 5s) |
+ sdbusplus::async::execution::then([&]() { ctx->request_stop(); }));
+ ctx->run();
+ EXPECT_TRUE(ran);
+}
diff --git a/test/meson.build b/test/meson.build
index 1c9f7aa..c0b7a0a 100644
--- a/test/meson.build
+++ b/test/meson.build
@@ -23,6 +23,7 @@
'async/context',
'async/task',
'async/timer',
+ 'async/fdio',
'bus/exception',
'bus/list_names',
'bus/match',