EventService: Support for Server Sent Events(SSE)

Add support for Server Sent Events(SSE)
Filters support is not part of this commit.

Tested:
 - GET on URI /redfish/v1/EventService/Subscriptions/SSE/
   from chrome browser, can see all BMC Events on browser.
 - Redfish validator is successful.

Change-Id: Icd10cdad20c4529f64c97b67d46f2e4a7e0c329c
Signed-off-by: AppaRao Puli <apparao.puli@linux.intel.com>
diff --git a/redfish-core/include/event_service_manager.hpp b/redfish-core/include/event_service_manager.hpp
index 7c96420..03d5dc3 100644
--- a/redfish-core/include/event_service_manager.hpp
+++ b/redfish-core/include/event_service_manager.hpp
@@ -25,6 +25,7 @@
 #include <boost/container/flat_map.hpp>
 #include <error_messages.hpp>
 #include <http_client.hpp>
+#include <server_sent_events.hpp>
 #include <utils/json_utils.hpp>
 
 #include <cstdlib>
@@ -295,23 +296,38 @@
             crow::connections::systemBus->get_io_context(), id, host, port,
             path);
     }
+
+    Subscription(const std::shared_ptr<crow::Request::Adaptor>& adaptor) :
+        eventSeqNum(1)
+    {
+        sseConn = std::make_shared<crow::ServerSentEvents>(adaptor);
+    }
+
     ~Subscription()
     {}
 
     void sendEvent(const std::string& msg)
     {
-        std::vector<std::pair<std::string, std::string>> reqHeaders;
-        for (const auto& header : httpHeaders)
+        if (conn != nullptr)
         {
-            for (const auto& item : header.items())
+            std::vector<std::pair<std::string, std::string>> reqHeaders;
+            for (const auto& header : httpHeaders)
             {
-                std::string key = item.key();
-                std::string val = item.value();
-                reqHeaders.emplace_back(std::pair(key, val));
+                for (const auto& item : header.items())
+                {
+                    std::string key = item.key();
+                    std::string val = item.value();
+                    reqHeaders.emplace_back(std::pair(key, val));
+                }
             }
+            conn->setHeaders(reqHeaders);
+            conn->sendData(msg);
         }
-        conn->setHeaders(reqHeaders);
-        conn->sendData(msg);
+
+        if (sseConn != nullptr)
+        {
+            sseConn->sendData(eventSeqNum, msg);
+        }
     }
 
     void sendTestEventLog()
@@ -462,7 +478,8 @@
     std::string port;
     std::string path;
     std::string uriProto;
-    std::shared_ptr<crow::HttpClient> conn;
+    std::shared_ptr<crow::HttpClient> conn = nullptr;
+    std::shared_ptr<crow::ServerSentEvents> sseConn = nullptr;
 };
 
 static constexpr const bool defaultEnabledState = true;
@@ -572,6 +589,22 @@
                 BMCWEB_LOG_DEBUG << "Invalid subscription Protocol exist.";
                 continue;
             }
+
+            std::string subscriptionType;
+            if (!json_util::getValueFromJsonObject(jsonObj, "SubscriptionType",
+                                                   subscriptionType))
+            {
+                subscriptionType = defaulSubscriptionType;
+            }
+            // SSE connections are initiated from client
+            // and can't be re-established from server.
+            if (subscriptionType == "SSE")
+            {
+                BMCWEB_LOG_DEBUG
+                    << "The subscription type is SSE, so skipping.";
+                continue;
+            }
+
             std::string destination;
             if (!json_util::getValueFromJsonObject(jsonObj, "Destination",
                                                    destination))
@@ -597,6 +630,7 @@
 
             subValue->destinationUrl = destination;
             subValue->protocol = protocol;
+            subValue->subscriptionType = subscriptionType;
             if (!json_util::getValueFromJsonObject(
                     jsonObj, "DeliveryRetryPolicy", subValue->retryPolicy))
             {
@@ -607,12 +641,6 @@
             {
                 subValue->eventFormatType = defaulEventFormatType;
             }
-            if (!json_util::getValueFromJsonObject(jsonObj, "SubscriptionType",
-                                                   subValue->subscriptionType))
-            {
-                subValue->subscriptionType = defaulSubscriptionType;
-            }
-
             json_util::getValueFromJsonObject(jsonObj, "Context",
                                               subValue->customText);
             json_util::getValueFromJsonObject(jsonObj, "MessageIds",
@@ -649,9 +677,17 @@
 
         for (const auto& it : subscriptionsMap)
         {
-            nlohmann::json entry;
             std::shared_ptr<Subscription> subValue = it.second;
+            // Don't preserve SSE connections. Its initiated from
+            // client side and can't be re-established from server.
+            if (subValue->subscriptionType == "SSE")
+            {
+                BMCWEB_LOG_DEBUG
+                    << "The subscription type is SSE, so skipping.";
+                continue;
+            }
 
+            nlohmann::json entry;
             entry["Context"] = subValue->customText;
             entry["DeliveryRetryPolicy"] = subValue->retryPolicy;
             entry["Destination"] = subValue->destinationUrl;
diff --git a/redfish-core/include/redfish.hpp b/redfish-core/include/redfish.hpp
index 303519b..959c643 100644
--- a/redfish-core/include/redfish.hpp
+++ b/redfish-core/include/redfish.hpp
@@ -189,6 +189,7 @@
         nodes.emplace_back(std::make_unique<TaskCollection>(app));
         nodes.emplace_back(std::make_unique<Task>(app));
         nodes.emplace_back(std::make_unique<EventService>(app));
+        nodes.emplace_back(std::make_unique<EventServiceSSE>(app));
         nodes.emplace_back(std::make_unique<EventDestinationCollection>(app));
         nodes.emplace_back(std::make_unique<EventDestination>(app));
         nodes.emplace_back(std::make_unique<SubmitTestEvent>(app));
diff --git a/redfish-core/include/server_sent_events.hpp b/redfish-core/include/server_sent_events.hpp
new file mode 100644
index 0000000..1c4d2a5
--- /dev/null
+++ b/redfish-core/include/server_sent_events.hpp
@@ -0,0 +1,290 @@
+
+/*
+// Copyright (c) 2020 Intel Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+*/
+#pragma once
+#include "node.hpp"
+
+#include <boost/asio/strand.hpp>
+#include <boost/beast/core.hpp>
+#include <boost/beast/http.hpp>
+#include <boost/beast/version.hpp>
+
+#include <cstdlib>
+#include <functional>
+#include <iostream>
+#include <memory>
+#include <queue>
+#include <string>
+
+namespace crow
+{
+
+static constexpr uint8_t maxReqQueueSize = 50;
+
+enum class SseConnState
+{
+    startInit,
+    initInProgress,
+    initialized,
+    initFailed,
+    sendInProgress,
+    sendFailed,
+    idle,
+    suspended,
+    closed
+};
+
+class ServerSentEvents : public std::enable_shared_from_this<ServerSentEvents>
+{
+  private:
+    std::shared_ptr<crow::Request::Adaptor> sseConn;
+    std::queue<std::pair<uint64_t, std::string>> requestDataQueue;
+    std::string outBuffer;
+    SseConnState state;
+    int retryCount;
+    int maxRetryAttempts;
+
+    void sendEvent(const std::string& id, const std::string& msg)
+    {
+        if (msg.empty())
+        {
+            BMCWEB_LOG_DEBUG << "Empty data, bailing out.";
+            return;
+        }
+
+        if (state == SseConnState::sendInProgress)
+        {
+            return;
+        }
+        state = SseConnState::sendInProgress;
+
+        if (!id.empty())
+        {
+            outBuffer += "id: ";
+            outBuffer.append(id.begin(), id.end());
+            outBuffer += "\n";
+        }
+
+        outBuffer += "data: ";
+        for (char character : msg)
+        {
+            outBuffer += character;
+            if (character == '\n')
+            {
+                outBuffer += "data: ";
+            }
+        }
+        outBuffer += "\n\n";
+
+        doWrite();
+    }
+
+    void doWrite()
+    {
+        if (outBuffer.empty())
+        {
+            BMCWEB_LOG_DEBUG << "All data sent successfully.";
+            // Send is successful, Lets remove data from queue
+            // check for next request data in queue.
+            requestDataQueue.pop();
+            state = SseConnState::idle;
+            checkQueue();
+            return;
+        }
+
+        sseConn->async_write_some(
+            boost::asio::buffer(outBuffer.data(), outBuffer.size()),
+            [self(shared_from_this())](boost::beast::error_code ec,
+                                       const std::size_t& bytesTransferred) {
+                self->outBuffer.erase(0, bytesTransferred);
+
+                if (ec == boost::asio::error::eof)
+                {
+                    // Send is successful, Lets remove data from queue
+                    // check for next request data in queue.
+                    self->requestDataQueue.pop();
+                    self->state = SseConnState::idle;
+                    self->checkQueue();
+                    return;
+                }
+
+                if (ec)
+                {
+                    BMCWEB_LOG_ERROR << "async_write_some() failed: "
+                                     << ec.message();
+                    self->state = SseConnState::sendFailed;
+                    self->checkQueue();
+                    return;
+                }
+                BMCWEB_LOG_DEBUG << "async_write_some() bytes transferred: "
+                                 << bytesTransferred;
+                boost::ignore_unused(bytesTransferred);
+
+                self->doWrite();
+            });
+    }
+
+    void startSSE()
+    {
+        if (state == SseConnState::initInProgress)
+        {
+            return;
+        }
+        state = SseConnState::initInProgress;
+
+        BMCWEB_LOG_DEBUG << "starting SSE connection ";
+        using BodyType = boost::beast::http::buffer_body;
+        auto response =
+            std::make_shared<boost::beast::http::response<BodyType>>(
+                boost::beast::http::status::ok, 11);
+        auto serializer =
+            std::make_shared<boost::beast::http::response_serializer<BodyType>>(
+                *response);
+
+        // TODO: Add hostname in http header.
+        response->set(boost::beast::http::field::server, "iBMC");
+        response->set(boost::beast::http::field::content_type,
+                      "text/event-stream");
+        response->body().data = nullptr;
+        response->body().size = 0;
+        response->body().more = true;
+
+        boost::beast::http::async_write_header(
+            *sseConn, *serializer,
+            [this, response, serializer](const boost::beast::error_code& ec,
+                                         const std::size_t& bytesTransferred) {
+                if (ec)
+                {
+                    BMCWEB_LOG_ERROR << "Error sending header" << ec;
+                    state = SseConnState::initFailed;
+                    checkQueue();
+                    return;
+                }
+
+                BMCWEB_LOG_DEBUG << "startSSE  Header sent.";
+                state = SseConnState::initialized;
+                checkQueue();
+            });
+    }
+
+    void checkQueue(const bool newRecord = false)
+    {
+        if (requestDataQueue.empty())
+        {
+            BMCWEB_LOG_DEBUG << "requestDataQueue is empty\n";
+            return;
+        }
+
+        if (retryCount >= maxRetryAttempts)
+        {
+            BMCWEB_LOG_ERROR << "Maximum number of retries is reached.";
+
+            // Clear queue.
+            while (!requestDataQueue.empty())
+            {
+                requestDataQueue.pop();
+            }
+
+            // TODO: Take 'DeliveryRetryPolicy' action.
+            // For now, doing 'SuspendRetries' action.
+            state = SseConnState::suspended;
+            return;
+        }
+
+        if ((state == SseConnState::initFailed) ||
+            (state == SseConnState::sendFailed))
+        {
+            if (newRecord)
+            {
+                // We are already running async wait and retry.
+                // Since record is added to queue, it gets the
+                // turn in FIFO.
+                return;
+            }
+
+            retryCount++;
+            // TODO: Perform async wait for retryTimeoutInterval before proceed.
+        }
+        else
+        {
+            // reset retry count.
+            retryCount = 0;
+        }
+
+        switch (state)
+        {
+            case SseConnState::initInProgress:
+            case SseConnState::sendInProgress:
+            case SseConnState::suspended:
+                // do nothing
+                break;
+            case SseConnState::initFailed:
+            {
+                startSSE();
+                break;
+            }
+            case SseConnState::initialized:
+            case SseConnState::idle:
+            case SseConnState::sendFailed:
+            {
+                std::pair<uint64_t, std::string> reqData =
+                    requestDataQueue.front();
+                sendEvent(std::to_string(reqData.first), reqData.second);
+                break;
+            }
+            default:
+                break;
+        }
+
+        return;
+    }
+
+  public:
+    ServerSentEvents(const ServerSentEvents&) = delete;
+    ServerSentEvents& operator=(const ServerSentEvents&) = delete;
+    ServerSentEvents(ServerSentEvents&&) = delete;
+    ServerSentEvents& operator=(ServerSentEvents&&) = delete;
+
+    ServerSentEvents(const std::shared_ptr<crow::Request::Adaptor>& adaptor) :
+        sseConn(std::move(adaptor)), state(SseConnState::startInit),
+        retryCount(0), maxRetryAttempts(5)
+    {
+        startSSE();
+    }
+
+    ~ServerSentEvents()
+    {}
+
+    void sendData(const uint64_t& id, const std::string& data)
+    {
+        if (state == SseConnState::suspended)
+        {
+            return;
+        }
+
+        if (requestDataQueue.size() <= maxReqQueueSize)
+        {
+            requestDataQueue.push(std::pair(id, data));
+            checkQueue(true);
+        }
+        else
+        {
+            BMCWEB_LOG_ERROR << "Request queue is full. So ignoring data.";
+        }
+    }
+};
+
+} // namespace crow
diff --git a/redfish-core/lib/event_service.hpp b/redfish-core/lib/event_service.hpp
index ba1ea19..adb6238 100644
--- a/redfish-core/lib/event_service.hpp
+++ b/redfish-core/lib/event_service.hpp
@@ -398,6 +398,66 @@
     }
 };
 
+class EventServiceSSE : public Node
+{
+  public:
+    EventServiceSSE(CrowApp& app) :
+        Node(app, "/redfish/v1/EventService/Subscriptions/SSE/")
+    {
+        entityPrivileges = {
+            {boost::beast::http::verb::get, {{"ConfigureManager"}}},
+            {boost::beast::http::verb::head, {{"ConfigureManager"}}},
+            {boost::beast::http::verb::patch, {{"ConfigureManager"}}},
+            {boost::beast::http::verb::put, {{"ConfigureManager"}}},
+            {boost::beast::http::verb::delete_, {{"ConfigureManager"}}},
+            {boost::beast::http::verb::post, {{"ConfigureManager"}}}};
+    }
+
+  private:
+    void doGet(crow::Response& res, const crow::Request& req,
+               const std::vector<std::string>& params) override
+    {
+        if (EventServiceManager::getInstance().getNumberOfSubscriptions() >=
+            maxNoOfSubscriptions)
+        {
+            messages::eventSubscriptionLimitExceeded(res);
+            res.end();
+            return;
+        }
+
+        std::shared_ptr<crow::Request::Adaptor> sseConn =
+            std::make_shared<crow::Request::Adaptor>(std::move(req.socket()));
+        std::shared_ptr<Subscription> subValue =
+            std::make_shared<Subscription>(sseConn);
+
+        // GET on this URI means, Its SSE subscriptionType.
+        subValue->subscriptionType = "SSE";
+        subValue->protocol = "Redfish";
+
+        char* filters = req.urlParams.get("$filter");
+        if (filters == nullptr)
+        {
+            subValue->eventFormatType = "Event";
+            subValue->retryPolicy = "TerminateAfterRetries";
+        }
+        else
+        {
+            // TODO: Need to read this from query params.
+            subValue->eventFormatType = "Event";
+            subValue->retryPolicy = "TerminateAfterRetries";
+        }
+
+        std::string id =
+            EventServiceManager::getInstance().addSubscription(subValue, false);
+        if (id.empty())
+        {
+            messages::internalError(res);
+            res.end();
+            return;
+        }
+    }
+};
+
 class EventDestination : public Node
 {
   public: