Add Server-Sent-Event support

Server-Sent-Event is a standard describing how servers can initiate
data transmission towards clients once an initial client connection has
been established. Unlike websockets (which are bidirectional),
Server-Sent-Events(SSE) are unidirectional and commonly used to send
message updates or continuous data streams to a browser client.

This is base patch for adding Server-Sent-Events routing support to
bmcweb. Redfish EventService SSE style subscription uses SSE route for
sending the Events/MetricReports to client which establishes the
connection.

Tested this patch with along with EventService SSE support patches and
verified the functionalty on browser.

Tested:
 - Tested using follow-up patches on top which adds
   support for Redfish EventService SSE style subscription
   and observed events are getting sent periodically.
 - Created SSE subscription from the browser by visiting
   https://<BMC IP>/redfish/v1/EventService/SSE

Change-Id: I36956565cbba30c2007852c9471f477f6d1736e9
Signed-off-by: AppaRao Puli <apparao.puli@linux.intel.com>
Signed-off-by: P Dheeraj Srujan Kumar <p.dheeraj.srujan.kumar@intel.com>
Signed-off-by: V-Sanjana <sanjana.v@intel.com>
diff --git a/http/app.hpp b/http/app.hpp
index d3cf48c..6388d84 100644
--- a/http/app.hpp
+++ b/http/app.hpp
@@ -105,6 +105,11 @@
         router.validate();
     }
 
+    static bool isSseRoute(Request& req)
+    {
+        return Router::isSseRoute(req);
+    }
+
     void run()
     {
         validate();
diff --git a/http/http_connection.hpp b/http/http_connection.hpp
index 7b66ac8..1b85c6b 100644
--- a/http/http_connection.hpp
+++ b/http/http_connection.hpp
@@ -244,10 +244,11 @@
             self->completeRequest(thisRes);
         });
 
-        if (thisReq.isUpgrade() &&
-            boost::iequals(
-                thisReq.getHeaderValue(boost::beast::http::field::upgrade),
-                "websocket"))
+        if ((thisReq.isUpgrade() &&
+             boost::iequals(
+                 thisReq.getHeaderValue(boost::beast::http::field::upgrade),
+                 "websocket")) ||
+            (Handler::isSseRoute(*req)))
         {
             asyncResp->res.setCompleteRequestHandler(
                 [self(shared_from_this())](crow::Response& thisRes) {
diff --git a/http/http_response.hpp b/http/http_response.hpp
index 1a4ef16..06b6939 100644
--- a/http/http_response.hpp
+++ b/http/http_response.hpp
@@ -16,10 +16,18 @@
 template <typename Adaptor, typename Handler>
 class Connection;
 
+namespace sse_socket
+{
+template <typename Adaptor>
+class ConnectionImpl;
+} // namespace sse_socket
+
 struct Response
 {
     template <typename Adaptor, typename Handler>
     friend class crow::Connection;
+    template <typename Adaptor>
+    friend class crow::sse_socket::ConnectionImpl;
     using response_type =
         boost::beast::http::response<boost::beast::http::string_body>;
 
diff --git a/http/routing.hpp b/http/routing.hpp
index eb87e72..ead3af9 100644
--- a/http/routing.hpp
+++ b/http/routing.hpp
@@ -8,6 +8,7 @@
 #include "http_response.hpp"
 #include "logging.hpp"
 #include "privileges.hpp"
+#include "server_sent_event.hpp"
 #include "sessions.hpp"
 #include "utility.hpp"
 #include "utils/dbus_utils.hpp"
@@ -373,6 +374,72 @@
     std::function<void(crow::websocket::Connection&)> errorHandler;
 };
 
+class SseSocketRule : public BaseRule
+{
+    using self_t = SseSocketRule;
+
+  public:
+    explicit SseSocketRule(const std::string& ruleIn) : BaseRule(ruleIn) {}
+
+    void validate() override {}
+
+    void handle(const Request& /*req*/,
+                const std::shared_ptr<bmcweb::AsyncResp>& asyncResp,
+                const std::vector<std::string>& /*params*/) override
+    {
+        asyncResp->res.result(boost::beast::http::status::not_found);
+    }
+
+#ifndef BMCWEB_ENABLE_SSL
+    void handleUpgrade(const Request& req,
+                       const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
+                       boost::asio::ip::tcp::socket&& adaptor) override
+    {
+        std::shared_ptr<
+            crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>
+            myConnection = std::make_shared<
+                crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>(
+                req, std::move(adaptor), openHandler, closeHandler);
+        myConnection->start();
+    }
+#else
+    void handleUpgrade(const Request& req,
+                       const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
+                       boost::beast::ssl_stream<boost::asio::ip::tcp::socket>&&
+                           adaptor) override
+    {
+        std::shared_ptr<crow::sse_socket::ConnectionImpl<
+            boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>
+            myConnection = std::make_shared<crow::sse_socket::ConnectionImpl<
+                boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>(
+                req, std::move(adaptor), openHandler, closeHandler);
+        myConnection->start();
+    }
+#endif
+
+    template <typename Func>
+    self_t& onopen(Func f)
+    {
+        openHandler = f;
+        return *this;
+    }
+
+    template <typename Func>
+    self_t& onclose(Func f)
+    {
+        closeHandler = f;
+        return *this;
+    }
+
+  private:
+    std::function<void(std::shared_ptr<crow::sse_socket::Connection>&,
+                       const crow::Request&,
+                       const std::shared_ptr<bmcweb::AsyncResp>&)>
+        openHandler;
+    std::function<void(std::shared_ptr<crow::sse_socket::Connection>&)>
+        closeHandler;
+};
+
 template <typename T>
 struct RuleParameterTraits
 {
@@ -386,6 +453,21 @@
         return *p;
     }
 
+    SseSocketRule& serverSentEvent()
+    {
+        self_t* self = static_cast<self_t*>(this);
+        SseSocketRule* p = new SseSocketRule(self->rule);
+        self->ruleToUpgrade.reset(p);
+        return *p;
+    }
+
+    self_t& name(std::string_view name) noexcept
+    {
+        self_t* self = static_cast<self_t*>(this);
+        self->nameStr = name;
+        return *self;
+    }
+
     self_t& methods(boost::beast::http::verb method)
     {
         self_t* self = static_cast<self_t*>(this);
@@ -1102,6 +1184,15 @@
         return true;
     }
 
+    static bool isSseRoute(Request& req)
+    {
+        return std::any_of(sse_socket::sseRoutes.begin(),
+                           sse_socket::sseRoutes.end(),
+                           [&req](const char* sseRoute) {
+            return (req.url().encoded_path() == sseRoute);
+        });
+    }
+
     static bool
         isUserPrivileged(Request& req,
                          const std::shared_ptr<bmcweb::AsyncResp>& asyncResp,
diff --git a/http/server_sent_event.hpp b/http/server_sent_event.hpp
new file mode 100644
index 0000000..58659c8
--- /dev/null
+++ b/http/server_sent_event.hpp
@@ -0,0 +1,368 @@
+#pragma once
+#include "async_resolve.hpp"
+#include "async_resp.hpp"
+#include "http_request.hpp"
+#include "http_response.hpp"
+
+#include <boost/algorithm/string/predicate.hpp>
+#include <boost/asio/buffer.hpp>
+#include <boost/asio/steady_timer.hpp>
+#include <boost/beast/core/multi_buffer.hpp>
+#include <boost/beast/http/buffer_body.hpp>
+#include <boost/beast/websocket.hpp>
+
+#include <array>
+#include <functional>
+
+#ifdef BMCWEB_ENABLE_SSL
+#include <boost/beast/websocket/ssl.hpp>
+#endif
+
+namespace crow
+{
+
+namespace sse_socket
+{
+static constexpr const std::array<const char*, 1> sseRoutes = {
+    "/redfish/v1/EventService/SSE"};
+
+struct Connection : std::enable_shared_from_this<Connection>
+{
+  public:
+    explicit Connection(const crow::Request& reqIn) : req(reqIn) {}
+
+    Connection(const Connection&) = delete;
+    Connection(Connection&&) = delete;
+    Connection& operator=(const Connection&) = delete;
+    Connection& operator=(const Connection&&) = delete;
+    virtual ~Connection() = default;
+
+    virtual boost::asio::io_context& getIoContext() = 0;
+    virtual void sendSSEHeader() = 0;
+    virtual void completeRequest(crow::Response& thisRes) = 0;
+    virtual void close(std::string_view msg = "quit") = 0;
+    virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
+
+    crow::Request req;
+};
+
+template <typename Adaptor>
+class ConnectionImpl : public Connection
+{
+  public:
+    ConnectionImpl(
+        const crow::Request& reqIn, Adaptor adaptorIn,
+        std::function<void(std::shared_ptr<Connection>&, const crow::Request&,
+                           const std::shared_ptr<bmcweb::AsyncResp>&)>
+            openHandlerIn,
+        std::function<void(std::shared_ptr<Connection>&)> closeHandlerIn) :
+        Connection(reqIn),
+        adaptor(std::move(adaptorIn)), openHandler(std::move(openHandlerIn)),
+        closeHandler(std::move(closeHandlerIn))
+    {
+        BMCWEB_LOG_DEBUG << "SseConnectionImpl: SSE constructor " << this;
+    }
+
+    ConnectionImpl(const ConnectionImpl&) = delete;
+    ConnectionImpl(const ConnectionImpl&&) = delete;
+    ConnectionImpl& operator=(const ConnectionImpl&) = delete;
+    ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
+
+    ~ConnectionImpl() override
+    {
+        BMCWEB_LOG_DEBUG << "SSE ConnectionImpl: SSE destructor " << this;
+    }
+
+    boost::asio::io_context& getIoContext() override
+    {
+        return static_cast<boost::asio::io_context&>(
+            adaptor.get_executor().context());
+    }
+
+    void start()
+    {
+        if (openHandler)
+        {
+            auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
+            std::shared_ptr<Connection> self = this->shared_from_this();
+
+            asyncResp->res.setCompleteRequestHandler(
+                [self(shared_from_this())](crow::Response& thisRes) {
+                if (thisRes.resultInt() != 200)
+                {
+                    self->completeRequest(thisRes);
+                }
+            });
+
+            openHandler(self, req, asyncResp);
+        }
+    }
+
+    void close(const std::string_view msg) override
+    {
+        BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg;
+        boost::beast::get_lowest_layer(adaptor).close();
+
+        // send notification to handler for cleanup
+        if (closeHandler)
+        {
+            std::shared_ptr<Connection> self = shared_from_this();
+            closeHandler(self);
+        }
+    }
+
+    void sendSSEHeader() override
+    {
+        BMCWEB_LOG_DEBUG << "Starting SSE connection";
+        auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
+        using BodyType = boost::beast::http::buffer_body;
+        auto response =
+            std::make_shared<boost::beast::http::response<BodyType>>(
+                boost::beast::http::status::ok, 11);
+
+        serializer.emplace(*asyncResp->res.stringResponse);
+
+        response->set(boost::beast::http::field::content_type,
+                      "text/event-stream");
+        response->body().more = true;
+
+        boost::beast::http::async_write_header(
+            adaptor, *serializer,
+            std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
+                            shared_from_this()));
+    }
+
+    void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
+                               const boost::beast::error_code& ec,
+                               const std::size_t& /*unused*/)
+    {
+        if (ec)
+        {
+            BMCWEB_LOG_ERROR << "Error sending header" << ec;
+            close("async_write_header failed");
+            return;
+        }
+        BMCWEB_LOG_DEBUG << "SSE header sent - Connection established";
+
+        serializer.reset();
+
+        // SSE stream header sent, So let us setup monitor.
+        // Any read data on this stream will be error in case of SSE.
+        setupRead();
+    }
+
+    void setupRead()
+    {
+        std::weak_ptr<Connection> weakSelf = weak_from_this();
+
+        boost::beast::http::async_read_some(
+            adaptor, outputBuffer, *parser,
+            std::bind_front(&ConnectionImpl::setupReadCallback, this,
+                            weak_from_this()));
+    }
+
+    void setupReadCallback(const std::weak_ptr<Connection>& weakSelf,
+                           const boost::system::error_code& ec,
+                           size_t bytesRead)
+    {
+        std::shared_ptr<Connection> self = weakSelf.lock();
+        BMCWEB_LOG_DEBUG << "async_read_some: Read " << bytesRead << " bytes";
+        if (ec)
+        {
+            BMCWEB_LOG_ERROR << "Read error: " << ec;
+        }
+
+        // After establishing SSE stream, Reading data on this
+        // stream means client is disobeys the SSE protocol.
+        // Read the data to avoid buffer attacks and close connection.
+
+        self->close("Close SSE connection");
+    }
+
+    void doWrite()
+    {
+        onTimeout();
+
+        if (doingWrite)
+        {
+            return;
+        }
+        if (inputBuffer.size() == 0)
+        {
+            BMCWEB_LOG_DEBUG << "inputBuffer is empty... Bailing out";
+            return;
+        }
+        doingWrite = true;
+
+        adaptor.async_write_some(
+            inputBuffer.data(),
+            std::bind_front(&ConnectionImpl::doWriteCallback, this,
+                            shared_from_this()));
+    }
+
+    void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
+                         const boost::beast::error_code& ec,
+                         const size_t bytesTransferred)
+    {
+        doingWrite = false;
+        inputBuffer.consume(bytesTransferred);
+
+        if (ec == boost::asio::error::eof)
+        {
+            BMCWEB_LOG_ERROR << "async_write_some() SSE stream closed";
+            close("SSE stream closed");
+            return;
+        }
+
+        if (ec)
+        {
+            BMCWEB_LOG_ERROR << "async_write_some() failed: " << ec.message();
+            close("async_write_some failed");
+            return;
+        }
+        BMCWEB_LOG_DEBUG << "async_write_some() bytes transferred: "
+                         << bytesTransferred;
+
+        doWrite();
+    }
+
+    void completeRequest(crow::Response& thisRes) override
+    {
+        auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
+        asyncResp->res = std::move(thisRes);
+
+        if (asyncResp->res.body().empty() && !asyncResp->res.jsonValue.empty())
+        {
+            asyncResp->res.addHeader(boost::beast::http::field::content_type,
+                                     "application/json");
+            asyncResp->res.body() = asyncResp->res.jsonValue.dump(
+                2, ' ', true, nlohmann::json::error_handler_t::replace);
+        }
+
+        asyncResp->res.preparePayload();
+
+        serializer.emplace(*asyncResp->res.stringResponse);
+
+        boost::beast::http::async_write_some(
+            adaptor, *serializer,
+            std::bind_front(&ConnectionImpl::completeRequestCallback, this,
+                            shared_from_this()));
+    }
+
+    void completeRequestCallback(const std::shared_ptr<Connection>& /*self*/,
+                                 const boost::system::error_code& ec,
+                                 std::size_t bytesTransferred)
+    {
+        auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
+        BMCWEB_LOG_DEBUG << this << " async_write " << bytesTransferred
+                         << " bytes";
+        if (ec)
+        {
+            BMCWEB_LOG_DEBUG << this << " from async_write failed";
+            return;
+        }
+
+        BMCWEB_LOG_DEBUG << this << " Closing SSE connection - Request invalid";
+        serializer.reset();
+        close("Request invalid");
+        asyncResp->res.releaseCompleteRequestHandler();
+    }
+
+    void sendEvent(std::string_view id, std::string_view msg) override
+    {
+        if (msg.empty())
+        {
+            BMCWEB_LOG_DEBUG << "Empty data, bailing out.";
+            return;
+        }
+
+        dataFormat(id);
+
+        doWrite();
+    }
+
+    void dataFormat(std::string_view id)
+    {
+        std::string_view msg;
+        std::string rawData;
+        if (!id.empty())
+        {
+            rawData += "id: ";
+            rawData.append(id.begin(), id.end());
+            rawData += "\n";
+        }
+
+        rawData += "data: ";
+        for (char character : msg)
+        {
+            rawData += character;
+            if (character == '\n')
+            {
+                rawData += "data: ";
+            }
+        }
+        rawData += "\n\n";
+
+        boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()),
+                                 boost::asio::buffer(rawData));
+        inputBuffer.commit(rawData.size());
+    }
+
+    void onTimeout()
+    {
+        boost::asio::steady_timer timer(ioc);
+        std::weak_ptr<Connection> weakSelf = weak_from_this();
+        timer.expires_after(std::chrono::seconds(30));
+        timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
+                                         this, weak_from_this()));
+    }
+
+    void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
+                           const boost::system::error_code ec)
+    {
+        std::shared_ptr<Connection> self = weakSelf.lock();
+        if (!self)
+        {
+            BMCWEB_LOG_CRITICAL << self << " Failed to capture connection";
+            return;
+        }
+
+        if (ec == boost::asio::error::operation_aborted)
+        {
+            BMCWEB_LOG_DEBUG << "operation aborted";
+            // Canceled wait means the path succeeeded.
+            return;
+        }
+        if (ec)
+        {
+            BMCWEB_LOG_CRITICAL << self << " timer failed " << ec;
+        }
+
+        BMCWEB_LOG_WARNING << self << "Connection timed out, closing";
+
+        self->close("closing connection");
+    }
+
+  private:
+    Adaptor adaptor;
+
+    boost::beast::multi_buffer outputBuffer;
+    boost::beast::multi_buffer inputBuffer;
+
+    std::optional<boost::beast::http::response_serializer<
+        boost::beast::http::string_body>>
+        serializer;
+    boost::asio::io_context& ioc =
+        crow::connections::systemBus->get_io_context();
+    bool doingWrite = false;
+    std::optional<
+        boost::beast::http::request_parser<boost::beast::http::string_body>>
+        parser;
+
+    std::function<void(std::shared_ptr<Connection>&, const crow::Request&,
+                       const std::shared_ptr<bmcweb::AsyncResp>&)>
+        openHandler;
+    std::function<void(std::shared_ptr<Connection>&)> closeHandler;
+};
+} // namespace sse_socket
+} // namespace crow