add support for timed fdio

For certain operations such as reading and writing to a fd for request
response flows, the fd needs to have a timeout to avoid everlasting
hangs in case of a unresponsive responder. To support this, the fdio has
been extended to add timeout support by throwing an exception in case of
timeout.

Tested: Unit Test
```
1/1 test_async_fdio_timed OK             10.03s

Ok:                 1
Expected Fail:      0
Fail:               0
Unexpected Pass:    0
Skipped:            0
Timeout:            0
```

Change-Id: Ib219e4a4c55125785e6c1571488e445ba3379244
Signed-off-by: Jagpal Singh Gill <paligill@gmail.com>
diff --git a/include/sdbusplus/async/fdio.hpp b/include/sdbusplus/async/fdio.hpp
index 036abfd..abf3b42 100644
--- a/include/sdbusplus/async/fdio.hpp
+++ b/include/sdbusplus/async/fdio.hpp
@@ -22,8 +22,8 @@
     fdio& operator=(fdio&&) = delete;
     ~fdio() = default;
 
-    /** Construct a new fdio object from a context and a file descriptor. */
-    fdio(context& ctx, int fd);
+    fdio(context& ctx, int fd,
+         std::chrono::microseconds timeout = std::chrono::microseconds(0));
 
     /** Get the next fd event.
      *  Note: the implementation only supports a single awaiting task.  Two
@@ -34,6 +34,7 @@
     friend fdio_ns::fdio_completion;
 
   private:
+    event_t::time_resolution timeout;
     event_source_t source;
     std::mutex lock{};
     fdio_ns::fdio_completion* complete{nullptr};
@@ -44,6 +45,13 @@
     }
 
     void handleEvent() noexcept;
+    void handleTimeout() noexcept;
+};
+
+class fdio_timeout_exception : public std::runtime_error
+{
+  public:
+    fdio_timeout_exception() : std::runtime_error("Timeout") {}
 };
 
 namespace fdio_ns
@@ -69,10 +77,12 @@
 
   private:
     virtual void complete() noexcept = 0;
+    virtual void error(std::exception_ptr exceptionPtr) noexcept = 0;
     virtual void stop() noexcept = 0;
     void arm() noexcept;
 
     fdio& fdioInstance;
+    event_source_t source;
 };
 
 // Implementation (templated based on Receiver) of fdio_completion.
@@ -89,6 +99,11 @@
         execution::set_value(std::move(receiver));
     }
 
+    void error(std::exception_ptr exceptionPtr) noexcept override final
+    {
+        execution::set_error(std::move(receiver), exceptionPtr);
+    }
+
     void stop() noexcept override final
     {
         execution::set_stopped(std::move(receiver));
@@ -108,8 +123,10 @@
 
     friend auto tag_invoke(execution::get_completion_signatures_t,
                            const fdio_sender&, auto)
-        -> execution::completion_signatures<execution::set_value_t(),
-                                            execution::set_stopped_t()>;
+        -> execution::completion_signatures<
+            execution::set_value_t(),
+            execution::set_error_t(std::exception_ptr),
+            execution::set_stopped_t()>;
 
     template <execution::receiver R>
     friend auto tag_invoke(execution::connect_t, fdio_sender&& self, R r)
diff --git a/src/async/fdio.cpp b/src/async/fdio.cpp
index e1c3ddc..94c79d1 100644
--- a/src/async/fdio.cpp
+++ b/src/async/fdio.cpp
@@ -2,7 +2,9 @@
 
 namespace sdbusplus::async
 {
-fdio::fdio(context& ctx, int fd) : context_ref(ctx)
+fdio::fdio(context& ctx, int fd, std::chrono::microseconds timeout) :
+    context_ref(ctx),
+    timeout(std::chrono::duration_cast<event_t::time_resolution>(timeout))
 {
     static auto eventHandler =
         [](sd_event_source*, int, uint32_t, void* data) noexcept {
@@ -32,6 +34,18 @@
     c->complete();
 }
 
+void fdio::handleTimeout() noexcept
+{
+    std::unique_lock l{lock};
+    if (complete == nullptr)
+    {
+        return;
+    }
+    auto c = std::exchange(complete, nullptr);
+    l.unlock();
+    c->error(std::make_exception_ptr(fdio_timeout_exception()));
+}
+
 namespace fdio_ns
 {
 
@@ -65,6 +79,28 @@
             std::terminate();
         }
     }
+
+    l.unlock();
+
+    // Schedule the timeout
+    if (fdioInstance.timeout != event_t::time_resolution::zero())
+    {
+        static auto eventHandler =
+            [](sd_event_source*, uint64_t, void* data) noexcept {
+                static_cast<fdio*>(data)->handleTimeout();
+                return 0;
+            };
+
+        try
+        {
+            source = fdioInstance.event_loop().add_oneshot_timer(
+                eventHandler, &fdioInstance, fdioInstance.timeout);
+        }
+        catch (...)
+        {
+            error(std::current_exception());
+        }
+    }
 }
 
 } // namespace fdio_ns
diff --git a/test/async/fdio_timed.cpp b/test/async/fdio_timed.cpp
new file mode 100644
index 0000000..85c93b6
--- /dev/null
+++ b/test/async/fdio_timed.cpp
@@ -0,0 +1,179 @@
+#include <sdbusplus/async.hpp>
+
+#include <filesystem>
+#include <fstream>
+
+#include <gtest/gtest.h>
+
+using namespace std::literals;
+
+namespace fs = std::filesystem;
+
+class FdioTimedTest : public ::testing::Test
+{
+  protected:
+    enum class testWriteOperation
+    {
+        writeSync,
+        writeAsync,
+        writeSkip
+    };
+
+    const fs::path path = "/tmp/test_fdio_timed";
+
+    FdioTimedTest()
+    {
+        if (!fs::exists(path))
+        {
+            fs::create_directory(path);
+        }
+
+        fd = inotify_init1(IN_NONBLOCK);
+        EXPECT_NE(fd, -1) << "Error occurred during the inotify_init1, error: "
+                          << errno;
+
+        wd = inotify_add_watch(fd, path.c_str(), IN_CLOSE_WRITE);
+        EXPECT_NE(wd, -1)
+            << "Error occurred during the inotify_add_watch, error: " << errno;
+        fdioInstance = std::make_unique<sdbusplus::async::fdio>(
+            *ctx, fd, std::chrono::microseconds(1000));
+    }
+
+    ~FdioTimedTest() noexcept override
+    {
+        if (fd != -1)
+        {
+            if (wd != -1)
+            {
+                inotify_rm_watch(fd, wd);
+            }
+            close(fd);
+        }
+        ctx.reset();
+
+        if (fs::exists(path))
+        {
+            fs::remove_all(path);
+        }
+    }
+
+    auto writeToFile() -> sdbusplus::async::task<>
+    {
+        std::ofstream outfile((path / "test_fdio.txt").native());
+        EXPECT_TRUE(outfile.is_open())
+            << "Error occurred during file open, error: " << errno;
+        outfile << "Test fdio!" << std::endl;
+        outfile.close();
+        co_return;
+    }
+
+    auto testFdTimedEvents(bool& ran, testWriteOperation writeOperation,
+                           int testIterations) -> sdbusplus::async::task<>
+    {
+        for (int i = 0; i < testIterations; i++)
+        {
+            switch (writeOperation)
+            {
+                case testWriteOperation::writeSync:
+                    co_await writeToFile();
+                    break;
+                case testWriteOperation::writeAsync:
+                    ctx->spawn(
+                        sdbusplus::async::sleep_for(*ctx, 1s) |
+                        stdexec::then([&]() { ctx->spawn(writeToFile()); }));
+                    break;
+                case testWriteOperation::writeSkip:
+                default:
+                    break;
+            }
+
+            bool receivedTimeout = false;
+
+            try
+            {
+                co_await fdioInstance->next();
+            }
+            catch (const sdbusplus::async::fdio_timeout_exception& e)
+            {
+                receivedTimeout = true;
+            }
+
+            switch (writeOperation)
+            {
+                case testWriteOperation::writeSync:
+                    EXPECT_FALSE(receivedTimeout) << "Expected event";
+                    break;
+                case testWriteOperation::writeAsync:
+                case testWriteOperation::writeSkip:
+                default:
+                    EXPECT_TRUE(receivedTimeout) << "Expected timeout";
+                    break;
+            }
+        }
+        ran = true;
+
+        co_return;
+    }
+
+    std::unique_ptr<sdbusplus::async::fdio> fdioInstance;
+    std::optional<sdbusplus::async::context> ctx{std::in_place};
+
+  private:
+    int fd = -1;
+    int wd = -1;
+};
+
+TEST_F(FdioTimedTest, TestWriteSkipWithTimeout)
+{
+    bool ran = false;
+    ctx->spawn(testFdTimedEvents(ran, testWriteOperation::writeSkip, 1));
+    ctx->spawn(
+        sdbusplus::async::sleep_for(*ctx, 2s) |
+        sdbusplus::async::execution::then([&]() { ctx->request_stop(); }));
+    ctx->run();
+    EXPECT_TRUE(ran);
+}
+
+TEST_F(FdioTimedTest, TestWriteAsyncWithTimeout)
+{
+    bool ran = false;
+    ctx->spawn(testFdTimedEvents(ran, testWriteOperation::writeAsync, 1));
+    ctx->spawn(
+        sdbusplus::async::sleep_for(*ctx, 2s) |
+        sdbusplus::async::execution::then([&]() { ctx->request_stop(); }));
+    ctx->run();
+    EXPECT_TRUE(ran);
+}
+
+TEST_F(FdioTimedTest, TestWriteAsyncWithTimeoutIterative)
+{
+    bool ran = false;
+    ctx->spawn(testFdTimedEvents(ran, testWriteOperation::writeAsync, 100));
+    ctx->spawn(
+        sdbusplus::async::sleep_for(*ctx, 2s) |
+        sdbusplus::async::execution::then([&]() { ctx->request_stop(); }));
+    ctx->run();
+    EXPECT_TRUE(ran);
+}
+
+TEST_F(FdioTimedTest, TestWriteSync)
+{
+    bool ran = false;
+    ctx->spawn(testFdTimedEvents(ran, testWriteOperation::writeSync, 1));
+    ctx->spawn(
+        sdbusplus::async::sleep_for(*ctx, 2s) |
+        sdbusplus::async::execution::then([&]() { ctx->request_stop(); }));
+    ctx->run();
+    EXPECT_TRUE(ran);
+}
+
+TEST_F(FdioTimedTest, TestWriteSyncIterative)
+{
+    bool ran = false;
+    ctx->spawn(testFdTimedEvents(ran, testWriteOperation::writeSync, 100));
+    ctx->spawn(
+        sdbusplus::async::sleep_for(*ctx, 2s) |
+        sdbusplus::async::execution::then([&]() { ctx->request_stop(); }));
+    ctx->run();
+    EXPECT_TRUE(ran);
+}
diff --git a/test/meson.build b/test/meson.build
index 2b7bcfc..2459bd0 100644
--- a/test/meson.build
+++ b/test/meson.build
@@ -21,9 +21,10 @@
 
 tests = [
     'async/context',
+    'async/fdio',
+    'async/fdio_timed',
     'async/task',
     'async/timer',
-    'async/fdio',
     'async/watchdog',
     'bus/exception',
     'bus/list_names',