async: switch to stdexec async_scope
Use the `async_scope` implementation from stdexec instead of the
custom `scope` class here, which greatly reduces our own code and
also aligns better with the C++ STL direction. The major changes are
around exception paths:
- Spawned tasks which end with an exception will now std::terminate,
so any task expected to throw should have an `upon_error` chain.
- Spawned tasks which are stopped will be no longer throw an
`UnexpectedStop` exception, so they should be chained with an
`upon_stopped` if this is important.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I4e0c85712652efa5b296b898dcc2b0026ba4c625
diff --git a/include/sdbusplus/async.hpp b/include/sdbusplus/async.hpp
index 6c9e834..70b8abf 100644
--- a/include/sdbusplus/async.hpp
+++ b/include/sdbusplus/async.hpp
@@ -4,6 +4,5 @@
#include <sdbusplus/async/execution.hpp>
#include <sdbusplus/async/match.hpp>
#include <sdbusplus/async/proxy.hpp>
-#include <sdbusplus/async/scope.hpp>
#include <sdbusplus/async/task.hpp>
#include <sdbusplus/async/timer.hpp>
diff --git a/include/sdbusplus/async/context.hpp b/include/sdbusplus/async/context.hpp
index 17a8a37..9101df3 100644
--- a/include/sdbusplus/async/context.hpp
+++ b/include/sdbusplus/async/context.hpp
@@ -1,7 +1,6 @@
#pragma once
#include <sdbusplus/async/execution.hpp>
-#include <sdbusplus/async/scope.hpp>
#include <sdbusplus/async/task.hpp>
#include <sdbusplus/bus.hpp>
#include <sdbusplus/event.hpp>
@@ -98,11 +97,11 @@
/** Stop source */
std::stop_source initial_stop{};
- scope pending_tasks{loop};
+ async_scope pending_tasks{};
// In order to coordinate final completion of work, we keep some tasks
// on a separate scope (the ones which maintain the sd-event/dbus state
// and keep a final stop-source for them.
- scope internal_tasks{loop};
+ async_scope internal_tasks{};
std::stop_source final_stop{};
// Lock and condition variable to signal `caller`.
diff --git a/include/sdbusplus/async/execution.hpp b/include/sdbusplus/async/execution.hpp
index 1de17e0..5fc34d5 100644
--- a/include/sdbusplus/async/execution.hpp
+++ b/include/sdbusplus/async/execution.hpp
@@ -8,6 +8,7 @@
#pragma GCC diagnostic ignored "-Wnon-template-friend"
#endif
#pragma GCC diagnostic ignored "-Wmissing-field-initializers"
+#include <sdbusplus/async/stdexec/async_scope.hpp>
#include <sdbusplus/async/stdexec/coroutine.hpp>
#include <sdbusplus/async/stdexec/execution.hpp>
#pragma GCC diagnostic pop
@@ -17,4 +18,5 @@
namespace sdbusplus::async
{
namespace execution = stdexec;
-}
+using async_scope = exec::async_scope;
+} // namespace sdbusplus::async
diff --git a/include/sdbusplus/async/scope.hpp b/include/sdbusplus/async/scope.hpp
deleted file mode 100644
index 8975194..0000000
--- a/include/sdbusplus/async/scope.hpp
+++ /dev/null
@@ -1,261 +0,0 @@
-#pragma once
-
-#include <sdbusplus/async/execution.hpp>
-#include <sdbusplus/exception.hpp>
-
-#include <deque>
-#include <mutex>
-
-namespace sdbusplus::async
-{
-
-namespace scope_ns
-{
-template <execution::sender Sender>
-struct scope_receiver;
-
-struct scope_sender;
-
-struct scope_completion;
-} // namespace scope_ns
-
-/** A collection of tasks.
- *
- * It is sometimes useful to run a group of tasks, possibly even detached
- * from an execution context, but the only option in std::execution is
- * start_detached, which does not have any mechanism to deal with errors
- * from those tasks and can potentially leak tasks. Alternatively, you
- * can save the result of `execution::connect`, but you need to maintain a
- * lifetime of it until the connected-task completes.
- *
- * The scope maintains the lifetime of the tasks it `spawns` so that the
- * operational-state created by `connect` is preserved (and deleted) as
- * appropriate.
- */
-struct scope
-{
- scope() = delete;
- explicit scope(execution::run_loop& loop) : loop(loop) {}
-
- // The scope destructor can throw if it was destructed while there are
- // outstanding tasks.
- ~scope() noexcept(false);
-
- /** Spawn a Sender to run on the scope.
- *
- * @param[in] sender - The sender to run.
- */
- template <execution::sender_of<execution::set_value_t()> Sender>
- void spawn(Sender&& sender);
-
- /** Get a Sender that awaits for all tasks to complete. */
- scope_ns::scope_sender empty() noexcept;
-
- template <execution::sender>
- friend struct scope_ns::scope_receiver;
-
- friend scope_ns::scope_completion;
-
- private:
- void started_task() noexcept;
- void ended_task(std::exception_ptr&&) noexcept;
-
- std::mutex lock{};
- bool started = false;
- size_t pending_count = 0;
- std::deque<std::exception_ptr> pending_exceptions = {};
- scope_ns::scope_completion* pending = nullptr;
-
- execution::run_loop& loop;
-};
-
-namespace scope_ns
-{
-
-/** The Sender-completion Reciever. */
-template <execution::sender Sender>
-struct scope_receiver
-{
- template <typename OpState>
- explicit scope_receiver(OpState* os, scope* s) : op_state(os), s(s)
- {}
-
- friend void tag_invoke(execution::set_value_t, scope_receiver&& self,
- auto&&...) noexcept
- {
- self.complete();
- }
-
- friend void tag_invoke(execution::set_error_t, scope_receiver&& self,
- std::exception_ptr&& e) noexcept
- {
- self.complete(std::move(e));
- }
-
- friend void tag_invoke(execution::set_stopped_t,
- scope_receiver&& self) noexcept
- {
- self.complete(std::make_exception_ptr(exception::UnhandledStop{}));
- }
-
- friend decltype(auto) tag_invoke(execution::get_env_t,
- const scope_receiver& self) noexcept
- {
- return self;
- }
-
- friend decltype(auto) tag_invoke(execution::get_scheduler_t,
- const scope_receiver& self) noexcept
- {
- return self.get_scheduler();
- }
-
- void complete(std::exception_ptr&& = {}) noexcept;
-
- void* op_state;
- scope* s = nullptr;
-
- private:
- decltype(auto) get_scheduler() const
- {
- return s->loop.get_scheduler();
- }
-};
-
-/** The holder of the connect operational-state. */
-template <execution::sender Sender>
-struct scope_operation_state
-{
- using state_t = execution::connect_result_t<Sender, scope_receiver<Sender>>;
- std::optional<state_t> op_state;
-};
-
-template <execution::sender Sender>
-void scope_receiver<Sender>::complete(std::exception_ptr&& e) noexcept
-{
- // The Sender is complete, so we need to clean up the saved operational
- // state.
-
- // Save the scope (since we're going to delete `this`).
- auto owning_scope = s;
- // We also need to save the exception, which is likely owned by `this`.
- std::exception_ptr ex{std::move(e)};
-
- // Delete the operational state, which triggers deleting this.
- delete static_cast<scope_ns::scope_operation_state<Sender>*>(op_state);
-
- // Inform the scope that a task has completed.
- owning_scope->ended_task(std::move(ex));
-}
-
-// Virtual class to handle the scope completions.
-struct scope_completion
-{
- scope_completion() = delete;
- scope_completion(scope_completion&&) = delete;
-
- explicit scope_completion(scope& s) : s(s){};
- virtual ~scope_completion() = default;
-
- friend scope;
-
- friend void tag_invoke(execution::start_t, scope_completion& self) noexcept
- {
- self.arm();
- }
-
- private:
- virtual void complete(std::exception_ptr&&) noexcept = 0;
- void arm() noexcept;
-
- scope& s;
-};
-
-// Implementation (templated based on Reciever) of scope_completion.
-template <execution::receiver Reciever>
-struct scope_operation : scope_completion
-{
- scope_operation(scope& s, Reciever r) :
- scope_completion(s), receiver(std::move(r))
- {}
-
- private:
- void complete(std::exception_ptr&& e) noexcept override final
- {
- if (e)
- {
- execution::set_error(std::move(receiver), std::move(e));
- }
- else
- {
- execution::set_value(std::move(receiver));
- }
- }
-
- Reciever receiver;
-};
-
-// Scope completion Sender implementation.
-struct scope_sender
-{
- scope_sender() = delete;
- explicit scope_sender(scope& m) noexcept : m(m){};
-
- friend auto tag_invoke(execution::get_completion_signatures_t,
- const scope_sender&, auto)
- -> execution::completion_signatures<execution::set_value_t(),
- execution::set_stopped_t()>;
-
- template <execution::receiver R>
- friend auto tag_invoke(execution::connect_t, scope_sender&& self, R r)
- -> scope_operation<R>
- {
- return {self.m, std::move(r)};
- }
-
- private:
- scope& m;
-};
-
-// Most (non-movable) receivers cannot be emplaced without this template magic.
-// Ex. `spawn(std::execution::just())` doesnt' work without this.
-template <typename Fn>
-struct in_place_construct
-{
- Fn fn;
- using result_t = decltype(std::declval<Fn>()());
-
- operator result_t()
- {
- return ((Fn&&)fn)();
- }
-
- explicit in_place_construct(Fn&& fn) : fn(std::move(fn)){};
-};
-
-} // namespace scope_ns
-
-template <execution::sender_of<execution::set_value_t()> Sender>
-void scope::spawn(Sender&& sender)
-{
- // Create a holder of the operational state. Keep it in a unique_ptr
- // so it is cleaned up if there are any exceptions in this function.
- auto s = std::make_unique<
- scope_ns::scope_operation_state<std::decay_t<Sender>>>();
-
- // Associate the state and scope with the receiver and connect to the
- // Sender.
- s->op_state.emplace(scope_ns::in_place_construct{[&] {
- return execution::connect(
- std::forward<Sender>(sender),
- scope_ns::scope_receiver<std::decay_t<Sender>>{s.get(), this});
- }});
-
- started_task();
-
- // Start is noexcept, so it is safe to release the pointer which is now
- // contained in the receiver.
- execution::start(s.release()->op_state.value());
-}
-
-} // namespace sdbusplus::async
diff --git a/meson.build b/meson.build
index 448d167..af8cdf3 100644
--- a/meson.build
+++ b/meson.build
@@ -26,7 +26,6 @@
libsdbusplus_src = files(
'src/async/context.cpp',
'src/async/match.cpp',
- 'src/async/scope.cpp',
'src/bus.cpp',
'src/event.cpp',
'src/exception.cpp',
diff --git a/src/async/context.cpp b/src/async/context.cpp
index 1da888c..85e0200 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -39,7 +39,10 @@
context& ctx;
event_t::time_resolution timeout{};
- static task<> loop(context& ctx);
+ // TODO: It seems like we should be able to do a normal `task<>` here
+ // but spawn on it compile fails.
+ static exec::basic_task<void, exec::__task::__raw_task_context<void>>
+ loop(context& ctx);
static void wait_once(context& ctx);
};
@@ -99,7 +102,8 @@
context& ctx;
};
-task<> wait_process_completion::loop(context& ctx)
+exec::basic_task<void, exec::__task::__raw_task_context<void>>
+ wait_process_completion::loop(context& ctx)
{
while (!ctx.final_stop.stop_requested())
{
@@ -142,10 +146,7 @@
wait_for_wait_process_stopped();
// Wait for all the internal tasks to complete.
- stdexec::sync_wait(internal_tasks.empty() |
- execution::upon_error([&](auto&& e) {
- pending_exceptions.emplace_back(std::move(e));
- }));
+ stdexec::sync_wait(internal_tasks.on_empty());
// Finish up the loop and join the thread.
// (There shouldn't be anything going on by this point anyhow.)
@@ -162,8 +163,7 @@
void context::worker_run()
{
// Start the sdbus 'wait/process' loop; treat it as an internal task.
- internal_tasks.spawn(details::wait_process_completion::loop(*this) |
- execution::upon_stopped([]() {}));
+ internal_tasks.spawn(details::wait_process_completion::loop(*this));
// Run the execution::run_loop to handle all the tasks.
loop.run();
@@ -212,11 +212,8 @@
}
// Spawn the watch for completion / exceptions.
- internal_tasks.spawn(pending_tasks.empty() |
- execution::then([this]() { spawn_complete(); }) |
- execution::upon_error([this](auto&& e) {
- spawn_complete(std::move(e));
- }));
+ internal_tasks.spawn(pending_tasks.on_empty() |
+ execution::then([this]() { spawn_complete(); }));
}
void context::caller_run()
diff --git a/src/async/scope.cpp b/src/async/scope.cpp
deleted file mode 100644
index d9eb8a7..0000000
--- a/src/async/scope.cpp
+++ /dev/null
@@ -1,96 +0,0 @@
-#include <sdbusplus/async/scope.hpp>
-
-#include <exception>
-
-namespace sdbusplus::async
-{
-scope::~scope() noexcept(false)
-{
- if (pending_count != 0)
- {
- throw std::logic_error(
- "sdbusplus::async::scope destructed while tasks are pending.");
- }
-}
-
-void scope::started_task() noexcept
-{
- std::lock_guard l{lock};
- started = true;
- ++pending_count;
-}
-
-void scope::ended_task(std::exception_ptr&& e) noexcept
-{
- scope_ns::scope_completion* p = nullptr;
-
- {
- std::lock_guard l{lock};
- --pending_count; // decrement count.
-
- if (e)
- {
- pending_exceptions.emplace_back(std::exchange(e, {}));
- }
-
- // If the scope is complete, get the pending completion, if it exists.
- if (!pending_exceptions.empty() || (pending_count == 0))
- {
- p = std::exchange(pending, nullptr);
- }
- }
-
- if (p)
- {
- std::exception_ptr fwd_exception = {};
- if (!pending_exceptions.empty())
- {
- fwd_exception = pending_exceptions.front();
- pending_exceptions.pop_front();
- }
- p->complete(std::move(fwd_exception));
- }
-}
-
-scope_ns::scope_sender scope::empty() noexcept
-{
- return scope_ns::scope_sender(*this);
-}
-
-namespace scope_ns
-{
-void scope_completion::arm() noexcept
-{
- bool done = false;
- std::exception_ptr e{};
-
- {
- std::lock_guard l{s.lock};
- if (!s.started)
- {
- done = false;
- }
- else if (!s.pending_exceptions.empty())
- {
- e = s.pending_exceptions.front();
- s.pending_exceptions.pop_front();
- done = true;
- }
- else if (s.pending_count == 0)
- {
- done = true;
- }
- else
- {
- s.pending = this;
- }
- }
-
- if (done)
- {
- this->complete(std::move(e));
- }
-}
-} // namespace scope_ns
-
-} // namespace sdbusplus::async
diff --git a/test/async/context.cpp b/test/async/context.cpp
index 7d7f6fa..8cdd93c 100644
--- a/test/async/context.cpp
+++ b/test/async/context.cpp
@@ -48,48 +48,6 @@
}
}
-TEST_F(Context, SpawnThrowingTask)
-{
- ctx->spawn(stdexec::just() |
- stdexec::then([]() { throw std::logic_error("Oops"); }));
-
- EXPECT_THROW(runToStop(), std::logic_error);
- ctx->run();
-}
-
-TEST_F(Context, SpawnThrowingCoroutine)
-{
- struct _
- {
- static auto one() -> sdbusplus::async::task<>
- {
- throw std::logic_error("Oops");
- co_return;
- }
- };
-
- ctx->spawn(_::one());
- EXPECT_THROW(runToStop(), std::logic_error);
- ctx->run();
-};
-
-TEST_F(Context, SpawnManyThrowingTasks)
-{
- static constexpr size_t count = 100;
- for (size_t i = 0; i < count; ++i)
- {
- ctx->spawn(stdexec::just() |
- stdexec::then([]() { throw std::logic_error("Oops"); }));
- }
- spawnStop();
-
- for (size_t i = 0; i < count; ++i)
- {
- EXPECT_THROW(ctx->run(), std::logic_error);
- }
- ctx->run();
-}
-
TEST_F(Context, SpawnDelayedTask)
{
using namespace std::literals;
@@ -153,8 +111,7 @@
ctx->spawn(sdbusplus::async::sleep_for(*ctx, 1ms) |
stdexec::then([&m](...) { m.reset(); }));
- EXPECT_THROW(runToStop(), sdbusplus::exception::UnhandledStop);
- EXPECT_NO_THROW(ctx->run());
+ runToStop();
EXPECT_FALSE(ran);
}
@@ -182,7 +139,6 @@
ctx->spawn(sdbusplus::async::sleep_for(*ctx, 1ms) |
stdexec::then([&]() { m.reset(); }));
- EXPECT_THROW(runToStop(), sdbusplus::exception::UnhandledStop);
- EXPECT_NO_THROW(ctx->run());
+ runToStop();
EXPECT_FALSE(ran);
}