async: scope: add completion sender
Add a mechanism to await the completion of an async::scope,
equivalent to a 'join' on the spawned tasks it contains.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I669680441704cdb59aa982d97bed610178c498b9
diff --git a/src/async/context.cpp b/src/async/context.cpp
index 38d3a89..60d9e56 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -150,6 +150,9 @@
details::wait_process_completion::wait_once(*this);
}
+ // Wait for all the tasks to complete.
+ std::this_thread::sync_wait(pending_tasks.empty());
+
// Stop has been requested, so finish up the loop.
loop.finish();
if (worker_thread.joinable())
diff --git a/src/async/scope.cpp b/src/async/scope.cpp
index efdc5f6..908def3 100644
--- a/src/async/scope.cpp
+++ b/src/async/scope.cpp
@@ -6,7 +6,7 @@
{
scope::~scope() noexcept(false)
{
- if (count != 0)
+ if (pending_count != 0)
{
throw std::logic_error(
"sdbusplus::async::scope destructed while tasks are pending.");
@@ -15,12 +15,58 @@
void scope::started_task() noexcept
{
- ++count;
+ std::lock_guard l{lock};
+ ++pending_count;
}
void scope::ended_task() noexcept
{
- --count;
+ scope_ns::scope_completion* p = nullptr;
+
+ {
+ std::lock_guard l{lock};
+ --pending_count;
+
+ if (pending_count == 0)
+ {
+ p = std::exchange(pending, nullptr);
+ }
+ }
+
+ if (p)
+ {
+ p->complete();
+ }
}
+scope_ns::scope_sender scope::empty() noexcept
+{
+ return scope_ns::scope_sender(*this);
+}
+
+namespace scope_ns
+{
+void scope_completion::arm() noexcept
+{
+ bool done = false;
+
+ {
+ std::lock_guard l{s.lock};
+ if (s.pending_count == 0)
+ {
+ done = true;
+ }
+ else
+ {
+ s.pending = this;
+ }
+ }
+
+ if (done)
+ {
+ this->complete();
+ }
+}
+} // namespace scope_ns
+
} // namespace sdbusplus::async