Add unit test for SSE

Writing this test exposed some bugs in SSE that got merged.
sendSSEHeader was never called, leading to a connection that starts
and immediately closes with no error code.

This issue has been corrected in code, such that the sockets start.

To allow for unit tests, the io_service needs to be passed into the
class, previously, the SSE connection was pulling the io_context from
the DBus connection, which is odd, given that the SSE connection has
no other dependencies on DBus.

Unit tests should help keep it working.

Tested: Unit tests pass.

Change-Id: I48080d2a94b6349989f556cd1c7b103bad498526
Signed-off-by: Ed Tanous <ed@tanous.net>
diff --git a/http/routing/sserule.hpp b/http/routing/sserule.hpp
index c0a4e50..1a422a8 100644
--- a/http/routing/sserule.hpp
+++ b/http/routing/sserule.hpp
@@ -31,7 +31,7 @@
     }
 
 #ifndef BMCWEB_ENABLE_SSL
-    void handleUpgrade(const Request& /*req*/,
+    void handleUpgrade(const Request& req,
                        const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
                        boost::asio::ip::tcp::socket&& adaptor) override
     {
@@ -39,11 +39,11 @@
             crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>
             myConnection = std::make_shared<
                 crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>(
-                std::move(adaptor), openHandler, closeHandler);
+                *req.ioService, std::move(adaptor), openHandler, closeHandler);
         myConnection->start();
     }
 #else
-    void handleUpgrade(const Request& /*req*/,
+    void handleUpgrade(const Request& req,
                        const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
                        boost::beast::ssl_stream<boost::asio::ip::tcp::socket>&&
                            adaptor) override
@@ -52,7 +52,7 @@
             boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>
             myConnection = std::make_shared<crow::sse_socket::ConnectionImpl<
                 boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>(
-                std::move(adaptor), openHandler, closeHandler);
+                *req.ioService, std::move(adaptor), openHandler, closeHandler);
         myConnection->start();
     }
 #endif
diff --git a/http/server_sent_event.hpp b/http/server_sent_event.hpp
index f4ad21a..730bdce 100644
--- a/http/server_sent_event.hpp
+++ b/http/server_sent_event.hpp
@@ -1,20 +1,16 @@
 #pragma once
-#include "dbus_singleton.hpp"
 #include "http_request.hpp"
 #include "http_response.hpp"
 
 #include <boost/asio/buffer.hpp>
 #include <boost/asio/steady_timer.hpp>
 #include <boost/beast/core/multi_buffer.hpp>
-#include <boost/beast/http/buffer_body.hpp>
 #include <boost/beast/websocket.hpp>
 
 #include <array>
+#include <cstddef>
 #include <functional>
-
-#ifdef BMCWEB_ENABLE_SSL
-#include <boost/beast/websocket/ssl.hpp>
-#endif
+#include <optional>
 
 namespace crow
 {
@@ -41,12 +37,13 @@
 class ConnectionImpl : public Connection
 {
   public:
-    ConnectionImpl(Adaptor&& adaptorIn,
+    ConnectionImpl(boost::asio::io_context& ioIn, Adaptor&& adaptorIn,
                    std::function<void(Connection&)> openHandlerIn,
                    std::function<void(Connection&)> closeHandlerIn) :
         adaptor(std::move(adaptorIn)),
-        timer(ioc), openHandler(std::move(openHandlerIn)),
+        ioc(ioIn), timer(ioc), openHandler(std::move(openHandlerIn)),
         closeHandler(std::move(closeHandlerIn))
+
     {
         BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
     }
@@ -75,10 +72,12 @@
             return;
         }
         openHandler(*this);
+        sendSSEHeader();
     }
 
     void close(const std::string_view msg) override
     {
+        BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
         // send notification to handler for cleanup
         if (closeHandler)
         {
@@ -91,13 +90,10 @@
     void sendSSEHeader()
     {
         BMCWEB_LOG_DEBUG("Starting SSE connection");
-        using BodyType = boost::beast::http::buffer_body;
-        boost::beast::http::response<BodyType> res(
-            boost::beast::http::status::ok, 11, BodyType{});
+
         res.set(boost::beast::http::field::content_type, "text/event-stream");
-        res.body().more = true;
         boost::beast::http::response_serializer<BodyType>& serial =
-            serializer.emplace(std::move(res));
+            serializer.emplace(res);
 
         boost::beast::http::async_write_header(
             adaptor, serial,
@@ -118,19 +114,17 @@
         }
         BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
 
-        serializer.reset();
-
         // SSE stream header sent, So let us setup monitor.
         // Any read data on this stream will be error in case of SSE.
-
-        adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error,
-                           std::bind_front(&ConnectionImpl::afterReadError,
-                                           this, shared_from_this()));
+        adaptor.async_read_some(boost::asio::buffer(buffer),
+                                std::bind_front(&ConnectionImpl::afterReadError,
+                                                this, shared_from_this()));
     }
 
     void afterReadError(const std::shared_ptr<Connection>& /*self*/,
-                        const boost::system::error_code& ec)
+                        const boost::system::error_code& ec, size_t bytesRead)
     {
+        BMCWEB_LOG_DEBUG("Read {}", bytesRead);
         if (ec == boost::asio::error::operation_aborted)
         {
             return;
@@ -160,18 +154,13 @@
         adaptor.async_write_some(
             inputBuffer.data(),
             std::bind_front(&ConnectionImpl::doWriteCallback, this,
-                            weak_from_this()));
+                            shared_from_this()));
     }
 
-    void doWriteCallback(const std::weak_ptr<Connection>& weak,
+    void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
                          const boost::beast::error_code& ec,
                          size_t bytesTransferred)
     {
-        auto self = weak.lock();
-        if (self == nullptr)
-        {
-            return;
-        }
         timer.cancel();
         doingWrite = false;
         inputBuffer.consume(bytesTransferred);
@@ -210,6 +199,13 @@
 
     void dataFormat(std::string_view id, std::string_view msg)
     {
+        constexpr size_t bufferLimit = 10485760U; // 10MB
+        if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
+        {
+            BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
+            close("Buffer overflow");
+            return;
+        }
         std::string rawData;
         if (!id.empty())
         {
@@ -255,7 +251,7 @@
 
         if (ec == boost::asio::error::operation_aborted)
         {
-            BMCWEB_LOG_DEBUG("operation aborted");
+            BMCWEB_LOG_DEBUG("Timer operation aborted");
             // Canceled wait means the path succeeded.
             return;
         }
@@ -264,22 +260,22 @@
             BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
         }
 
-        BMCWEB_LOG_WARNING("{}Connection timed out, closing",
+        BMCWEB_LOG_WARNING("{} Connection timed out, closing",
                            logPtr(self.get()));
 
         self->close("closing connection");
     }
 
   private:
-    Adaptor adaptor;
-
+    std::array<char, 1> buffer{};
     boost::beast::multi_buffer inputBuffer;
 
-    std::optional<boost::beast::http::response_serializer<
-        boost::beast::http::buffer_body>>
-        serializer;
-    boost::asio::io_context& ioc =
-        crow::connections::systemBus->get_io_context();
+    Adaptor adaptor;
+
+    using BodyType = bmcweb::FileBody;
+    boost::beast::http::response<BodyType> res;
+    std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
+    boost::asio::io_context& ioc;
     boost::asio::steady_timer timer;
     bool doingWrite = false;
 
diff --git a/meson.build b/meson.build
index 33c8b12..d644b79 100644
--- a/meson.build
+++ b/meson.build
@@ -429,6 +429,7 @@
   'test/http/http_response_test.cpp',
   'test/http/utility_test.cpp',
   'test/http/verb_test.cpp',
+  'test/http/server_sent_event_test.cpp',
   'test/http/mutual_tls_meta.cpp',
   'test/http/http_file_body_test.cpp',
   'test/include/dbus_utility_test.cpp',
diff --git a/test/http/server_sent_event_test.cpp b/test/http/server_sent_event_test.cpp
new file mode 100644
index 0000000..d3f9403
--- /dev/null
+++ b/test/http/server_sent_event_test.cpp
@@ -0,0 +1,111 @@
+#include "boost/asio/read.hpp"
+#include "boost/asio/read_until.hpp"
+#include "http/http_request.hpp"
+#include "http/http_response.hpp"
+#include "http/server_sent_event.hpp"
+
+#include <boost/asio/steady_timer.hpp>
+#include <boost/beast/_experimental/test/stream.hpp>
+
+#include <memory>
+#include <string>
+#include <string_view>
+
+#include "gtest/gtest.h"
+namespace crow
+{
+namespace sse_socket
+{
+
+namespace
+{
+
+TEST(ServerSentEvent, SseWorks)
+{
+    boost::asio::io_context io;
+    boost::beast::test::stream stream(io);
+    boost::beast::test::stream out(io);
+    stream.connect(out);
+
+    bool openCalled = false;
+    auto openHandler = [&openCalled](Connection&) { openCalled = true; };
+    bool closeCalled = false;
+    auto closeHandler = [&closeCalled](Connection&) { closeCalled = true; };
+
+    std::shared_ptr<ConnectionImpl<boost::beast::test::stream>> conn =
+        std::make_shared<ConnectionImpl<boost::beast::test::stream>>(
+            io, std::move(stream), openHandler, closeHandler);
+    conn->start();
+    // Connect
+    {
+        constexpr std::string_view expected =
+            "HTTP/1.1 200 OK\r\n"
+            "Content-Type: text/event-stream\r\n"
+            "\r\n";
+
+        while (out.str().size() != expected.size())
+        {
+            io.run_for(std::chrono::milliseconds(1));
+        }
+
+        std::string eventContent;
+        eventContent.resize(expected.size());
+        boost::asio::read(out, boost::asio::buffer(eventContent));
+
+        EXPECT_EQ(eventContent, expected);
+        EXPECT_TRUE(openCalled);
+        EXPECT_FALSE(closeCalled);
+        EXPECT_TRUE(out.str().empty());
+    }
+    // Send one event
+    {
+        conn->sendEvent("TestEventId", "TestEventContent");
+        std::string_view expected = "id: TestEventId\n"
+                                    "data: TestEventContent\n"
+                                    "\n";
+
+        while (out.str().size() < expected.size())
+        {
+            io.run_for(std::chrono::milliseconds(1));
+        }
+        EXPECT_EQ(out.str(), expected);
+
+        std::string eventContent;
+        eventContent.resize(expected.size());
+        boost::asio::read(out, boost::asio::buffer(eventContent));
+
+        EXPECT_EQ(eventContent, expected);
+        EXPECT_TRUE(out.str().empty());
+    }
+    // Send second event
+    {
+        conn->sendEvent("TestEventId2", "TestEvent\nContent2");
+        constexpr std::string_view expected = "id: TestEventId2\n"
+                                              "data: TestEvent\n"
+                                              "data: Content2\n"
+                                              "\n";
+
+        while (out.str().size() < expected.size())
+        {
+            io.run_for(std::chrono::milliseconds(1));
+        }
+
+        std::string eventContent;
+        eventContent.resize(expected.size());
+        boost::asio::read(out, boost::asio::buffer(eventContent));
+        EXPECT_EQ(eventContent, expected);
+        EXPECT_TRUE(out.str().empty());
+    }
+    // close the remote
+    {
+        out.close();
+        while (!closeCalled)
+        {
+            io.run_for(std::chrono::milliseconds(1));
+        }
+    }
+}
+} // namespace
+
+} // namespace sse_socket
+} // namespace crow