Add unit test for SSE

Writing this test exposed some bugs in SSE that got merged.
sendSSEHeader was never called, leading to a connection that starts
and immediately closes with no error code.

This issue has been corrected in code, such that the sockets start.

To allow for unit tests, the io_service needs to be passed into the
class, previously, the SSE connection was pulling the io_context from
the DBus connection, which is odd, given that the SSE connection has
no other dependencies on DBus.

Unit tests should help keep it working.

Tested: Unit tests pass.

Change-Id: I48080d2a94b6349989f556cd1c7b103bad498526
Signed-off-by: Ed Tanous <ed@tanous.net>
diff --git a/http/routing/sserule.hpp b/http/routing/sserule.hpp
index c0a4e50..1a422a8 100644
--- a/http/routing/sserule.hpp
+++ b/http/routing/sserule.hpp
@@ -31,7 +31,7 @@
     }
 
 #ifndef BMCWEB_ENABLE_SSL
-    void handleUpgrade(const Request& /*req*/,
+    void handleUpgrade(const Request& req,
                        const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
                        boost::asio::ip::tcp::socket&& adaptor) override
     {
@@ -39,11 +39,11 @@
             crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>
             myConnection = std::make_shared<
                 crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>(
-                std::move(adaptor), openHandler, closeHandler);
+                *req.ioService, std::move(adaptor), openHandler, closeHandler);
         myConnection->start();
     }
 #else
-    void handleUpgrade(const Request& /*req*/,
+    void handleUpgrade(const Request& req,
                        const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
                        boost::beast::ssl_stream<boost::asio::ip::tcp::socket>&&
                            adaptor) override
@@ -52,7 +52,7 @@
             boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>
             myConnection = std::make_shared<crow::sse_socket::ConnectionImpl<
                 boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>(
-                std::move(adaptor), openHandler, closeHandler);
+                *req.ioService, std::move(adaptor), openHandler, closeHandler);
         myConnection->start();
     }
 #endif
diff --git a/http/server_sent_event.hpp b/http/server_sent_event.hpp
index f4ad21a..730bdce 100644
--- a/http/server_sent_event.hpp
+++ b/http/server_sent_event.hpp
@@ -1,20 +1,16 @@
 #pragma once
-#include "dbus_singleton.hpp"
 #include "http_request.hpp"
 #include "http_response.hpp"
 
 #include <boost/asio/buffer.hpp>
 #include <boost/asio/steady_timer.hpp>
 #include <boost/beast/core/multi_buffer.hpp>
-#include <boost/beast/http/buffer_body.hpp>
 #include <boost/beast/websocket.hpp>
 
 #include <array>
+#include <cstddef>
 #include <functional>
-
-#ifdef BMCWEB_ENABLE_SSL
-#include <boost/beast/websocket/ssl.hpp>
-#endif
+#include <optional>
 
 namespace crow
 {
@@ -41,12 +37,13 @@
 class ConnectionImpl : public Connection
 {
   public:
-    ConnectionImpl(Adaptor&& adaptorIn,
+    ConnectionImpl(boost::asio::io_context& ioIn, Adaptor&& adaptorIn,
                    std::function<void(Connection&)> openHandlerIn,
                    std::function<void(Connection&)> closeHandlerIn) :
         adaptor(std::move(adaptorIn)),
-        timer(ioc), openHandler(std::move(openHandlerIn)),
+        ioc(ioIn), timer(ioc), openHandler(std::move(openHandlerIn)),
         closeHandler(std::move(closeHandlerIn))
+
     {
         BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
     }
@@ -75,10 +72,12 @@
             return;
         }
         openHandler(*this);
+        sendSSEHeader();
     }
 
     void close(const std::string_view msg) override
     {
+        BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
         // send notification to handler for cleanup
         if (closeHandler)
         {
@@ -91,13 +90,10 @@
     void sendSSEHeader()
     {
         BMCWEB_LOG_DEBUG("Starting SSE connection");
-        using BodyType = boost::beast::http::buffer_body;
-        boost::beast::http::response<BodyType> res(
-            boost::beast::http::status::ok, 11, BodyType{});
+
         res.set(boost::beast::http::field::content_type, "text/event-stream");
-        res.body().more = true;
         boost::beast::http::response_serializer<BodyType>& serial =
-            serializer.emplace(std::move(res));
+            serializer.emplace(res);
 
         boost::beast::http::async_write_header(
             adaptor, serial,
@@ -118,19 +114,17 @@
         }
         BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
 
-        serializer.reset();
-
         // SSE stream header sent, So let us setup monitor.
         // Any read data on this stream will be error in case of SSE.
-
-        adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error,
-                           std::bind_front(&ConnectionImpl::afterReadError,
-                                           this, shared_from_this()));
+        adaptor.async_read_some(boost::asio::buffer(buffer),
+                                std::bind_front(&ConnectionImpl::afterReadError,
+                                                this, shared_from_this()));
     }
 
     void afterReadError(const std::shared_ptr<Connection>& /*self*/,
-                        const boost::system::error_code& ec)
+                        const boost::system::error_code& ec, size_t bytesRead)
     {
+        BMCWEB_LOG_DEBUG("Read {}", bytesRead);
         if (ec == boost::asio::error::operation_aborted)
         {
             return;
@@ -160,18 +154,13 @@
         adaptor.async_write_some(
             inputBuffer.data(),
             std::bind_front(&ConnectionImpl::doWriteCallback, this,
-                            weak_from_this()));
+                            shared_from_this()));
     }
 
-    void doWriteCallback(const std::weak_ptr<Connection>& weak,
+    void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
                          const boost::beast::error_code& ec,
                          size_t bytesTransferred)
     {
-        auto self = weak.lock();
-        if (self == nullptr)
-        {
-            return;
-        }
         timer.cancel();
         doingWrite = false;
         inputBuffer.consume(bytesTransferred);
@@ -210,6 +199,13 @@
 
     void dataFormat(std::string_view id, std::string_view msg)
     {
+        constexpr size_t bufferLimit = 10485760U; // 10MB
+        if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
+        {
+            BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
+            close("Buffer overflow");
+            return;
+        }
         std::string rawData;
         if (!id.empty())
         {
@@ -255,7 +251,7 @@
 
         if (ec == boost::asio::error::operation_aborted)
         {
-            BMCWEB_LOG_DEBUG("operation aborted");
+            BMCWEB_LOG_DEBUG("Timer operation aborted");
             // Canceled wait means the path succeeded.
             return;
         }
@@ -264,22 +260,22 @@
             BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
         }
 
-        BMCWEB_LOG_WARNING("{}Connection timed out, closing",
+        BMCWEB_LOG_WARNING("{} Connection timed out, closing",
                            logPtr(self.get()));
 
         self->close("closing connection");
     }
 
   private:
-    Adaptor adaptor;
-
+    std::array<char, 1> buffer{};
     boost::beast::multi_buffer inputBuffer;
 
-    std::optional<boost::beast::http::response_serializer<
-        boost::beast::http::buffer_body>>
-        serializer;
-    boost::asio::io_context& ioc =
-        crow::connections::systemBus->get_io_context();
+    Adaptor adaptor;
+
+    using BodyType = bmcweb::FileBody;
+    boost::beast::http::response<BodyType> res;
+    std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
+    boost::asio::io_context& ioc;
     boost::asio::steady_timer timer;
     bool doingWrite = false;