message: Add call_async method
This makes it possible to perform an async method call that is agnostic
to the event loop running the sd_bus state machine.
Change-Id: I32bc0fdf89c44cc6bab1c4622b143d6e06098659
Signed-off-by: William A. Kennington III <wak@google.com>
diff --git a/include/sdbusplus/message.hpp b/include/sdbusplus/message.hpp
index d737939..77a54c3 100644
--- a/include/sdbusplus/message.hpp
+++ b/include/sdbusplus/message.hpp
@@ -7,9 +7,13 @@
#include <sdbusplus/message/native_types.hpp>
#include <sdbusplus/message/read.hpp>
#include <sdbusplus/sdbus.hpp>
+#include <sdbusplus/slot.hpp>
+#include <exception>
#include <memory>
+#include <optional>
#include <type_traits>
+#include <utility>
namespace sdbusplus
{
@@ -43,6 +47,15 @@
/* @brief Alias 'msg' to a unique_ptr type for auto-release. */
using msg = std::unique_ptr<sd_bus_message, MsgDeleter>;
+template <typename CbT>
+int call_async_cb(sd_bus_message* m, void* userdata, sd_bus_error*) noexcept;
+
+template <typename CbT>
+void call_async_del(void* userdata) noexcept
+{
+ delete reinterpret_cast<CbT*>(userdata);
+}
+
} // namespace details
/** @class message
@@ -388,6 +401,51 @@
return message(reply, _intf, std::false_type());
}
+ /** @brief Perform an async message call.
+ *
+ * @param[in] cb - The callback to run when the response is available.
+ * @param[in] timeout_us - The timeout for the method call.
+ *
+ * @return The slot handle that manages the lifetime of the call object.
+ */
+ template <typename Cb>
+ [[nodiscard]] slot::slot
+ call_async(Cb&& cb, std::optional<SdBusDuration> timeout = std::nullopt)
+ {
+ sd_bus_slot* slot;
+ auto timeout_us = timeout ? timeout->count() : 0;
+ using CbT = std::remove_cv_t<std::remove_reference_t<Cb>>;
+ int r = _intf->sd_bus_call_async(nullptr, &slot, get(),
+ details::call_async_cb<CbT>, nullptr,
+ timeout_us);
+ if (r < 0)
+ {
+ throw exception::SdBusError(-r, "sd_bus_call_async");
+ }
+ slot::slot ret(std::move(slot));
+ if constexpr (std::is_pointer_v<CbT>)
+ {
+ _intf->sd_bus_slot_set_userdata(slot, reinterpret_cast<void*>(cb));
+ }
+ else if constexpr (std::is_function_v<CbT>)
+ {
+ _intf->sd_bus_slot_set_userdata(slot, reinterpret_cast<void*>(&cb));
+ }
+ else
+ {
+ r = _intf->sd_bus_slot_set_destroy_callback(
+ slot, details::call_async_del<CbT>);
+ if (r < 0)
+ {
+ throw exception::SdBusError(-r,
+ "sd_bus_slot_set_destroy_callback");
+ }
+ _intf->sd_bus_slot_set_userdata(slot,
+ new CbT(std::forward<Cb>(cb)));
+ }
+ return ret;
+ }
+
friend struct sdbusplus::bus::bus;
/** @brief Get a pointer to the owned 'msgp_t'.
@@ -404,6 +462,32 @@
details::msg _msg;
};
+namespace details
+{
+
+template <typename CbT>
+int call_async_cb(sd_bus_message* m, void* userdata, sd_bus_error*) noexcept
+{
+ try
+ {
+ if constexpr (std::is_pointer_v<CbT>)
+ {
+ (*reinterpret_cast<CbT>(userdata))(message(m));
+ }
+ else
+ {
+ (*reinterpret_cast<CbT*>(userdata))(message(m));
+ }
+ }
+ catch (...)
+ {
+ std::terminate();
+ }
+ return 1;
+}
+
+} // namespace details
+
} // namespace message
} // namespace sdbusplus
diff --git a/include/sdbusplus/sdbus.hpp b/include/sdbusplus/sdbus.hpp
index 25941da..783bd21 100644
--- a/include/sdbusplus/sdbus.hpp
+++ b/include/sdbusplus/sdbus.hpp
@@ -34,6 +34,11 @@
sd_bus_error* ret_error,
sd_bus_message** reply) = 0;
+ virtual int sd_bus_call_async(sd_bus* bus, sd_bus_slot** slot,
+ sd_bus_message* m,
+ sd_bus_message_handler_t callback,
+ void* userdata, uint64_t usec) = 0;
+
virtual int sd_bus_detach_event(sd_bus* bus) = 0;
virtual int sd_bus_emit_interfaces_added_strv(sd_bus* bus, const char* path,
@@ -138,6 +143,12 @@
virtual int sd_bus_message_verify_type(sd_bus_message* m, char type,
const char* contents) = 0;
+ virtual int sd_bus_slot_set_destroy_callback(sd_bus_slot* slot,
+ sd_bus_destroy_t callback) = 0;
+
+ virtual void* sd_bus_slot_set_userdata(sd_bus_slot* slot,
+ void* userdata) = 0;
+
virtual int sd_bus_process(sd_bus* bus, sd_bus_message** r) = 0;
virtual sd_bus* sd_bus_ref(sd_bus* bus) = 0;
@@ -193,6 +204,13 @@
return ::sd_bus_call(bus, m, usec, ret_error, reply);
}
+ int sd_bus_call_async(sd_bus* bus, sd_bus_slot** slot, sd_bus_message* m,
+ sd_bus_message_handler_t callback, void* userdata,
+ uint64_t usec) override
+ {
+ return ::sd_bus_call_async(bus, slot, m, callback, userdata, usec);
+ }
+
int sd_bus_detach_event(sd_bus* bus) override
{
return ::sd_bus_detach_event(bus);
@@ -462,6 +480,17 @@
return ::sd_bus_message_verify_type(m, type, contents);
}
+ int sd_bus_slot_set_destroy_callback(sd_bus_slot* slot,
+ sd_bus_destroy_t callback) override
+ {
+ return ::sd_bus_slot_set_destroy_callback(slot, callback);
+ }
+
+ void* sd_bus_slot_set_userdata(sd_bus_slot* slot, void* userdata) override
+ {
+ return ::sd_bus_slot_set_userdata(slot, userdata);
+ }
+
int sd_bus_process(sd_bus* bus, sd_bus_message** r) override
{
return ::sd_bus_process(bus, r);
diff --git a/include/sdbusplus/test/sdbus_mock.hpp b/include/sdbusplus/test/sdbus_mock.hpp
index df312e2..e4df6a3 100644
--- a/include/sdbusplus/test/sdbus_mock.hpp
+++ b/include/sdbusplus/test/sdbus_mock.hpp
@@ -21,6 +21,10 @@
MOCK_METHOD3(sd_bus_attach_event, int(sd_bus*, sd_event*, int));
MOCK_METHOD5(sd_bus_call, int(sd_bus*, sd_bus_message*, uint64_t,
sd_bus_error*, sd_bus_message**));
+ MOCK_METHOD(int, sd_bus_call_async,
+ (sd_bus*, sd_bus_slot**, sd_bus_message*,
+ sd_bus_message_handler_t, void*, uint64_t),
+ (override));
MOCK_METHOD1(sd_bus_detach_event, int(sd_bus*));
MOCK_METHOD3(sd_bus_emit_interfaces_added_strv,
@@ -107,6 +111,11 @@
MOCK_METHOD3(sd_bus_message_verify_type,
int(sd_bus_message*, char, const char*));
+ MOCK_METHOD(int, sd_bus_slot_set_destroy_callback,
+ (sd_bus_slot*, sd_bus_destroy_t), (override));
+ MOCK_METHOD(void*, sd_bus_slot_set_userdata, (sd_bus_slot*, void*),
+ (override));
+
MOCK_METHOD2(sd_bus_process, int(sd_bus*, sd_bus_message**));
MOCK_METHOD1(sd_bus_ref, sd_bus*(sd_bus*));
MOCK_METHOD3(sd_bus_request_name, int(sd_bus*, const char*, uint64_t));
diff --git a/test/meson.build b/test/meson.build
index ac0516e..61eb737 100644
--- a/test/meson.build
+++ b/test/meson.build
@@ -24,6 +24,7 @@
'bus/match',
'exception/sdbus_error',
'message/append',
+ 'message/call',
'message/read',
'message/native_types',
'message/types',
diff --git a/test/message/call.cpp b/test/message/call.cpp
new file mode 100644
index 0000000..0ef015f
--- /dev/null
+++ b/test/message/call.cpp
@@ -0,0 +1,102 @@
+#include <sdbusplus/bus.hpp>
+#include <sdbusplus/message.hpp>
+
+#include <chrono>
+#include <string>
+
+#include <gtest/gtest.h>
+
+namespace sdbusplus
+{
+namespace message
+{
+
+using namespace std::literals::chrono_literals;
+
+std::string globalId;
+
+void setGlobalId(message&& m)
+{
+ m.read(globalId);
+}
+
+message newBusIdReq(bus::bus& b)
+{
+ return b.new_method_call("org.freedesktop.DBus", "/org/freedesktop/DBus",
+ "org.freedesktop.DBus", "GetId");
+}
+
+std::string syncBusId(bus::bus& b)
+{
+ std::string ret;
+ newBusIdReq(b).call().read(ret);
+ return ret;
+}
+
+TEST(CallAsync, Function)
+{
+ auto b = bus::new_default();
+ globalId = "";
+ while (b.process_discard())
+ ;
+ auto slot = newBusIdReq(b).call_async(setGlobalId);
+ b.wait(1s);
+ b.process_discard();
+ EXPECT_EQ(syncBusId(b), globalId);
+}
+
+TEST(CallAsync, FunctionPointer)
+{
+ auto b = bus::new_default();
+ globalId = "";
+ while (b.process_discard())
+ ;
+ auto slot = newBusIdReq(b).call_async(&setGlobalId);
+ b.wait(1s);
+ b.process_discard();
+ EXPECT_EQ(syncBusId(b), globalId);
+}
+
+TEST(CallAsync, Lambda)
+{
+ auto b = bus::new_default();
+ std::string id;
+ while (b.process_discard())
+ ;
+ auto slot = newBusIdReq(b).call_async([&](message&& m) { m.read(id); });
+ b.wait(1s);
+ b.process_discard();
+ EXPECT_EQ(syncBusId(b), id);
+}
+
+TEST(CallAsync, SlotDrop)
+{
+ auto b = bus::new_default();
+ globalId = "";
+ while (b.process_discard())
+ ;
+ {
+ auto slot = newBusIdReq(b).call_async(setGlobalId);
+ }
+ b.wait(1s);
+ b.process_discard();
+ EXPECT_EQ("", globalId);
+}
+
+TEST(CallAsync, ExceptionCaught)
+{
+ EXPECT_DEATH(
+ [] {
+ auto b = bus::new_bus();
+ while (b.process_discard())
+ ;
+ auto slot = newBusIdReq(b).call_async(
+ [&](message&&) { throw std::runtime_error("testerror"); });
+ b.wait(1s);
+ b.process_discard();
+ }(),
+ "testerror");
+}
+
+} // namespace message
+} // namespace sdbusplus