EventService: Support for Server Sent Events(SSE)
Add support for Server Sent Events(SSE)
Filters support is not part of this commit.
Tested:
- GET on URI /redfish/v1/EventService/Subscriptions/SSE/
from chrome browser, can see all BMC Events on browser.
- Redfish validator is successful.
Change-Id: Icd10cdad20c4529f64c97b67d46f2e4a7e0c329c
Signed-off-by: AppaRao Puli <apparao.puli@linux.intel.com>
diff --git a/redfish-core/include/event_service_manager.hpp b/redfish-core/include/event_service_manager.hpp
index 7c96420..03d5dc3 100644
--- a/redfish-core/include/event_service_manager.hpp
+++ b/redfish-core/include/event_service_manager.hpp
@@ -25,6 +25,7 @@
#include <boost/container/flat_map.hpp>
#include <error_messages.hpp>
#include <http_client.hpp>
+#include <server_sent_events.hpp>
#include <utils/json_utils.hpp>
#include <cstdlib>
@@ -295,23 +296,38 @@
crow::connections::systemBus->get_io_context(), id, host, port,
path);
}
+
+ Subscription(const std::shared_ptr<crow::Request::Adaptor>& adaptor) :
+ eventSeqNum(1)
+ {
+ sseConn = std::make_shared<crow::ServerSentEvents>(adaptor);
+ }
+
~Subscription()
{}
void sendEvent(const std::string& msg)
{
- std::vector<std::pair<std::string, std::string>> reqHeaders;
- for (const auto& header : httpHeaders)
+ if (conn != nullptr)
{
- for (const auto& item : header.items())
+ std::vector<std::pair<std::string, std::string>> reqHeaders;
+ for (const auto& header : httpHeaders)
{
- std::string key = item.key();
- std::string val = item.value();
- reqHeaders.emplace_back(std::pair(key, val));
+ for (const auto& item : header.items())
+ {
+ std::string key = item.key();
+ std::string val = item.value();
+ reqHeaders.emplace_back(std::pair(key, val));
+ }
}
+ conn->setHeaders(reqHeaders);
+ conn->sendData(msg);
}
- conn->setHeaders(reqHeaders);
- conn->sendData(msg);
+
+ if (sseConn != nullptr)
+ {
+ sseConn->sendData(eventSeqNum, msg);
+ }
}
void sendTestEventLog()
@@ -462,7 +478,8 @@
std::string port;
std::string path;
std::string uriProto;
- std::shared_ptr<crow::HttpClient> conn;
+ std::shared_ptr<crow::HttpClient> conn = nullptr;
+ std::shared_ptr<crow::ServerSentEvents> sseConn = nullptr;
};
static constexpr const bool defaultEnabledState = true;
@@ -572,6 +589,22 @@
BMCWEB_LOG_DEBUG << "Invalid subscription Protocol exist.";
continue;
}
+
+ std::string subscriptionType;
+ if (!json_util::getValueFromJsonObject(jsonObj, "SubscriptionType",
+ subscriptionType))
+ {
+ subscriptionType = defaulSubscriptionType;
+ }
+ // SSE connections are initiated from client
+ // and can't be re-established from server.
+ if (subscriptionType == "SSE")
+ {
+ BMCWEB_LOG_DEBUG
+ << "The subscription type is SSE, so skipping.";
+ continue;
+ }
+
std::string destination;
if (!json_util::getValueFromJsonObject(jsonObj, "Destination",
destination))
@@ -597,6 +630,7 @@
subValue->destinationUrl = destination;
subValue->protocol = protocol;
+ subValue->subscriptionType = subscriptionType;
if (!json_util::getValueFromJsonObject(
jsonObj, "DeliveryRetryPolicy", subValue->retryPolicy))
{
@@ -607,12 +641,6 @@
{
subValue->eventFormatType = defaulEventFormatType;
}
- if (!json_util::getValueFromJsonObject(jsonObj, "SubscriptionType",
- subValue->subscriptionType))
- {
- subValue->subscriptionType = defaulSubscriptionType;
- }
-
json_util::getValueFromJsonObject(jsonObj, "Context",
subValue->customText);
json_util::getValueFromJsonObject(jsonObj, "MessageIds",
@@ -649,9 +677,17 @@
for (const auto& it : subscriptionsMap)
{
- nlohmann::json entry;
std::shared_ptr<Subscription> subValue = it.second;
+ // Don't preserve SSE connections. Its initiated from
+ // client side and can't be re-established from server.
+ if (subValue->subscriptionType == "SSE")
+ {
+ BMCWEB_LOG_DEBUG
+ << "The subscription type is SSE, so skipping.";
+ continue;
+ }
+ nlohmann::json entry;
entry["Context"] = subValue->customText;
entry["DeliveryRetryPolicy"] = subValue->retryPolicy;
entry["Destination"] = subValue->destinationUrl;
diff --git a/redfish-core/include/redfish.hpp b/redfish-core/include/redfish.hpp
index 303519b..959c643 100644
--- a/redfish-core/include/redfish.hpp
+++ b/redfish-core/include/redfish.hpp
@@ -189,6 +189,7 @@
nodes.emplace_back(std::make_unique<TaskCollection>(app));
nodes.emplace_back(std::make_unique<Task>(app));
nodes.emplace_back(std::make_unique<EventService>(app));
+ nodes.emplace_back(std::make_unique<EventServiceSSE>(app));
nodes.emplace_back(std::make_unique<EventDestinationCollection>(app));
nodes.emplace_back(std::make_unique<EventDestination>(app));
nodes.emplace_back(std::make_unique<SubmitTestEvent>(app));
diff --git a/redfish-core/include/server_sent_events.hpp b/redfish-core/include/server_sent_events.hpp
new file mode 100644
index 0000000..1c4d2a5
--- /dev/null
+++ b/redfish-core/include/server_sent_events.hpp
@@ -0,0 +1,290 @@
+
+/*
+// Copyright (c) 2020 Intel Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+*/
+#pragma once
+#include "node.hpp"
+
+#include <boost/asio/strand.hpp>
+#include <boost/beast/core.hpp>
+#include <boost/beast/http.hpp>
+#include <boost/beast/version.hpp>
+
+#include <cstdlib>
+#include <functional>
+#include <iostream>
+#include <memory>
+#include <queue>
+#include <string>
+
+namespace crow
+{
+
+static constexpr uint8_t maxReqQueueSize = 50;
+
+enum class SseConnState
+{
+ startInit,
+ initInProgress,
+ initialized,
+ initFailed,
+ sendInProgress,
+ sendFailed,
+ idle,
+ suspended,
+ closed
+};
+
+class ServerSentEvents : public std::enable_shared_from_this<ServerSentEvents>
+{
+ private:
+ std::shared_ptr<crow::Request::Adaptor> sseConn;
+ std::queue<std::pair<uint64_t, std::string>> requestDataQueue;
+ std::string outBuffer;
+ SseConnState state;
+ int retryCount;
+ int maxRetryAttempts;
+
+ void sendEvent(const std::string& id, const std::string& msg)
+ {
+ if (msg.empty())
+ {
+ BMCWEB_LOG_DEBUG << "Empty data, bailing out.";
+ return;
+ }
+
+ if (state == SseConnState::sendInProgress)
+ {
+ return;
+ }
+ state = SseConnState::sendInProgress;
+
+ if (!id.empty())
+ {
+ outBuffer += "id: ";
+ outBuffer.append(id.begin(), id.end());
+ outBuffer += "\n";
+ }
+
+ outBuffer += "data: ";
+ for (char character : msg)
+ {
+ outBuffer += character;
+ if (character == '\n')
+ {
+ outBuffer += "data: ";
+ }
+ }
+ outBuffer += "\n\n";
+
+ doWrite();
+ }
+
+ void doWrite()
+ {
+ if (outBuffer.empty())
+ {
+ BMCWEB_LOG_DEBUG << "All data sent successfully.";
+ // Send is successful, Lets remove data from queue
+ // check for next request data in queue.
+ requestDataQueue.pop();
+ state = SseConnState::idle;
+ checkQueue();
+ return;
+ }
+
+ sseConn->async_write_some(
+ boost::asio::buffer(outBuffer.data(), outBuffer.size()),
+ [self(shared_from_this())](boost::beast::error_code ec,
+ const std::size_t& bytesTransferred) {
+ self->outBuffer.erase(0, bytesTransferred);
+
+ if (ec == boost::asio::error::eof)
+ {
+ // Send is successful, Lets remove data from queue
+ // check for next request data in queue.
+ self->requestDataQueue.pop();
+ self->state = SseConnState::idle;
+ self->checkQueue();
+ return;
+ }
+
+ if (ec)
+ {
+ BMCWEB_LOG_ERROR << "async_write_some() failed: "
+ << ec.message();
+ self->state = SseConnState::sendFailed;
+ self->checkQueue();
+ return;
+ }
+ BMCWEB_LOG_DEBUG << "async_write_some() bytes transferred: "
+ << bytesTransferred;
+ boost::ignore_unused(bytesTransferred);
+
+ self->doWrite();
+ });
+ }
+
+ void startSSE()
+ {
+ if (state == SseConnState::initInProgress)
+ {
+ return;
+ }
+ state = SseConnState::initInProgress;
+
+ BMCWEB_LOG_DEBUG << "starting SSE connection ";
+ using BodyType = boost::beast::http::buffer_body;
+ auto response =
+ std::make_shared<boost::beast::http::response<BodyType>>(
+ boost::beast::http::status::ok, 11);
+ auto serializer =
+ std::make_shared<boost::beast::http::response_serializer<BodyType>>(
+ *response);
+
+ // TODO: Add hostname in http header.
+ response->set(boost::beast::http::field::server, "iBMC");
+ response->set(boost::beast::http::field::content_type,
+ "text/event-stream");
+ response->body().data = nullptr;
+ response->body().size = 0;
+ response->body().more = true;
+
+ boost::beast::http::async_write_header(
+ *sseConn, *serializer,
+ [this, response, serializer](const boost::beast::error_code& ec,
+ const std::size_t& bytesTransferred) {
+ if (ec)
+ {
+ BMCWEB_LOG_ERROR << "Error sending header" << ec;
+ state = SseConnState::initFailed;
+ checkQueue();
+ return;
+ }
+
+ BMCWEB_LOG_DEBUG << "startSSE Header sent.";
+ state = SseConnState::initialized;
+ checkQueue();
+ });
+ }
+
+ void checkQueue(const bool newRecord = false)
+ {
+ if (requestDataQueue.empty())
+ {
+ BMCWEB_LOG_DEBUG << "requestDataQueue is empty\n";
+ return;
+ }
+
+ if (retryCount >= maxRetryAttempts)
+ {
+ BMCWEB_LOG_ERROR << "Maximum number of retries is reached.";
+
+ // Clear queue.
+ while (!requestDataQueue.empty())
+ {
+ requestDataQueue.pop();
+ }
+
+ // TODO: Take 'DeliveryRetryPolicy' action.
+ // For now, doing 'SuspendRetries' action.
+ state = SseConnState::suspended;
+ return;
+ }
+
+ if ((state == SseConnState::initFailed) ||
+ (state == SseConnState::sendFailed))
+ {
+ if (newRecord)
+ {
+ // We are already running async wait and retry.
+ // Since record is added to queue, it gets the
+ // turn in FIFO.
+ return;
+ }
+
+ retryCount++;
+ // TODO: Perform async wait for retryTimeoutInterval before proceed.
+ }
+ else
+ {
+ // reset retry count.
+ retryCount = 0;
+ }
+
+ switch (state)
+ {
+ case SseConnState::initInProgress:
+ case SseConnState::sendInProgress:
+ case SseConnState::suspended:
+ // do nothing
+ break;
+ case SseConnState::initFailed:
+ {
+ startSSE();
+ break;
+ }
+ case SseConnState::initialized:
+ case SseConnState::idle:
+ case SseConnState::sendFailed:
+ {
+ std::pair<uint64_t, std::string> reqData =
+ requestDataQueue.front();
+ sendEvent(std::to_string(reqData.first), reqData.second);
+ break;
+ }
+ default:
+ break;
+ }
+
+ return;
+ }
+
+ public:
+ ServerSentEvents(const ServerSentEvents&) = delete;
+ ServerSentEvents& operator=(const ServerSentEvents&) = delete;
+ ServerSentEvents(ServerSentEvents&&) = delete;
+ ServerSentEvents& operator=(ServerSentEvents&&) = delete;
+
+ ServerSentEvents(const std::shared_ptr<crow::Request::Adaptor>& adaptor) :
+ sseConn(std::move(adaptor)), state(SseConnState::startInit),
+ retryCount(0), maxRetryAttempts(5)
+ {
+ startSSE();
+ }
+
+ ~ServerSentEvents()
+ {}
+
+ void sendData(const uint64_t& id, const std::string& data)
+ {
+ if (state == SseConnState::suspended)
+ {
+ return;
+ }
+
+ if (requestDataQueue.size() <= maxReqQueueSize)
+ {
+ requestDataQueue.push(std::pair(id, data));
+ checkQueue(true);
+ }
+ else
+ {
+ BMCWEB_LOG_ERROR << "Request queue is full. So ignoring data.";
+ }
+ }
+};
+
+} // namespace crow
diff --git a/redfish-core/lib/event_service.hpp b/redfish-core/lib/event_service.hpp
index ba1ea19..adb6238 100644
--- a/redfish-core/lib/event_service.hpp
+++ b/redfish-core/lib/event_service.hpp
@@ -398,6 +398,66 @@
}
};
+class EventServiceSSE : public Node
+{
+ public:
+ EventServiceSSE(CrowApp& app) :
+ Node(app, "/redfish/v1/EventService/Subscriptions/SSE/")
+ {
+ entityPrivileges = {
+ {boost::beast::http::verb::get, {{"ConfigureManager"}}},
+ {boost::beast::http::verb::head, {{"ConfigureManager"}}},
+ {boost::beast::http::verb::patch, {{"ConfigureManager"}}},
+ {boost::beast::http::verb::put, {{"ConfigureManager"}}},
+ {boost::beast::http::verb::delete_, {{"ConfigureManager"}}},
+ {boost::beast::http::verb::post, {{"ConfigureManager"}}}};
+ }
+
+ private:
+ void doGet(crow::Response& res, const crow::Request& req,
+ const std::vector<std::string>& params) override
+ {
+ if (EventServiceManager::getInstance().getNumberOfSubscriptions() >=
+ maxNoOfSubscriptions)
+ {
+ messages::eventSubscriptionLimitExceeded(res);
+ res.end();
+ return;
+ }
+
+ std::shared_ptr<crow::Request::Adaptor> sseConn =
+ std::make_shared<crow::Request::Adaptor>(std::move(req.socket()));
+ std::shared_ptr<Subscription> subValue =
+ std::make_shared<Subscription>(sseConn);
+
+ // GET on this URI means, Its SSE subscriptionType.
+ subValue->subscriptionType = "SSE";
+ subValue->protocol = "Redfish";
+
+ char* filters = req.urlParams.get("$filter");
+ if (filters == nullptr)
+ {
+ subValue->eventFormatType = "Event";
+ subValue->retryPolicy = "TerminateAfterRetries";
+ }
+ else
+ {
+ // TODO: Need to read this from query params.
+ subValue->eventFormatType = "Event";
+ subValue->retryPolicy = "TerminateAfterRetries";
+ }
+
+ std::string id =
+ EventServiceManager::getInstance().addSubscription(subValue, false);
+ if (id.empty())
+ {
+ messages::internalError(res);
+ res.end();
+ return;
+ }
+ }
+};
+
class EventDestination : public Node
{
public: