Add Server-Sent-Event support
Server-Sent-Event is a standard describing how servers can initiate
data transmission towards clients once an initial client connection has
been established. Unlike websockets (which are bidirectional),
Server-Sent-Events(SSE) are unidirectional and commonly used to send
message updates or continuous data streams to a browser client.
This is base patch for adding Server-Sent-Events routing support to
bmcweb. Redfish EventService SSE style subscription uses SSE route for
sending the Events/MetricReports to client which establishes the
connection.
Tested this patch with along with EventService SSE support patches and
verified the functionalty on browser.
Tested:
- Tested using follow-up patches on top which adds
support for Redfish EventService SSE style subscription
and observed events are getting sent periodically.
- Created SSE subscription from the browser by visiting
https://<BMC IP>/redfish/v1/EventService/SSE
Change-Id: I36956565cbba30c2007852c9471f477f6d1736e9
Signed-off-by: AppaRao Puli <apparao.puli@linux.intel.com>
Signed-off-by: P Dheeraj Srujan Kumar <p.dheeraj.srujan.kumar@intel.com>
Signed-off-by: V-Sanjana <sanjana.v@intel.com>
diff --git a/http/app.hpp b/http/app.hpp
index d3cf48c..6388d84 100644
--- a/http/app.hpp
+++ b/http/app.hpp
@@ -105,6 +105,11 @@
router.validate();
}
+ static bool isSseRoute(Request& req)
+ {
+ return Router::isSseRoute(req);
+ }
+
void run()
{
validate();
diff --git a/http/http_connection.hpp b/http/http_connection.hpp
index 7b66ac8..1b85c6b 100644
--- a/http/http_connection.hpp
+++ b/http/http_connection.hpp
@@ -244,10 +244,11 @@
self->completeRequest(thisRes);
});
- if (thisReq.isUpgrade() &&
- boost::iequals(
- thisReq.getHeaderValue(boost::beast::http::field::upgrade),
- "websocket"))
+ if ((thisReq.isUpgrade() &&
+ boost::iequals(
+ thisReq.getHeaderValue(boost::beast::http::field::upgrade),
+ "websocket")) ||
+ (Handler::isSseRoute(*req)))
{
asyncResp->res.setCompleteRequestHandler(
[self(shared_from_this())](crow::Response& thisRes) {
diff --git a/http/http_response.hpp b/http/http_response.hpp
index 1a4ef16..06b6939 100644
--- a/http/http_response.hpp
+++ b/http/http_response.hpp
@@ -16,10 +16,18 @@
template <typename Adaptor, typename Handler>
class Connection;
+namespace sse_socket
+{
+template <typename Adaptor>
+class ConnectionImpl;
+} // namespace sse_socket
+
struct Response
{
template <typename Adaptor, typename Handler>
friend class crow::Connection;
+ template <typename Adaptor>
+ friend class crow::sse_socket::ConnectionImpl;
using response_type =
boost::beast::http::response<boost::beast::http::string_body>;
diff --git a/http/routing.hpp b/http/routing.hpp
index eb87e72..ead3af9 100644
--- a/http/routing.hpp
+++ b/http/routing.hpp
@@ -8,6 +8,7 @@
#include "http_response.hpp"
#include "logging.hpp"
#include "privileges.hpp"
+#include "server_sent_event.hpp"
#include "sessions.hpp"
#include "utility.hpp"
#include "utils/dbus_utils.hpp"
@@ -373,6 +374,72 @@
std::function<void(crow::websocket::Connection&)> errorHandler;
};
+class SseSocketRule : public BaseRule
+{
+ using self_t = SseSocketRule;
+
+ public:
+ explicit SseSocketRule(const std::string& ruleIn) : BaseRule(ruleIn) {}
+
+ void validate() override {}
+
+ void handle(const Request& /*req*/,
+ const std::shared_ptr<bmcweb::AsyncResp>& asyncResp,
+ const std::vector<std::string>& /*params*/) override
+ {
+ asyncResp->res.result(boost::beast::http::status::not_found);
+ }
+
+#ifndef BMCWEB_ENABLE_SSL
+ void handleUpgrade(const Request& req,
+ const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
+ boost::asio::ip::tcp::socket&& adaptor) override
+ {
+ std::shared_ptr<
+ crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>
+ myConnection = std::make_shared<
+ crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>(
+ req, std::move(adaptor), openHandler, closeHandler);
+ myConnection->start();
+ }
+#else
+ void handleUpgrade(const Request& req,
+ const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
+ boost::beast::ssl_stream<boost::asio::ip::tcp::socket>&&
+ adaptor) override
+ {
+ std::shared_ptr<crow::sse_socket::ConnectionImpl<
+ boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>
+ myConnection = std::make_shared<crow::sse_socket::ConnectionImpl<
+ boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>(
+ req, std::move(adaptor), openHandler, closeHandler);
+ myConnection->start();
+ }
+#endif
+
+ template <typename Func>
+ self_t& onopen(Func f)
+ {
+ openHandler = f;
+ return *this;
+ }
+
+ template <typename Func>
+ self_t& onclose(Func f)
+ {
+ closeHandler = f;
+ return *this;
+ }
+
+ private:
+ std::function<void(std::shared_ptr<crow::sse_socket::Connection>&,
+ const crow::Request&,
+ const std::shared_ptr<bmcweb::AsyncResp>&)>
+ openHandler;
+ std::function<void(std::shared_ptr<crow::sse_socket::Connection>&)>
+ closeHandler;
+};
+
template <typename T>
struct RuleParameterTraits
{
@@ -386,6 +453,21 @@
return *p;
}
+ SseSocketRule& serverSentEvent()
+ {
+ self_t* self = static_cast<self_t*>(this);
+ SseSocketRule* p = new SseSocketRule(self->rule);
+ self->ruleToUpgrade.reset(p);
+ return *p;
+ }
+
+ self_t& name(std::string_view name) noexcept
+ {
+ self_t* self = static_cast<self_t*>(this);
+ self->nameStr = name;
+ return *self;
+ }
+
self_t& methods(boost::beast::http::verb method)
{
self_t* self = static_cast<self_t*>(this);
@@ -1102,6 +1184,15 @@
return true;
}
+ static bool isSseRoute(Request& req)
+ {
+ return std::any_of(sse_socket::sseRoutes.begin(),
+ sse_socket::sseRoutes.end(),
+ [&req](const char* sseRoute) {
+ return (req.url().encoded_path() == sseRoute);
+ });
+ }
+
static bool
isUserPrivileged(Request& req,
const std::shared_ptr<bmcweb::AsyncResp>& asyncResp,
diff --git a/http/server_sent_event.hpp b/http/server_sent_event.hpp
new file mode 100644
index 0000000..58659c8
--- /dev/null
+++ b/http/server_sent_event.hpp
@@ -0,0 +1,368 @@
+#pragma once
+#include "async_resolve.hpp"
+#include "async_resp.hpp"
+#include "http_request.hpp"
+#include "http_response.hpp"
+
+#include <boost/algorithm/string/predicate.hpp>
+#include <boost/asio/buffer.hpp>
+#include <boost/asio/steady_timer.hpp>
+#include <boost/beast/core/multi_buffer.hpp>
+#include <boost/beast/http/buffer_body.hpp>
+#include <boost/beast/websocket.hpp>
+
+#include <array>
+#include <functional>
+
+#ifdef BMCWEB_ENABLE_SSL
+#include <boost/beast/websocket/ssl.hpp>
+#endif
+
+namespace crow
+{
+
+namespace sse_socket
+{
+static constexpr const std::array<const char*, 1> sseRoutes = {
+ "/redfish/v1/EventService/SSE"};
+
+struct Connection : std::enable_shared_from_this<Connection>
+{
+ public:
+ explicit Connection(const crow::Request& reqIn) : req(reqIn) {}
+
+ Connection(const Connection&) = delete;
+ Connection(Connection&&) = delete;
+ Connection& operator=(const Connection&) = delete;
+ Connection& operator=(const Connection&&) = delete;
+ virtual ~Connection() = default;
+
+ virtual boost::asio::io_context& getIoContext() = 0;
+ virtual void sendSSEHeader() = 0;
+ virtual void completeRequest(crow::Response& thisRes) = 0;
+ virtual void close(std::string_view msg = "quit") = 0;
+ virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
+
+ crow::Request req;
+};
+
+template <typename Adaptor>
+class ConnectionImpl : public Connection
+{
+ public:
+ ConnectionImpl(
+ const crow::Request& reqIn, Adaptor adaptorIn,
+ std::function<void(std::shared_ptr<Connection>&, const crow::Request&,
+ const std::shared_ptr<bmcweb::AsyncResp>&)>
+ openHandlerIn,
+ std::function<void(std::shared_ptr<Connection>&)> closeHandlerIn) :
+ Connection(reqIn),
+ adaptor(std::move(adaptorIn)), openHandler(std::move(openHandlerIn)),
+ closeHandler(std::move(closeHandlerIn))
+ {
+ BMCWEB_LOG_DEBUG << "SseConnectionImpl: SSE constructor " << this;
+ }
+
+ ConnectionImpl(const ConnectionImpl&) = delete;
+ ConnectionImpl(const ConnectionImpl&&) = delete;
+ ConnectionImpl& operator=(const ConnectionImpl&) = delete;
+ ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
+
+ ~ConnectionImpl() override
+ {
+ BMCWEB_LOG_DEBUG << "SSE ConnectionImpl: SSE destructor " << this;
+ }
+
+ boost::asio::io_context& getIoContext() override
+ {
+ return static_cast<boost::asio::io_context&>(
+ adaptor.get_executor().context());
+ }
+
+ void start()
+ {
+ if (openHandler)
+ {
+ auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
+ std::shared_ptr<Connection> self = this->shared_from_this();
+
+ asyncResp->res.setCompleteRequestHandler(
+ [self(shared_from_this())](crow::Response& thisRes) {
+ if (thisRes.resultInt() != 200)
+ {
+ self->completeRequest(thisRes);
+ }
+ });
+
+ openHandler(self, req, asyncResp);
+ }
+ }
+
+ void close(const std::string_view msg) override
+ {
+ BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg;
+ boost::beast::get_lowest_layer(adaptor).close();
+
+ // send notification to handler for cleanup
+ if (closeHandler)
+ {
+ std::shared_ptr<Connection> self = shared_from_this();
+ closeHandler(self);
+ }
+ }
+
+ void sendSSEHeader() override
+ {
+ BMCWEB_LOG_DEBUG << "Starting SSE connection";
+ auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
+ using BodyType = boost::beast::http::buffer_body;
+ auto response =
+ std::make_shared<boost::beast::http::response<BodyType>>(
+ boost::beast::http::status::ok, 11);
+
+ serializer.emplace(*asyncResp->res.stringResponse);
+
+ response->set(boost::beast::http::field::content_type,
+ "text/event-stream");
+ response->body().more = true;
+
+ boost::beast::http::async_write_header(
+ adaptor, *serializer,
+ std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
+ shared_from_this()));
+ }
+
+ void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
+ const boost::beast::error_code& ec,
+ const std::size_t& /*unused*/)
+ {
+ if (ec)
+ {
+ BMCWEB_LOG_ERROR << "Error sending header" << ec;
+ close("async_write_header failed");
+ return;
+ }
+ BMCWEB_LOG_DEBUG << "SSE header sent - Connection established";
+
+ serializer.reset();
+
+ // SSE stream header sent, So let us setup monitor.
+ // Any read data on this stream will be error in case of SSE.
+ setupRead();
+ }
+
+ void setupRead()
+ {
+ std::weak_ptr<Connection> weakSelf = weak_from_this();
+
+ boost::beast::http::async_read_some(
+ adaptor, outputBuffer, *parser,
+ std::bind_front(&ConnectionImpl::setupReadCallback, this,
+ weak_from_this()));
+ }
+
+ void setupReadCallback(const std::weak_ptr<Connection>& weakSelf,
+ const boost::system::error_code& ec,
+ size_t bytesRead)
+ {
+ std::shared_ptr<Connection> self = weakSelf.lock();
+ BMCWEB_LOG_DEBUG << "async_read_some: Read " << bytesRead << " bytes";
+ if (ec)
+ {
+ BMCWEB_LOG_ERROR << "Read error: " << ec;
+ }
+
+ // After establishing SSE stream, Reading data on this
+ // stream means client is disobeys the SSE protocol.
+ // Read the data to avoid buffer attacks and close connection.
+
+ self->close("Close SSE connection");
+ }
+
+ void doWrite()
+ {
+ onTimeout();
+
+ if (doingWrite)
+ {
+ return;
+ }
+ if (inputBuffer.size() == 0)
+ {
+ BMCWEB_LOG_DEBUG << "inputBuffer is empty... Bailing out";
+ return;
+ }
+ doingWrite = true;
+
+ adaptor.async_write_some(
+ inputBuffer.data(),
+ std::bind_front(&ConnectionImpl::doWriteCallback, this,
+ shared_from_this()));
+ }
+
+ void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
+ const boost::beast::error_code& ec,
+ const size_t bytesTransferred)
+ {
+ doingWrite = false;
+ inputBuffer.consume(bytesTransferred);
+
+ if (ec == boost::asio::error::eof)
+ {
+ BMCWEB_LOG_ERROR << "async_write_some() SSE stream closed";
+ close("SSE stream closed");
+ return;
+ }
+
+ if (ec)
+ {
+ BMCWEB_LOG_ERROR << "async_write_some() failed: " << ec.message();
+ close("async_write_some failed");
+ return;
+ }
+ BMCWEB_LOG_DEBUG << "async_write_some() bytes transferred: "
+ << bytesTransferred;
+
+ doWrite();
+ }
+
+ void completeRequest(crow::Response& thisRes) override
+ {
+ auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
+ asyncResp->res = std::move(thisRes);
+
+ if (asyncResp->res.body().empty() && !asyncResp->res.jsonValue.empty())
+ {
+ asyncResp->res.addHeader(boost::beast::http::field::content_type,
+ "application/json");
+ asyncResp->res.body() = asyncResp->res.jsonValue.dump(
+ 2, ' ', true, nlohmann::json::error_handler_t::replace);
+ }
+
+ asyncResp->res.preparePayload();
+
+ serializer.emplace(*asyncResp->res.stringResponse);
+
+ boost::beast::http::async_write_some(
+ adaptor, *serializer,
+ std::bind_front(&ConnectionImpl::completeRequestCallback, this,
+ shared_from_this()));
+ }
+
+ void completeRequestCallback(const std::shared_ptr<Connection>& /*self*/,
+ const boost::system::error_code& ec,
+ std::size_t bytesTransferred)
+ {
+ auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
+ BMCWEB_LOG_DEBUG << this << " async_write " << bytesTransferred
+ << " bytes";
+ if (ec)
+ {
+ BMCWEB_LOG_DEBUG << this << " from async_write failed";
+ return;
+ }
+
+ BMCWEB_LOG_DEBUG << this << " Closing SSE connection - Request invalid";
+ serializer.reset();
+ close("Request invalid");
+ asyncResp->res.releaseCompleteRequestHandler();
+ }
+
+ void sendEvent(std::string_view id, std::string_view msg) override
+ {
+ if (msg.empty())
+ {
+ BMCWEB_LOG_DEBUG << "Empty data, bailing out.";
+ return;
+ }
+
+ dataFormat(id);
+
+ doWrite();
+ }
+
+ void dataFormat(std::string_view id)
+ {
+ std::string_view msg;
+ std::string rawData;
+ if (!id.empty())
+ {
+ rawData += "id: ";
+ rawData.append(id.begin(), id.end());
+ rawData += "\n";
+ }
+
+ rawData += "data: ";
+ for (char character : msg)
+ {
+ rawData += character;
+ if (character == '\n')
+ {
+ rawData += "data: ";
+ }
+ }
+ rawData += "\n\n";
+
+ boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()),
+ boost::asio::buffer(rawData));
+ inputBuffer.commit(rawData.size());
+ }
+
+ void onTimeout()
+ {
+ boost::asio::steady_timer timer(ioc);
+ std::weak_ptr<Connection> weakSelf = weak_from_this();
+ timer.expires_after(std::chrono::seconds(30));
+ timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
+ this, weak_from_this()));
+ }
+
+ void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
+ const boost::system::error_code ec)
+ {
+ std::shared_ptr<Connection> self = weakSelf.lock();
+ if (!self)
+ {
+ BMCWEB_LOG_CRITICAL << self << " Failed to capture connection";
+ return;
+ }
+
+ if (ec == boost::asio::error::operation_aborted)
+ {
+ BMCWEB_LOG_DEBUG << "operation aborted";
+ // Canceled wait means the path succeeeded.
+ return;
+ }
+ if (ec)
+ {
+ BMCWEB_LOG_CRITICAL << self << " timer failed " << ec;
+ }
+
+ BMCWEB_LOG_WARNING << self << "Connection timed out, closing";
+
+ self->close("closing connection");
+ }
+
+ private:
+ Adaptor adaptor;
+
+ boost::beast::multi_buffer outputBuffer;
+ boost::beast::multi_buffer inputBuffer;
+
+ std::optional<boost::beast::http::response_serializer<
+ boost::beast::http::string_body>>
+ serializer;
+ boost::asio::io_context& ioc =
+ crow::connections::systemBus->get_io_context();
+ bool doingWrite = false;
+ std::optional<
+ boost::beast::http::request_parser<boost::beast::http::string_body>>
+ parser;
+
+ std::function<void(std::shared_ptr<Connection>&, const crow::Request&,
+ const std::shared_ptr<bmcweb::AsyncResp>&)>
+ openHandler;
+ std::function<void(std::shared_ptr<Connection>&)> closeHandler;
+};
+} // namespace sse_socket
+} // namespace crow