Make dbus monitor compatible with phosphor-rest
This patchset makes the dbus monitor compatible with the upstream dbus
monitor, which should help adoption.
Performance seems greatly improved compared to the python
implementation. The example given in the documentation of watching for
sensors and state changes is checked in as a test script
websocket_test.py, and seems to consume less of the CPU than the actual
sensors that get produced (about 4% CPU on my ast2500) when producing 30
sensor updates per second. This can likely be improved in the future by
batching change events, but it seems to be performant enough for the
moment.
Tested: Used test script checked in, and verified webui can register
state change events properly.
Change-Id: I7d4c61d0259b7773eb46df0f59f8fea1c7796450
Signed-off-by: Ed Tanous <ed.tanous@intel.com>
diff --git a/include/dbus_monitor.hpp b/include/dbus_monitor.hpp
index f0b2167..e8b1a32 100644
--- a/include/dbus_monitor.hpp
+++ b/include/dbus_monitor.hpp
@@ -2,73 +2,190 @@
#include <dbus_singleton.hpp>
#include <sdbusplus/bus/match.hpp>
#include <crow/app.h>
+#include <crow/websocket.h>
#include <boost/container/flat_map.hpp>
+#include <boost/container/flat_set.hpp>
+
+namespace nlohmann {
+template <typename... Args>
+struct adl_serializer<sdbusplus::message::variant<Args...>> {
+ static void to_json(json& j, const sdbusplus::message::variant<Args...>& v) {
+ mapbox::util::apply_visitor([&](auto&& val) { j = val; }, v);
+ }
+};
+} // namespace nlohmann
namespace crow {
namespace dbus_monitor {
struct DbusWebsocketSession {
std::vector<std::unique_ptr<sdbusplus::bus::match::match>> matches;
+ boost::container::flat_set<std::string> interfaces;
};
static boost::container::flat_map<crow::websocket::Connection*,
DbusWebsocketSession>
sessions;
-int onPropertyUpdate(sd_bus_message* m, void* userdata,
- sd_bus_error* ret_error) {
+inline int onPropertyUpdate(sd_bus_message* m, void* userdata,
+ sd_bus_error* ret_error) {
if (ret_error == nullptr || sd_bus_error_is_set(ret_error)) {
- BMCWEB_LOG_ERROR << "Sdbus error in on_property_update";
+ BMCWEB_LOG_ERROR << "Got sdbus error on match";
+ return 0;
+ }
+ crow::websocket::Connection* connection =
+ static_cast<crow::websocket::Connection*>(userdata);
+ auto thisSession = sessions.find(connection);
+ if (thisSession == sessions.end()) {
+ BMCWEB_LOG_ERROR << "Couldn't find dbus connection " << connection;
return 0;
}
sdbusplus::message::message message(m);
- std::string objectName;
- std::vector<
- std::pair<std::string, sdbusplus::message::variant<
- std::string, bool, int64_t, uint64_t, double>>>
- values;
- message.read(objectName, values);
- nlohmann::json j;
- const std::string& path = message.get_path();
- for (auto& value : values) {
- mapbox::util::apply_visitor([&](auto&& val) { j[path] = val; },
- value.second);
- }
- std::string dataToSend = j.dump();
+ using VariantType =
+ sdbusplus::message::variant<std::string, bool, int64_t, uint64_t, double>;
+ nlohmann::json j{{"event", message.get_member()},
+ {"path", message.get_path()}};
+ if (strcmp(message.get_member(), "PropertiesChanged") == 0) {
+ std::string interface_name;
+ boost::container::flat_map<std::string, VariantType> values;
+ message.read(interface_name, values);
+ j["properties"] = values;
+ j["interface"] = std::move(interface_name);
- for (const std::pair<crow::websocket::Connection*, DbusWebsocketSession>&
- session : sessions) {
- session.first->sendText(dataToSend);
+ } else if (strcmp(message.get_member(), "InterfacesAdded") == 0) {
+ std::string object_name;
+ boost::container::flat_map<
+ std::string, boost::container::flat_map<std::string, VariantType>>
+ values;
+ message.read(object_name, values);
+ for (const std::pair<std::string,
+ boost::container::flat_map<std::string, VariantType>>&
+ paths : values) {
+ auto it = thisSession->second.interfaces.find(paths.first);
+ if (it != thisSession->second.interfaces.end()) {
+ j["interfaces"][paths.first] = paths.second;
+ }
+ }
+ } else {
+ BMCWEB_LOG_CRITICAL << "message " << message.get_member()
+ << " was unexpected";
+ return 0;
}
+
+ connection->sendText(j.dump());
+ return 0;
};
template <typename... Middlewares>
void requestRoutes(Crow<Middlewares...>& app) {
- BMCWEB_ROUTE(app, "/dbus_monitor")
+ BMCWEB_ROUTE(app, "/subscribe")
.websocket()
.onopen([&](crow::websocket::Connection& conn) {
- std::string pathNamespace(conn.req.urlParams.get("path_namespace"));
- if (pathNamespace.empty()) {
- conn.sendText(
- nlohmann::json({"error", "Did not specify path_namespace"})
- .dump());
- conn.close("error");
- }
+ BMCWEB_LOG_DEBUG << "Connection " << &conn << " opened";
sessions[&conn] = DbusWebsocketSession();
- std::string matchString(
- "type='signal',"
- "interface='org.freedesktop.DBus.Properties',"
- "path_namespace='" +
- pathNamespace + "'");
- sessions[&conn].matches.emplace_back(
- std::make_unique<sdbusplus::bus::match::match>(
- *crow::connections::systemBus, matchString, onPropertyUpdate));
})
.onclose([&](crow::websocket::Connection& conn,
const std::string& reason) { sessions.erase(&conn); })
.onmessage([&](crow::websocket::Connection& conn, const std::string& data,
bool is_binary) {
- BMCWEB_LOG_ERROR << "Got unexpected message from client on sensorws";
+ DbusWebsocketSession& thisSession = sessions[&conn];
+ BMCWEB_LOG_DEBUG << "Connection " << &conn << " recevied " << data;
+ nlohmann::json j = nlohmann::json::parse(data, nullptr, false);
+ if (j.is_discarded()) {
+ BMCWEB_LOG_ERROR << "Unable to parse json data for monitor";
+ conn.close("Unable to parse json request");
+ return;
+ }
+ nlohmann::json::iterator interfaces = j.find("interfaces");
+ if (interfaces != j.end()) {
+ thisSession.interfaces.reserve(interfaces->size());
+ for (auto& interface : *interfaces) {
+ const std::string* str = interface.get_ptr<const std::string*>();
+ if (str != nullptr) {
+ thisSession.interfaces.insert(*str);
+ }
+ }
+ }
+
+ nlohmann::json::iterator paths = j.find("paths");
+ if (paths != j.end()) {
+ int interfaceCount = thisSession.interfaces.size();
+ if (interfaceCount == 0) {
+ interfaceCount = 1;
+ }
+ // Reserve our matches upfront. For each path there is 1 for
+ // interfacesAdded, and InterfaceCount number for PropertiesChanged
+ thisSession.matches.reserve(thisSession.matches.size() +
+ paths->size() * (1 + interfaceCount));
+ }
+ std::string object_manager_match_string;
+ std::string properties_match_string;
+ std::string object_manager_interfaces_match_string;
+ // These regexes derived on the rules here:
+ // https://dbus.freedesktop.org/doc/dbus-specification.html#message-protocol-names
+ std::regex validPath("^/([A-Za-z0-9_]+/?)*$");
+ std::regex validInterface(
+ "^[A-Za-z_][A-Za-z0-9_]*(\\.[A-Za-z_][A-Za-z0-9_]*)+$");
+
+ for (const auto& thisPath : *paths) {
+ const std::string* thisPathString =
+ thisPath.get_ptr<const std::string*>();
+ if (thisPathString == nullptr) {
+ BMCWEB_LOG_ERROR << "subscribe path isn't a string?";
+ conn.close();
+ return;
+ }
+ if (!std::regex_match(*thisPathString, validPath)) {
+ BMCWEB_LOG_ERROR << "Invalid path name " << *thisPathString;
+ conn.close();
+ return;
+ }
+ properties_match_string =
+ ("type='signal',"
+ "interface='org.freedesktop.DBus.Properties',"
+ "path_namespace='" +
+ *thisPathString +
+ "',"
+ "member='PropertiesChanged'");
+ // If interfaces weren't specified, add a single match for all
+ // interfaces
+ if (thisSession.interfaces.size() == 0) {
+ BMCWEB_LOG_DEBUG << "Creating match " << properties_match_string;
+
+ thisSession.matches.emplace_back(
+ std::make_unique<sdbusplus::bus::match::match>(
+ *crow::connections::systemBus, properties_match_string,
+ onPropertyUpdate, &conn));
+ } else {
+ // If interfaces were specified, add a match for each interface
+ for (const std::string& interface : thisSession.interfaces) {
+ if (!std::regex_match(interface, validInterface)) {
+ BMCWEB_LOG_ERROR << "Invalid interface name " << interface;
+ conn.close();
+ return;
+ }
+ std::string ifaceMatchString =
+ properties_match_string + ",arg0='" + interface + "'";
+ BMCWEB_LOG_DEBUG << "Creating match " << ifaceMatchString;
+ thisSession.matches.emplace_back(
+ std::make_unique<sdbusplus::bus::match::match>(
+ *crow::connections::systemBus, ifaceMatchString,
+ onPropertyUpdate, &conn));
+ }
+ }
+ object_manager_match_string =
+ ("type='signal',"
+ "interface='org.freedesktop.DBus.ObjectManager',"
+ "path_namespace='" +
+ *thisPathString +
+ "',"
+ "member='InterfacesAdded'");
+ BMCWEB_LOG_DEBUG << "Creating match " << object_manager_match_string;
+ thisSession.matches.emplace_back(
+ std::make_unique<sdbusplus::bus::match::match>(
+ *crow::connections::systemBus, object_manager_match_string,
+ onPropertyUpdate, &conn));
+ }
});
}
} // namespace dbus_monitor
diff --git a/scripts/websocket_test.py b/scripts/websocket_test.py
new file mode 100644
index 0000000..8d9c8ae
--- /dev/null
+++ b/scripts/websocket_test.py
@@ -0,0 +1,21 @@
+import json
+import ssl
+import websocket
+
+websocket.enableTrace(True)
+
+ws = websocket.create_connection('wss://10.243.48.93:18080/subscribe',
+ sslopt={"cert_reqs": ssl.CERT_NONE},
+ cookie="XSRF-TOKEN=m0KhYNbxFmUEI4Sr1I22; SESSION=0mdwzoQy3gggQxW3vrEw")
+request = json.dumps({
+ "paths": ["/xyz/openbmc_project/logging", "/xyz/openbmc_project/sensors"],
+ "interfaces": ["xyz.openbmc_project.Logging.Entry", "xyz.openbmc_project.Sensor.Value"]
+})
+
+ws.send(request)
+print("Sent")
+print("Receiving...")
+while True:
+ result = ws.recv()
+ print("Received '%s'" % result)
+ws.close()