add support for async mutex
Add support for async mutex for synchronizing coroutines/tasks using
sender receiver framework. Also, add lock_guard support which provides a
RAII wrapper for mutex to own it for the duration of a scoped block.
Tested: Add Unit Test using gtest
```
1/1 test_async_mutex OK 2.01s
Ok: 1
Expected Fail: 0
Fail: 0
Unexpected Pass: 0
Skipped: 0
Timeout: 0
```
Change-Id: I691528885a94b9cf55d4b7f2fb0c1e0e6d0ab84f
Signed-off-by: Jagpal Singh Gill <paligill@gmail.com>
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
diff --git a/include/sdbusplus/async.hpp b/include/sdbusplus/async.hpp
index 7573cd1..3dec440 100644
--- a/include/sdbusplus/async.hpp
+++ b/include/sdbusplus/async.hpp
@@ -5,6 +5,7 @@
#include <sdbusplus/async/execution.hpp>
#include <sdbusplus/async/fdio.hpp>
#include <sdbusplus/async/match.hpp>
+#include <sdbusplus/async/mutex.hpp>
#include <sdbusplus/async/proxy.hpp>
#include <sdbusplus/async/task.hpp>
#include <sdbusplus/async/timer.hpp>
diff --git a/include/sdbusplus/async/mutex.hpp b/include/sdbusplus/async/mutex.hpp
new file mode 100644
index 0000000..d937f92
--- /dev/null
+++ b/include/sdbusplus/async/mutex.hpp
@@ -0,0 +1,165 @@
+#pragma once
+
+#include <sdbusplus/async/execution.hpp>
+#include <sdbusplus/event.hpp>
+
+#include <queue>
+#include <string>
+
+namespace sdbusplus::async
+{
+
+namespace mutex_ns
+{
+struct mutex_completion;
+} // namespace mutex_ns
+
+class lock_guard;
+
+class mutex
+{
+ public:
+ mutex() = delete;
+ mutex(const mutex&) = delete;
+ mutex& operator=(const mutex&) = delete;
+ mutex(mutex&&) = delete;
+ mutex& operator=(mutex&&) = delete;
+ ~mutex() = default;
+
+ mutex(const std::string& name = "sdbusplus::async::mutex");
+
+ friend mutex_ns::mutex_completion;
+ friend lock_guard;
+
+ private:
+ void unlock();
+
+ std::string name;
+ bool locked{false};
+ std::queue<mutex_ns::mutex_completion*> waitingTasks;
+ std::mutex lock{};
+};
+
+// RAII wrapper for mutex for the duration of a scoped block.
+class lock_guard
+{
+ public:
+ lock_guard() = delete;
+ lock_guard(const lock_guard&) = delete;
+ lock_guard& operator=(const lock_guard&) = delete;
+ lock_guard(lock_guard&&) = delete;
+ lock_guard& operator=(lock_guard&&) = delete;
+
+ explicit lock_guard(mutex& mutexInstance) : mutexInstance(mutexInstance) {}
+
+ ~lock_guard()
+ {
+ if (owned)
+ {
+ mutexInstance.unlock();
+ owned = false;
+ }
+ }
+
+ auto lock() noexcept;
+ auto unlock() noexcept;
+
+ private:
+ mutex& mutexInstance;
+ bool owned = false;
+};
+
+namespace mutex_ns
+{
+
+struct mutex_completion
+{
+ mutex_completion() = delete;
+ mutex_completion(const mutex_completion&) = delete;
+ mutex_completion& operator=(const mutex_completion&) = delete;
+ mutex_completion(mutex_completion&&) = delete;
+ ~mutex_completion() = default;
+
+ explicit mutex_completion(mutex& mutexInstance) noexcept :
+ mutexInstance(mutexInstance) {};
+
+ friend mutex;
+
+ friend void tag_invoke(execution::start_t, mutex_completion& self) noexcept
+ {
+ self.arm();
+ }
+
+ private:
+ virtual void complete() noexcept = 0;
+ virtual void stop() noexcept = 0;
+ void arm() noexcept;
+
+ mutex& mutexInstance;
+};
+
+// Implementation (templated based on Receiver) of mutex_completion.
+template <execution::receiver Receiver>
+struct mutex_operation : mutex_completion
+{
+ mutex_operation(mutex& mutexInstance, Receiver r) :
+ mutex_completion(mutexInstance), receiver(std::move(r))
+ {}
+
+ private:
+ void complete() noexcept override final
+ {
+ execution::set_value(std::move(receiver));
+ }
+
+ void stop() noexcept override final
+ {
+ execution::set_stopped(std::move(receiver));
+ }
+
+ Receiver receiver;
+};
+
+// mutex sender
+struct mutex_sender
+{
+ using is_sender = void;
+
+ mutex_sender() = delete;
+ explicit mutex_sender(mutex& mutexInstance) noexcept :
+ mutexInstance(mutexInstance) {};
+
+ friend auto tag_invoke(execution::get_completion_signatures_t,
+ const mutex_sender&, auto)
+ -> execution::completion_signatures<execution::set_value_t(),
+ execution::set_stopped_t()>;
+
+ template <execution::receiver R>
+ friend auto tag_invoke(execution::connect_t, mutex_sender&& self, R r)
+ -> mutex_operation<R>
+ {
+ return {self.mutexInstance, std::move(r)};
+ }
+
+ private:
+ mutex& mutexInstance;
+};
+
+} // namespace mutex_ns
+
+inline auto lock_guard::lock() noexcept
+{
+ owned = true;
+ return mutex_ns::mutex_sender{this->mutexInstance};
+}
+
+inline auto lock_guard::unlock() noexcept
+{
+ if (owned)
+ {
+ mutexInstance.unlock();
+ owned = false;
+ }
+}
+
+} // namespace sdbusplus::async
diff --git a/meson.build b/meson.build
index d11202e..64e887f 100644
--- a/meson.build
+++ b/meson.build
@@ -37,6 +37,7 @@
'src/async/context.cpp',
'src/async/fdio.cpp',
'src/async/match.cpp',
+ 'src/async/mutex.cpp',
'src/bus.cpp',
'src/bus/match.cpp',
'src/event.cpp',
diff --git a/src/async/mutex.cpp b/src/async/mutex.cpp
new file mode 100644
index 0000000..44694c6
--- /dev/null
+++ b/src/async/mutex.cpp
@@ -0,0 +1,54 @@
+#include <sdbusplus/async/mutex.hpp>
+
+namespace sdbusplus::async
+{
+
+mutex::mutex(const std::string& name) : name(name) {}
+
+void mutex::unlock()
+{
+ std::unique_lock l{lock};
+ if (waitingTasks.empty())
+ {
+ auto wasLocked = std::exchange(locked, false);
+ if (!wasLocked)
+ {
+ try
+ {
+ throw std::runtime_error("mutex is not locked!");
+ }
+ catch (...)
+ {
+ std::terminate();
+ }
+ }
+ return;
+ }
+ // Wake up the next waiting task
+ auto completion = std::move(waitingTasks.front());
+ waitingTasks.pop();
+ l.unlock();
+ completion->complete();
+}
+
+namespace mutex_ns
+{
+
+void mutex_completion::arm() noexcept
+{
+ std::unique_lock l{mutexInstance.lock};
+
+ auto wasLocked = std::exchange(mutexInstance.locked, true);
+ if (!wasLocked)
+ {
+ l.unlock();
+ complete();
+ return;
+ }
+
+ mutexInstance.waitingTasks.push(this);
+}
+
+} // namespace mutex_ns
+
+} // namespace sdbusplus::async
diff --git a/test/async/mutex.cpp b/test/async/mutex.cpp
new file mode 100644
index 0000000..6630157
--- /dev/null
+++ b/test/async/mutex.cpp
@@ -0,0 +1,150 @@
+#include <sdbusplus/async.hpp>
+
+#include <filesystem>
+#include <fstream>
+
+#include <gtest/gtest.h>
+
+using namespace std::literals;
+
+namespace fs = std::filesystem;
+
+class MutexTest : public ::testing::Test
+{
+ protected:
+ constexpr static std::string testMutex = "TestMutex";
+ const fs::path path = "/tmp";
+
+ MutexTest() : mutex(testMutex)
+ {
+ auto fd = inotify_init1(IN_NONBLOCK);
+ EXPECT_NE(fd, -1) << "Error occurred during the inotify_init1, error: "
+ << errno;
+
+ auto wd = inotify_add_watch(fd, path.c_str(), IN_CLOSE_WRITE);
+ EXPECT_NE(wd, -1)
+ << "Error occurred during the inotify_add_watch, error: " << errno;
+ fdioInstance = std::make_unique<sdbusplus::async::fdio>(*ctx, fd);
+ }
+
+ std::optional<sdbusplus::async::context> ctx{std::in_place};
+
+ auto testAsyncAddition(int val = 1) -> sdbusplus::async::task<>
+ {
+ sdbusplus::async::lock_guard lg{mutex};
+ co_await lg.lock();
+
+ sharedVar += val;
+ }
+
+ auto testAsyncSubtraction(int val = 1) -> sdbusplus::async::task<>
+ {
+ sdbusplus::async::lock_guard lg{mutex};
+ co_await lg.lock();
+
+ sharedVar -= val;
+ }
+
+ auto writeToFile() -> sdbusplus::async::task<>
+ {
+ std::ofstream outfile((path / testMutex).native());
+ EXPECT_TRUE(outfile.is_open())
+ << "Error occurred during file open, error: " << errno;
+
+ outfile << testMutex << std::endl;
+ outfile.close();
+
+ co_return;
+ }
+
+ auto readFromFile() -> sdbusplus::async::task<>
+ {
+ std::ifstream infile((path / testMutex).native());
+ EXPECT_TRUE(infile.is_open())
+ << "Error occurred during file open, error: " << errno;
+
+ std::string line;
+ std::getline(infile, line);
+ EXPECT_EQ(line, testMutex);
+ infile.close();
+
+ co_return;
+ }
+
+ auto testFdEvents() -> sdbusplus::async::task<>
+ {
+ sdbusplus::async::lock_guard lg{mutex};
+ co_await lg.lock();
+
+ ctx->spawn(writeToFile());
+ co_await fdioInstance->next();
+ co_await readFromFile();
+ ran++;
+
+ co_return;
+ }
+
+ int sharedVar = 0;
+ int ran = 0;
+ sdbusplus::async::mutex mutex;
+
+ private:
+ std::unique_ptr<sdbusplus::async::fdio> fdioInstance;
+ int fd = -1;
+ int wd = -1;
+};
+
+TEST_F(MutexTest, TestAsyncAddition)
+{
+ constexpr auto testIterations = 10;
+ for (auto i = 0; i < testIterations; i++)
+ {
+ ctx->spawn(testAsyncAddition());
+ }
+
+ ctx->spawn(
+ sdbusplus::async::sleep_for(*ctx, 1s) |
+ sdbusplus::async::execution::then([&]() { ctx->request_stop(); }));
+
+ ctx->run();
+
+ EXPECT_EQ(sharedVar, testIterations);
+}
+
+TEST_F(MutexTest, TestAsyncMixed)
+{
+ constexpr auto testIterations = 10;
+ for (auto i = 0; i < testIterations; i++)
+ {
+ ctx->spawn(testAsyncAddition());
+ ctx->spawn(testAsyncSubtraction(2));
+ }
+
+ ctx->spawn(
+ sdbusplus::async::sleep_for(*ctx, 1s) |
+ sdbusplus::async::execution::then([&]() { ctx->request_stop(); }));
+
+ ctx->run();
+
+ EXPECT_EQ(sharedVar, -testIterations);
+}
+
+TEST_F(MutexTest, TestFdEvents)
+{
+ constexpr static auto testIterations = 5;
+
+ for (auto i = 0; i < testIterations; i++)
+ {
+ ctx->spawn(testFdEvents());
+ }
+ ctx->spawn(
+ sdbusplus::async::sleep_for(*ctx, 3s) |
+ sdbusplus::async::execution::then([&]() { ctx->request_stop(); }));
+ ctx->run();
+ EXPECT_EQ(ran, testIterations);
+}
+
+TEST_F(MutexTest, TestLockGuardNoLock)
+{
+ sdbusplus::async::lock_guard lg{mutex};
+}
diff --git a/test/meson.build b/test/meson.build
index 2459bd0..f67bf32 100644
--- a/test/meson.build
+++ b/test/meson.build
@@ -23,6 +23,7 @@
'async/context',
'async/fdio',
'async/fdio_timed',
+ 'async/mutex',
'async/task',
'async/timer',
'async/watchdog',