Add SSE style subscription support to eventservice
This commit adds the SSE style eventservice subscription style event
Using this, end user can subscribe for Redfish event logs using GET
on SSE uris from browser.
Tested:
- From Browser did GET on above SSE URI and
generated some Redfish event logs(power cycle)
and saw redfish event logs streaming on browser.
- After SSE registration, Check Subscription collections
and GET on individual subscription and saw desired
response.
- Ran RedfishValidation and its passed.
Change-Id: I7f4b7a34974080739c4ba968ed570489af0474de
Signed-off-by: AppaRao Puli <apparao.puli@linux.intel.com>
Signed-off-by: P Dheeraj Srujan Kumar <p.dheeraj.srujan.kumar@intel.com>
Signed-off-by: Ed Tanous <edtanous@google.com>
diff --git a/redfish-core/include/event_service_manager.hpp b/redfish-core/include/event_service_manager.hpp
index 7ae47dd..ffd053f 100644
--- a/redfish-core/include/event_service_manager.hpp
+++ b/redfish-core/include/event_service_manager.hpp
@@ -24,6 +24,7 @@
#include "registries.hpp"
#include "registries/base_message_registry.hpp"
#include "registries/openbmc_message_registry.hpp"
+#include "registries/privilege_registry.hpp"
#include "registries/task_event_message_registry.hpp"
#include "server_sent_events.hpp"
#include "str_utility.hpp"
@@ -38,6 +39,7 @@
#include <boost/url/format.hpp>
#include <sdbusplus/bus/match.hpp>
+#include <algorithm>
#include <cstdlib>
#include <ctime>
#include <fstream>
@@ -53,9 +55,13 @@
static constexpr const char* eventFormatType = "Event";
static constexpr const char* metricReportFormatType = "MetricReport";
+static constexpr const char* subscriptionTypeSSE = "SSE";
static constexpr const char* eventServiceFile =
"/var/lib/bmcweb/eventservice_config.json";
+static constexpr const uint8_t maxNoOfSubscriptions = 20;
+static constexpr const uint8_t maxNoOfSSESubscriptions = 10;
+
namespace registries
{
inline std::span<const MessageEntry>
@@ -382,15 +388,20 @@
boost::asio::io_context& ioc) :
host(inHost),
port(inPort), policy(std::make_shared<crow::ConnectionPolicy>()),
- client(ioc, policy), path(inPath), uriProto(inUriProto)
+ path(inPath), uriProto(inUriProto)
{
+ client.emplace(ioc, policy);
// Subscription constructor
policy->invalidResp = retryRespHandler;
}
+ explicit Subscription(crow::sse_socket::Connection& connIn) :
+ sseConn(&connIn)
+ {}
+
~Subscription() = default;
- bool sendEvent(std::string& msg)
+ bool sendEvent(std::string&& msg)
{
persistent_data::EventServiceConfig eventServiceConfig =
persistent_data::EventServiceStore::getInstance()
@@ -402,13 +413,17 @@
bool useSSL = (uriProto == "https");
// A connection pool will be created if one does not already exist
- client.sendData(msg, host, port, path, useSSL, httpHeaders,
- boost::beast::http::verb::post);
- eventSeqNum++;
+ if (client)
+ {
+ client->sendData(std::move(msg), host, port, path, useSSL,
+ httpHeaders, boost::beast::http::verb::post);
+ return true;
+ }
if (sseConn != nullptr)
{
- sseConn->sendData(eventSeqNum, msg);
+ eventSeqNum++;
+ sseConn->sendEvent(std::to_string(eventSeqNum), msg);
}
return true;
}
@@ -437,7 +452,7 @@
std::string strMsg = msg.dump(2, ' ', true,
nlohmann::json::error_handler_t::replace);
- return this->sendEvent(strMsg);
+ return sendEvent(std::move(strMsg));
}
#ifndef BMCWEB_ENABLE_REDFISH_DBUS_LOG_ENTRIES
@@ -503,10 +518,10 @@
msg["Id"] = std::to_string(eventSeqNum);
msg["Name"] = "Event Log";
msg["Events"] = logEntryArray;
-
std::string strMsg = msg.dump(2, ' ', true,
nlohmann::json::error_handler_t::replace);
- this->sendEvent(strMsg);
+ sendEvent(std::move(strMsg));
+ eventSeqNum++;
}
#endif
@@ -546,7 +561,7 @@
std::string strMsg = msg.dump(2, ' ', true,
nlohmann::json::error_handler_t::replace);
- this->sendEvent(strMsg);
+ sendEvent(std::move(strMsg));
}
void updateRetryConfig(uint32_t retryAttempts,
@@ -561,15 +576,32 @@
return eventSeqNum;
}
+ void setSubscriptionId(const std::string& id2)
+ {
+ BMCWEB_LOG_DEBUG << "Subscription ID: " << id2;
+ subId = id2;
+ }
+
+ std::string getSubscriptionId()
+ {
+ return subId;
+ }
+
+ bool matchSseId(const crow::sse_socket::Connection& thisConn)
+ {
+ return &thisConn == sseConn;
+ }
+
private:
+ std::string subId;
uint64_t eventSeqNum = 1;
std::string host;
uint16_t port = 0;
std::shared_ptr<crow::ConnectionPolicy> policy;
- crow::HttpClient client;
+ crow::sse_socket::Connection* sseConn = nullptr;
+ std::optional<crow::HttpClient> client;
std::string path;
std::string uriProto;
- std::shared_ptr<crow::ServerSentEvents> sseConn = nullptr;
// Check used to indicate what response codes are valid as part of our retry
// policy. 2XX is considered acceptable
@@ -828,8 +860,8 @@
for (const auto& it :
EventServiceManager::getInstance().subscriptionsMap)
{
- std::shared_ptr<Subscription> entry = it.second;
- entry->updateRetryConfig(retryAttempts, retryTimeoutInterval);
+ Subscription& entry = *it.second;
+ entry.updateRetryConfig(retryAttempts, retryTimeoutInterval);
}
}
}
@@ -942,6 +974,8 @@
// Update retry configuration.
subValue->updateRetryConfig(retryAttempts, retryTimeoutInterval);
+ // Set Subscription ID for back trace
+ subValue->setSubscriptionId(id);
return id;
}
@@ -966,11 +1000,38 @@
}
}
- size_t getNumberOfSubscriptions()
+ void deleteSseSubscription(const crow::sse_socket::Connection& thisConn)
+ {
+ for (const auto& it : subscriptionsMap)
+ {
+ std::shared_ptr<Subscription> entry = it.second;
+ bool entryIsThisConn = entry->matchSseId(thisConn);
+ if (entryIsThisConn)
+ {
+ persistent_data::EventServiceStore::getInstance()
+ .subscriptionsConfigMap.erase(
+ it.second->getSubscriptionId());
+ return;
+ }
+ }
+ }
+
+ size_t getNumberOfSubscriptions() const
{
return subscriptionsMap.size();
}
+ size_t getNumberOfSSESubscriptions() const
+ {
+ auto size = std::count_if(
+ subscriptionsMap.begin(), subscriptionsMap.end(),
+ [](const std::pair<std::string, std::shared_ptr<Subscription>>&
+ entry) {
+ return (entry.second->subscriptionType == subscriptionTypeSSE);
+ });
+ return static_cast<size_t>(size);
+ }
+
std::vector<std::string> getAllIDs()
{
std::vector<std::string> idList;
@@ -981,7 +1042,7 @@
return idList;
}
- bool isDestinationExist(const std::string& destUrl)
+ bool isDestinationExist(const std::string& destUrl) const
{
for (const auto& it : subscriptionsMap)
{
@@ -997,7 +1058,7 @@
bool sendTestEventLog()
{
- for (const auto& it : this->subscriptionsMap)
+ for (const auto& it : subscriptionsMap)
{
std::shared_ptr<Subscription> entry = it.second;
if (!entry->sendTestEventLog())
@@ -1027,7 +1088,7 @@
eventRecord.emplace_back(std::move(eventMessage));
- for (const auto& it : this->subscriptionsMap)
+ for (const auto& it : subscriptionsMap)
{
std::shared_ptr<Subscription> entry = it.second;
bool isSubscribed = false;
@@ -1062,7 +1123,7 @@
std::string strMsg = msgJson.dump(
2, ' ', true, nlohmann::json::error_handler_t::replace);
- entry->sendEvent(strMsg);
+ entry->sendEvent(std::move(strMsg));
eventId++; // increament the eventId
}
else
@@ -1073,7 +1134,7 @@
}
void sendBroadcastMsg(const std::string& broadcastMsg)
{
- for (const auto& it : this->subscriptionsMap)
+ for (const auto& it : subscriptionsMap)
{
std::shared_ptr<Subscription> entry = it.second;
nlohmann::json msgJson;
@@ -1085,7 +1146,7 @@
std::string strMsg = msgJson.dump(
2, ' ', true, nlohmann::json::error_handler_t::replace);
- entry->sendEvent(strMsg);
+ entry->sendEvent(std::move(strMsg));
}
}
@@ -1188,7 +1249,7 @@
return;
}
- for (const auto& it : this->subscriptionsMap)
+ for (const auto& it : subscriptionsMap)
{
std::shared_ptr<Subscription> entry = it.second;
if (entry->eventFormatType == "Event")
diff --git a/redfish-core/include/redfish.hpp b/redfish-core/include/redfish.hpp
index 2bf0f45..8e7b411 100644
--- a/redfish-core/include/redfish.hpp
+++ b/redfish-core/include/redfish.hpp
@@ -24,6 +24,7 @@
#include "environment_metrics.hpp"
#include "ethernet.hpp"
#include "event_service.hpp"
+#include "eventservice_sse.hpp"
#include "fabric_adapters.hpp"
#include "hypervisor_system.hpp"
#include "log_services.hpp"
@@ -222,6 +223,7 @@
requestRoutesTaskCollection(app);
requestRoutesTask(app);
requestRoutesEventService(app);
+ requestRoutesEventServiceSse(app);
requestRoutesEventDestinationCollection(app);
requestRoutesEventDestination(app);
requestRoutesFabricAdapters(app);
diff --git a/redfish-core/include/redfish_aggregator.hpp b/redfish-core/include/redfish_aggregator.hpp
index 0407333..3b48bda 100644
--- a/redfish-core/include/redfish_aggregator.hpp
+++ b/redfish-core/include/redfish_aggregator.hpp
@@ -702,10 +702,10 @@
std::bind_front(processResponse, prefix, asyncResp);
std::string data = thisReq.req.body();
- client.sendDataWithCallback(data, std::string(sat->second.host()),
- sat->second.port_number(), targetURI,
- false /*useSSL*/, thisReq.fields(),
- thisReq.method(), cb);
+ client.sendDataWithCallback(
+ std::move(data), std::string(sat->second.host()),
+ sat->second.port_number(), targetURI, false /*useSSL*/,
+ thisReq.fields(), thisReq.method(), cb);
}
// Forward a request for a collection URI to each known satellite BMC
@@ -721,10 +721,10 @@
std::string targetURI(thisReq.target());
std::string data = thisReq.req.body();
- client.sendDataWithCallback(data, std::string(sat.second.host()),
- sat.second.port_number(), targetURI,
- false /*useSSL*/, thisReq.fields(),
- thisReq.method(), cb);
+ client.sendDataWithCallback(
+ std::move(data), std::string(sat.second.host()),
+ sat.second.port_number(), targetURI, false /*useSSL*/,
+ thisReq.fields(), thisReq.method(), cb);
}
}
diff --git a/redfish-core/lib/event_service.hpp b/redfish-core/lib/event_service.hpp
index 5a66c97..02bb21f 100644
--- a/redfish-core/lib/event_service.hpp
+++ b/redfish-core/lib/event_service.hpp
@@ -43,8 +43,6 @@
"Task"};
#endif
-static constexpr const uint8_t maxNoOfSubscriptions = 20;
-
inline void requestRoutesEventService(App& app)
{
BMCWEB_ROUTE(app, "/redfish/v1/EventService/")
@@ -62,6 +60,9 @@
"#EventService.v1_5_0.EventService";
asyncResp->res.jsonValue["Id"] = "EventService";
asyncResp->res.jsonValue["Name"] = "Event Service";
+ asyncResp->res.jsonValue["ServerSentEventUri"] =
+ "/redfish/v1/EventService/SSE";
+
asyncResp->res.jsonValue["Subscriptions"]["@odata.id"] =
"/redfish/v1/EventService/Subscriptions";
asyncResp->res
diff --git a/redfish-core/lib/eventservice_sse.hpp b/redfish-core/lib/eventservice_sse.hpp
new file mode 100644
index 0000000..864383c
--- /dev/null
+++ b/redfish-core/lib/eventservice_sse.hpp
@@ -0,0 +1,53 @@
+#pragma once
+
+#include <app.hpp>
+#include <event_service_manager.hpp>
+
+namespace redfish
+{
+
+inline void createSubscription(crow::sse_socket::Connection& conn)
+{
+ EventServiceManager& manager =
+ EventServiceManager::getInstance(&conn.getIoContext());
+ if ((manager.getNumberOfSubscriptions() >= maxNoOfSubscriptions) ||
+ manager.getNumberOfSSESubscriptions() >= maxNoOfSSESubscriptions)
+ {
+ BMCWEB_LOG_WARNING << "Max SSE subscriptions reached";
+ conn.close("Max SSE subscriptions reached");
+ return;
+ }
+ std::shared_ptr<redfish::Subscription> subValue =
+ std::make_shared<redfish::Subscription>(conn);
+
+ // GET on this URI means, Its SSE subscriptionType.
+ subValue->subscriptionType = redfish::subscriptionTypeSSE;
+
+ subValue->protocol = "Redfish";
+ subValue->retryPolicy = "TerminateAfterRetries";
+ subValue->eventFormatType = "Event";
+
+ std::string id = manager.addSubscription(subValue, false);
+ if (id.empty())
+ {
+ conn.close("Internal Error");
+ }
+}
+
+inline void deleteSubscription(crow::sse_socket::Connection& conn)
+{
+ redfish::EventServiceManager::getInstance(&conn.getIoContext())
+ .deleteSseSubscription(conn);
+}
+
+inline void requestRoutesEventServiceSse(App& app)
+{
+ // Note, this endpoint is given the same privilege level as creating a
+ // subscription, because functionally, that's the operation being done
+ BMCWEB_ROUTE(app, "/redfish/v1/EventService/SSE")
+ .privileges(redfish::privileges::postEventDestinationCollection)
+ .serverSentEvent()
+ .onopen(createSubscription)
+ .onclose(deleteSubscription);
+}
+} // namespace redfish