async: add scope
In order to be able to create multiple subtasks as a collection
and in order to make sure we keep good track of any tasks that
are created by the context, add a `scope` as a container of subtasks.
This is partially modelled after the C++ P2519 proposal.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I6e99b2fa2829d80c320491991f7e038f122963a9
diff --git a/include/sdbusplus/async.hpp b/include/sdbusplus/async.hpp
index 70b8abf..6c9e834 100644
--- a/include/sdbusplus/async.hpp
+++ b/include/sdbusplus/async.hpp
@@ -4,5 +4,6 @@
#include <sdbusplus/async/execution.hpp>
#include <sdbusplus/async/match.hpp>
#include <sdbusplus/async/proxy.hpp>
+#include <sdbusplus/async/scope.hpp>
#include <sdbusplus/async/task.hpp>
#include <sdbusplus/async/timer.hpp>
diff --git a/include/sdbusplus/async/context.hpp b/include/sdbusplus/async/context.hpp
index a59dc72..02235d3 100644
--- a/include/sdbusplus/async/context.hpp
+++ b/include/sdbusplus/async/context.hpp
@@ -1,6 +1,7 @@
#pragma once
#include <sdbusplus/async/execution.hpp>
+#include <sdbusplus/async/scope.hpp>
#include <sdbusplus/async/task.hpp>
#include <sdbusplus/bus.hpp>
#include <sdbusplus/event.hpp>
@@ -58,6 +59,16 @@
template <execution::sender_of<execution::set_value_t()> Snd>
void run(Snd&& startup);
+ /** Spawn a Sender to run on the context.
+ *
+ * @param[in] sender - The Sender to run.
+ */
+ template <execution::sender_of<execution::set_value_t()> Snd>
+ void spawn(Snd&& sender)
+ {
+ pending_tasks.spawn(std::forward<Snd>(sender));
+ }
+
bus_t& get_bus() noexcept
{
return bus;
@@ -76,6 +87,7 @@
bus_t bus;
event_source_t dbus_source;
event_t event_loop{};
+ scope pending_tasks{};
/** The async run-loop from std::execution. */
execution::run_loop loop{};
diff --git a/include/sdbusplus/async/scope.hpp b/include/sdbusplus/async/scope.hpp
new file mode 100644
index 0000000..c217a01
--- /dev/null
+++ b/include/sdbusplus/async/scope.hpp
@@ -0,0 +1,161 @@
+#pragma once
+
+#include <sdbusplus/async/execution.hpp>
+
+#include <atomic>
+
+namespace sdbusplus::async
+{
+
+namespace scope_ns
+{
+template <execution::sender Sender>
+struct scope_receiver;
+}
+
+/** A collection of tasks.
+ *
+ * It is sometimes useful to run a group of tasks, possibly even detached
+ * from an execution context, but the only option in std::execution is
+ * start_detached, which does not have any mechanism to deal with errors
+ * from those tasks and can potentially leak tasks. Alternatively, you
+ * can save the result of `execution::connect`, but you need to maintain a
+ * lifetime of it until the connected-task completes.
+ *
+ * The scope maintains the lifetime of the tasks it `spawns` so that the
+ * operational-state created by `connect` is preserved (and deleted) as
+ * appropriate.
+ */
+struct scope
+{
+ scope() = default;
+
+ // The scope destructor can throw if it was destructed while there are
+ // outstanding tasks.
+ ~scope() noexcept(false);
+
+ /** Spawn a Sender to run on the scope.
+ *
+ * @param[in] sender - The sender to run.
+ */
+ template <execution::sender_of<execution::set_value_t()> Sender>
+ void spawn(Sender&& sender);
+
+ template <execution::sender>
+ friend struct scope_ns::scope_receiver;
+
+ private:
+ void started_task() noexcept;
+ void ended_task() noexcept;
+
+ std::atomic<size_t> count = 0;
+};
+
+namespace scope_ns
+{
+
+/** The Sender-completion Reciever. */
+template <execution::sender Sender>
+struct scope_receiver
+{
+ template <typename OpState>
+ explicit scope_receiver(OpState* os, scope* s) : op_state(os), s(s)
+ {}
+
+ friend void tag_invoke(execution::set_value_t, scope_receiver&& self,
+ auto&&...) noexcept
+ {
+ self.complete();
+ }
+
+ friend void tag_invoke(execution::set_error_t, scope_receiver&& self,
+ auto&&) noexcept
+ {
+ std::terminate(); // TODO: save the exception back into the scope.
+ self.complete();
+ }
+
+ friend void tag_invoke(execution::set_stopped_t,
+ scope_receiver&& self) noexcept
+ {
+ // std::terminate(); // TODO: this implies a child had an
+ // unhandled_stop. Need to turn it into an exception.
+ self.complete();
+ }
+
+ friend decltype(auto) tag_invoke(execution::get_env_t,
+ const scope_receiver& self) noexcept
+ {
+ return self;
+ }
+
+ void complete() noexcept;
+
+ void* op_state;
+ scope* s = nullptr;
+};
+
+/** The holder of the connect operational-state. */
+template <execution::sender Sender>
+struct scope_operation_state
+{
+ using state_t = execution::connect_result_t<Sender, scope_receiver<Sender>>;
+ std::optional<state_t> op_state;
+};
+
+template <execution::sender Sender>
+void scope_receiver<Sender>::complete() noexcept
+{
+ // The Sender is complete, so we need to clean up the saved operational
+ // state.
+
+ // Save the scope (since we're going to delete `this`).
+ auto owning_scope = s;
+
+ // Delete the operational state, which triggers deleting this.
+ delete static_cast<scope_ns::scope_operation_state<Sender>*>(op_state);
+
+ // Inform the scope that a task has completed.
+ owning_scope->ended_task();
+}
+
+// Most (non-movable) receivers cannot be emplaced without this template magic.
+// Ex. `spawn(std::execution::just())` doesnt' work without this.
+template <typename Fn>
+struct in_place_construct
+{
+ Fn fn;
+ using result_t = decltype(std::declval<Fn>()());
+
+ operator result_t()
+ {
+ return ((Fn &&) fn)();
+ }
+};
+
+} // namespace scope_ns
+
+template <execution::sender_of<execution::set_value_t()> Sender>
+void scope::spawn(Sender&& sender)
+{
+ // Create a holder of the operational state. Keep it in a unique_ptr
+ // so it is cleaned up if there are any exceptions in this function.
+ auto s = std::make_unique<
+ scope_ns::scope_operation_state<std::decay_t<Sender>>>();
+
+ // Associate the state and scope with the receiver and connect to the
+ // Sender.
+ s->op_state.emplace(scope_ns::in_place_construct{[&] {
+ return execution::connect(
+ std::forward<Sender>(sender),
+ scope_ns::scope_receiver<std::decay_t<Sender>>{s.get(), this});
+ }});
+
+ started_task();
+
+ // Start is noexcept, so it is safe to release the pointer which is now
+ // contained in the receiver.
+ execution::start(s.release()->op_state.value());
+}
+
+} // namespace sdbusplus::async
diff --git a/meson.build b/meson.build
index af8cdf3..448d167 100644
--- a/meson.build
+++ b/meson.build
@@ -26,6 +26,7 @@
libsdbusplus_src = files(
'src/async/context.cpp',
'src/async/match.cpp',
+ 'src/async/scope.cpp',
'src/bus.cpp',
'src/event.cpp',
'src/exception.cpp',
diff --git a/src/async/context.cpp b/src/async/context.cpp
index 007a266..38d3a89 100644
--- a/src/async/context.cpp
+++ b/src/async/context.cpp
@@ -164,10 +164,10 @@
// This shouldn't start detached because we want to be able to forward
// failures back to the 'run'. execution::ensure_started isn't
// implemented yet, so we don't have a lot of other options.
- execution::start_detached(std::move(startup));
+ spawn(std::move(startup));
// Also start up the sdbus 'wait/process' loop.
- execution::start_detached(details::wait_process_completion::loop(*this));
+ spawn(details::wait_process_completion::loop(*this));
// Run the execution::run_loop to handle all the tasks.
loop.run();
diff --git a/src/async/scope.cpp b/src/async/scope.cpp
new file mode 100644
index 0000000..efdc5f6
--- /dev/null
+++ b/src/async/scope.cpp
@@ -0,0 +1,26 @@
+#include <sdbusplus/async/scope.hpp>
+
+#include <exception>
+
+namespace sdbusplus::async
+{
+scope::~scope() noexcept(false)
+{
+ if (count != 0)
+ {
+ throw std::logic_error(
+ "sdbusplus::async::scope destructed while tasks are pending.");
+ }
+}
+
+void scope::started_task() noexcept
+{
+ ++count;
+}
+
+void scope::ended_task() noexcept
+{
+ --count;
+}
+
+} // namespace sdbusplus::async
diff --git a/test/async/context.cpp b/test/async/context.cpp
index 60d3d8d..83cfde4 100644
--- a/test/async/context.cpp
+++ b/test/async/context.cpp
@@ -8,3 +8,13 @@
ctx.run(std::execution::just() |
std::execution::then([&ctx]() { ctx.request_stop(); }));
}
+
+TEST(Context, SpawnedTask)
+{
+ sdbusplus::async::context ctx;
+
+ ctx.spawn(std::execution::just());
+
+ ctx.run(std::execution::just() |
+ std::execution::then([&ctx]() { ctx.request_stop(); }));
+}