add support for timed fdio
For certain operations such as reading and writing to a fd for request
response flows, the fd needs to have a timeout to avoid everlasting
hangs in case of a unresponsive responder. To support this, the fdio has
been extended to add timeout support by throwing an exception in case of
timeout.
Tested: Unit Test
```
1/1 test_async_fdio_timed OK 10.03s
Ok: 1
Expected Fail: 0
Fail: 0
Unexpected Pass: 0
Skipped: 0
Timeout: 0
```
Change-Id: Ib219e4a4c55125785e6c1571488e445ba3379244
Signed-off-by: Jagpal Singh Gill <paligill@gmail.com>
diff --git a/include/sdbusplus/async/fdio.hpp b/include/sdbusplus/async/fdio.hpp
index 036abfd..abf3b42 100644
--- a/include/sdbusplus/async/fdio.hpp
+++ b/include/sdbusplus/async/fdio.hpp
@@ -22,8 +22,8 @@
fdio& operator=(fdio&&) = delete;
~fdio() = default;
- /** Construct a new fdio object from a context and a file descriptor. */
- fdio(context& ctx, int fd);
+ fdio(context& ctx, int fd,
+ std::chrono::microseconds timeout = std::chrono::microseconds(0));
/** Get the next fd event.
* Note: the implementation only supports a single awaiting task. Two
@@ -34,6 +34,7 @@
friend fdio_ns::fdio_completion;
private:
+ event_t::time_resolution timeout;
event_source_t source;
std::mutex lock{};
fdio_ns::fdio_completion* complete{nullptr};
@@ -44,6 +45,13 @@
}
void handleEvent() noexcept;
+ void handleTimeout() noexcept;
+};
+
+class fdio_timeout_exception : public std::runtime_error
+{
+ public:
+ fdio_timeout_exception() : std::runtime_error("Timeout") {}
};
namespace fdio_ns
@@ -69,10 +77,12 @@
private:
virtual void complete() noexcept = 0;
+ virtual void error(std::exception_ptr exceptionPtr) noexcept = 0;
virtual void stop() noexcept = 0;
void arm() noexcept;
fdio& fdioInstance;
+ event_source_t source;
};
// Implementation (templated based on Receiver) of fdio_completion.
@@ -89,6 +99,11 @@
execution::set_value(std::move(receiver));
}
+ void error(std::exception_ptr exceptionPtr) noexcept override final
+ {
+ execution::set_error(std::move(receiver), exceptionPtr);
+ }
+
void stop() noexcept override final
{
execution::set_stopped(std::move(receiver));
@@ -108,8 +123,10 @@
friend auto tag_invoke(execution::get_completion_signatures_t,
const fdio_sender&, auto)
- -> execution::completion_signatures<execution::set_value_t(),
- execution::set_stopped_t()>;
+ -> execution::completion_signatures<
+ execution::set_value_t(),
+ execution::set_error_t(std::exception_ptr),
+ execution::set_stopped_t()>;
template <execution::receiver R>
friend auto tag_invoke(execution::connect_t, fdio_sender&& self, R r)
diff --git a/src/async/fdio.cpp b/src/async/fdio.cpp
index e1c3ddc..94c79d1 100644
--- a/src/async/fdio.cpp
+++ b/src/async/fdio.cpp
@@ -2,7 +2,9 @@
namespace sdbusplus::async
{
-fdio::fdio(context& ctx, int fd) : context_ref(ctx)
+fdio::fdio(context& ctx, int fd, std::chrono::microseconds timeout) :
+ context_ref(ctx),
+ timeout(std::chrono::duration_cast<event_t::time_resolution>(timeout))
{
static auto eventHandler =
[](sd_event_source*, int, uint32_t, void* data) noexcept {
@@ -32,6 +34,18 @@
c->complete();
}
+void fdio::handleTimeout() noexcept
+{
+ std::unique_lock l{lock};
+ if (complete == nullptr)
+ {
+ return;
+ }
+ auto c = std::exchange(complete, nullptr);
+ l.unlock();
+ c->error(std::make_exception_ptr(fdio_timeout_exception()));
+}
+
namespace fdio_ns
{
@@ -65,6 +79,28 @@
std::terminate();
}
}
+
+ l.unlock();
+
+ // Schedule the timeout
+ if (fdioInstance.timeout != event_t::time_resolution::zero())
+ {
+ static auto eventHandler =
+ [](sd_event_source*, uint64_t, void* data) noexcept {
+ static_cast<fdio*>(data)->handleTimeout();
+ return 0;
+ };
+
+ try
+ {
+ source = fdioInstance.event_loop().add_oneshot_timer(
+ eventHandler, &fdioInstance, fdioInstance.timeout);
+ }
+ catch (...)
+ {
+ error(std::current_exception());
+ }
+ }
}
} // namespace fdio_ns
diff --git a/test/async/fdio_timed.cpp b/test/async/fdio_timed.cpp
new file mode 100644
index 0000000..85c93b6
--- /dev/null
+++ b/test/async/fdio_timed.cpp
@@ -0,0 +1,179 @@
+#include <sdbusplus/async.hpp>
+
+#include <filesystem>
+#include <fstream>
+
+#include <gtest/gtest.h>
+
+using namespace std::literals;
+
+namespace fs = std::filesystem;
+
+class FdioTimedTest : public ::testing::Test
+{
+ protected:
+ enum class testWriteOperation
+ {
+ writeSync,
+ writeAsync,
+ writeSkip
+ };
+
+ const fs::path path = "/tmp/test_fdio_timed";
+
+ FdioTimedTest()
+ {
+ if (!fs::exists(path))
+ {
+ fs::create_directory(path);
+ }
+
+ fd = inotify_init1(IN_NONBLOCK);
+ EXPECT_NE(fd, -1) << "Error occurred during the inotify_init1, error: "
+ << errno;
+
+ wd = inotify_add_watch(fd, path.c_str(), IN_CLOSE_WRITE);
+ EXPECT_NE(wd, -1)
+ << "Error occurred during the inotify_add_watch, error: " << errno;
+ fdioInstance = std::make_unique<sdbusplus::async::fdio>(
+ *ctx, fd, std::chrono::microseconds(1000));
+ }
+
+ ~FdioTimedTest() noexcept override
+ {
+ if (fd != -1)
+ {
+ if (wd != -1)
+ {
+ inotify_rm_watch(fd, wd);
+ }
+ close(fd);
+ }
+ ctx.reset();
+
+ if (fs::exists(path))
+ {
+ fs::remove_all(path);
+ }
+ }
+
+ auto writeToFile() -> sdbusplus::async::task<>
+ {
+ std::ofstream outfile((path / "test_fdio.txt").native());
+ EXPECT_TRUE(outfile.is_open())
+ << "Error occurred during file open, error: " << errno;
+ outfile << "Test fdio!" << std::endl;
+ outfile.close();
+ co_return;
+ }
+
+ auto testFdTimedEvents(bool& ran, testWriteOperation writeOperation,
+ int testIterations) -> sdbusplus::async::task<>
+ {
+ for (int i = 0; i < testIterations; i++)
+ {
+ switch (writeOperation)
+ {
+ case testWriteOperation::writeSync:
+ co_await writeToFile();
+ break;
+ case testWriteOperation::writeAsync:
+ ctx->spawn(
+ sdbusplus::async::sleep_for(*ctx, 1s) |
+ stdexec::then([&]() { ctx->spawn(writeToFile()); }));
+ break;
+ case testWriteOperation::writeSkip:
+ default:
+ break;
+ }
+
+ bool receivedTimeout = false;
+
+ try
+ {
+ co_await fdioInstance->next();
+ }
+ catch (const sdbusplus::async::fdio_timeout_exception& e)
+ {
+ receivedTimeout = true;
+ }
+
+ switch (writeOperation)
+ {
+ case testWriteOperation::writeSync:
+ EXPECT_FALSE(receivedTimeout) << "Expected event";
+ break;
+ case testWriteOperation::writeAsync:
+ case testWriteOperation::writeSkip:
+ default:
+ EXPECT_TRUE(receivedTimeout) << "Expected timeout";
+ break;
+ }
+ }
+ ran = true;
+
+ co_return;
+ }
+
+ std::unique_ptr<sdbusplus::async::fdio> fdioInstance;
+ std::optional<sdbusplus::async::context> ctx{std::in_place};
+
+ private:
+ int fd = -1;
+ int wd = -1;
+};
+
+TEST_F(FdioTimedTest, TestWriteSkipWithTimeout)
+{
+ bool ran = false;
+ ctx->spawn(testFdTimedEvents(ran, testWriteOperation::writeSkip, 1));
+ ctx->spawn(
+ sdbusplus::async::sleep_for(*ctx, 2s) |
+ sdbusplus::async::execution::then([&]() { ctx->request_stop(); }));
+ ctx->run();
+ EXPECT_TRUE(ran);
+}
+
+TEST_F(FdioTimedTest, TestWriteAsyncWithTimeout)
+{
+ bool ran = false;
+ ctx->spawn(testFdTimedEvents(ran, testWriteOperation::writeAsync, 1));
+ ctx->spawn(
+ sdbusplus::async::sleep_for(*ctx, 2s) |
+ sdbusplus::async::execution::then([&]() { ctx->request_stop(); }));
+ ctx->run();
+ EXPECT_TRUE(ran);
+}
+
+TEST_F(FdioTimedTest, TestWriteAsyncWithTimeoutIterative)
+{
+ bool ran = false;
+ ctx->spawn(testFdTimedEvents(ran, testWriteOperation::writeAsync, 100));
+ ctx->spawn(
+ sdbusplus::async::sleep_for(*ctx, 2s) |
+ sdbusplus::async::execution::then([&]() { ctx->request_stop(); }));
+ ctx->run();
+ EXPECT_TRUE(ran);
+}
+
+TEST_F(FdioTimedTest, TestWriteSync)
+{
+ bool ran = false;
+ ctx->spawn(testFdTimedEvents(ran, testWriteOperation::writeSync, 1));
+ ctx->spawn(
+ sdbusplus::async::sleep_for(*ctx, 2s) |
+ sdbusplus::async::execution::then([&]() { ctx->request_stop(); }));
+ ctx->run();
+ EXPECT_TRUE(ran);
+}
+
+TEST_F(FdioTimedTest, TestWriteSyncIterative)
+{
+ bool ran = false;
+ ctx->spawn(testFdTimedEvents(ran, testWriteOperation::writeSync, 100));
+ ctx->spawn(
+ sdbusplus::async::sleep_for(*ctx, 2s) |
+ sdbusplus::async::execution::then([&]() { ctx->request_stop(); }));
+ ctx->run();
+ EXPECT_TRUE(ran);
+}
diff --git a/test/meson.build b/test/meson.build
index 2b7bcfc..2459bd0 100644
--- a/test/meson.build
+++ b/test/meson.build
@@ -21,9 +21,10 @@
tests = [
'async/context',
+ 'async/fdio',
+ 'async/fdio_timed',
'async/task',
'async/timer',
- 'async/fdio',
'async/watchdog',
'bus/exception',
'bus/list_names',