Server-sent-event fixes

This makes several changes to server-sent events to allow it to merge
to master.  The routing system has been removed in leiu of using
content-type eventstream detection.  Timers have been added to the
sse connections, and sse connections now rely on async_wait, rather
than a full read.

Tested: WIP

Signed-off-by: Ed Tanous <edtanous@google.com>
Change-Id: Id0ff0ebc2b3a795b3dba008e440556a9fdd882c2
diff --git a/http/app.hpp b/http/app.hpp
index 6388d84..d3cf48c 100644
--- a/http/app.hpp
+++ b/http/app.hpp
@@ -105,11 +105,6 @@
         router.validate();
     }
 
-    static bool isSseRoute(Request& req)
-    {
-        return Router::isSseRoute(req);
-    }
-
     void run()
     {
         validate();
diff --git a/http/http_connection.hpp b/http/http_connection.hpp
index 1b85c6b..7ae22e9 100644
--- a/http/http_connection.hpp
+++ b/http/http_connection.hpp
@@ -243,12 +243,14 @@
             [self(shared_from_this())](crow::Response& thisRes) {
             self->completeRequest(thisRes);
         });
-
+        bool isSse =
+            isContentTypeAllowed(req->getHeaderValue("Accept"),
+                                 http_helpers::ContentType::EventStream, false);
         if ((thisReq.isUpgrade() &&
              boost::iequals(
                  thisReq.getHeaderValue(boost::beast::http::field::upgrade),
                  "websocket")) ||
-            (Handler::isSseRoute(*req)))
+            isSse)
         {
             asyncResp->res.setCompleteRequestHandler(
                 [self(shared_from_this())](crow::Response& thisRes) {
diff --git a/http/http_response.hpp b/http/http_response.hpp
index 06b6939..1a4ef16 100644
--- a/http/http_response.hpp
+++ b/http/http_response.hpp
@@ -16,18 +16,10 @@
 template <typename Adaptor, typename Handler>
 class Connection;
 
-namespace sse_socket
-{
-template <typename Adaptor>
-class ConnectionImpl;
-} // namespace sse_socket
-
 struct Response
 {
     template <typename Adaptor, typename Handler>
     friend class crow::Connection;
-    template <typename Adaptor>
-    friend class crow::sse_socket::ConnectionImpl;
     using response_type =
         boost::beast::http::response<boost::beast::http::string_body>;
 
diff --git a/http/routing.hpp b/http/routing.hpp
index ead3af9..ac1c310 100644
--- a/http/routing.hpp
+++ b/http/routing.hpp
@@ -391,7 +391,7 @@
     }
 
 #ifndef BMCWEB_ENABLE_SSL
-    void handleUpgrade(const Request& req,
+    void handleUpgrade(const Request& /*req*/,
                        const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
                        boost::asio::ip::tcp::socket&& adaptor) override
     {
@@ -399,11 +399,11 @@
             crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>
             myConnection = std::make_shared<
                 crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>(
-                req, std::move(adaptor), openHandler, closeHandler);
+                std::move(adaptor), openHandler, closeHandler);
         myConnection->start();
     }
 #else
-    void handleUpgrade(const Request& req,
+    void handleUpgrade(const Request& /*req*/,
                        const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
                        boost::beast::ssl_stream<boost::asio::ip::tcp::socket>&&
                            adaptor) override
@@ -412,7 +412,7 @@
             boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>
             myConnection = std::make_shared<crow::sse_socket::ConnectionImpl<
                 boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>(
-                req, std::move(adaptor), openHandler, closeHandler);
+                std::move(adaptor), openHandler, closeHandler);
         myConnection->start();
     }
 #endif
@@ -432,12 +432,8 @@
     }
 
   private:
-    std::function<void(std::shared_ptr<crow::sse_socket::Connection>&,
-                       const crow::Request&,
-                       const std::shared_ptr<bmcweb::AsyncResp>&)>
-        openHandler;
-    std::function<void(std::shared_ptr<crow::sse_socket::Connection>&)>
-        closeHandler;
+    std::function<void(crow::sse_socket::Connection&)> openHandler;
+    std::function<void(crow::sse_socket::Connection&)> closeHandler;
 };
 
 template <typename T>
@@ -461,13 +457,6 @@
         return *p;
     }
 
-    self_t& name(std::string_view name) noexcept
-    {
-        self_t* self = static_cast<self_t*>(this);
-        self->nameStr = name;
-        return *self;
-    }
-
     self_t& methods(boost::beast::http::verb method)
     {
         self_t* self = static_cast<self_t*>(this);
@@ -1184,15 +1173,6 @@
         return true;
     }
 
-    static bool isSseRoute(Request& req)
-    {
-        return std::any_of(sse_socket::sseRoutes.begin(),
-                           sse_socket::sseRoutes.end(),
-                           [&req](const char* sseRoute) {
-            return (req.url().encoded_path() == sseRoute);
-        });
-    }
-
     static bool
         isUserPrivileged(Request& req,
                          const std::shared_ptr<bmcweb::AsyncResp>& asyncResp,
diff --git a/http/server_sent_event.hpp b/http/server_sent_event.hpp
index 58659c8..02af0b7 100644
--- a/http/server_sent_event.hpp
+++ b/http/server_sent_event.hpp
@@ -1,6 +1,5 @@
 #pragma once
-#include "async_resolve.hpp"
-#include "async_resp.hpp"
+#include "dbus_singleton.hpp"
 #include "http_request.hpp"
 #include "http_response.hpp"
 
@@ -23,13 +22,10 @@
 
 namespace sse_socket
 {
-static constexpr const std::array<const char*, 1> sseRoutes = {
-    "/redfish/v1/EventService/SSE"};
-
 struct Connection : std::enable_shared_from_this<Connection>
 {
   public:
-    explicit Connection(const crow::Request& reqIn) : req(reqIn) {}
+    Connection() = default;
 
     Connection(const Connection&) = delete;
     Connection(Connection&&) = delete;
@@ -38,26 +34,19 @@
     virtual ~Connection() = default;
 
     virtual boost::asio::io_context& getIoContext() = 0;
-    virtual void sendSSEHeader() = 0;
-    virtual void completeRequest(crow::Response& thisRes) = 0;
     virtual void close(std::string_view msg = "quit") = 0;
     virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
-
-    crow::Request req;
 };
 
 template <typename Adaptor>
 class ConnectionImpl : public Connection
 {
   public:
-    ConnectionImpl(
-        const crow::Request& reqIn, Adaptor adaptorIn,
-        std::function<void(std::shared_ptr<Connection>&, const crow::Request&,
-                           const std::shared_ptr<bmcweb::AsyncResp>&)>
-            openHandlerIn,
-        std::function<void(std::shared_ptr<Connection>&)> closeHandlerIn) :
-        Connection(reqIn),
-        adaptor(std::move(adaptorIn)), openHandler(std::move(openHandlerIn)),
+    ConnectionImpl(Adaptor&& adaptorIn,
+                   std::function<void(Connection&)> openHandlerIn,
+                   std::function<void(Connection&)> closeHandlerIn) :
+        adaptor(std::move(adaptorIn)),
+        timer(ioc), openHandler(std::move(openHandlerIn)),
         closeHandler(std::move(closeHandlerIn))
     {
         BMCWEB_LOG_DEBUG << "SseConnectionImpl: SSE constructor " << this;
@@ -81,61 +70,47 @@
 
     void start()
     {
-        if (openHandler)
+        if (!openHandler)
         {
-            auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
-            std::shared_ptr<Connection> self = this->shared_from_this();
-
-            asyncResp->res.setCompleteRequestHandler(
-                [self(shared_from_this())](crow::Response& thisRes) {
-                if (thisRes.resultInt() != 200)
-                {
-                    self->completeRequest(thisRes);
-                }
-            });
-
-            openHandler(self, req, asyncResp);
+            BMCWEB_LOG_CRITICAL << "No open handler???";
+            return;
         }
+        openHandler(*this);
     }
 
     void close(const std::string_view msg) override
     {
-        BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg;
-        boost::beast::get_lowest_layer(adaptor).close();
-
         // send notification to handler for cleanup
         if (closeHandler)
         {
-            std::shared_ptr<Connection> self = shared_from_this();
-            closeHandler(self);
+            closeHandler(*this);
         }
+        BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg;
+        boost::beast::get_lowest_layer(adaptor).close();
     }
 
-    void sendSSEHeader() override
+    void sendSSEHeader()
     {
         BMCWEB_LOG_DEBUG << "Starting SSE connection";
-        auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
         using BodyType = boost::beast::http::buffer_body;
-        auto response =
-            std::make_shared<boost::beast::http::response<BodyType>>(
-                boost::beast::http::status::ok, 11);
-
-        serializer.emplace(*asyncResp->res.stringResponse);
-
-        response->set(boost::beast::http::field::content_type,
-                      "text/event-stream");
-        response->body().more = true;
+        boost::beast::http::response<BodyType> res(
+            boost::beast::http::status::ok, 11, BodyType{});
+        res.set(boost::beast::http::field::content_type, "text/event-stream");
+        res.body().more = true;
+        boost::beast::http::response_serializer<BodyType>& ser =
+            serializer.emplace(std::move(res));
 
         boost::beast::http::async_write_header(
-            adaptor, *serializer,
+            adaptor, ser,
             std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
                             shared_from_this()));
     }
 
     void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
-                               const boost::beast::error_code& ec,
-                               const std::size_t& /*unused*/)
+                               const boost::system::error_code& ec,
+                               size_t /*bytesSent*/)
     {
+        serializer.reset();
         if (ec)
         {
             BMCWEB_LOG_ERROR << "Error sending header" << ec;
@@ -148,41 +123,29 @@
 
         // SSE stream header sent, So let us setup monitor.
         // Any read data on this stream will be error in case of SSE.
-        setupRead();
+
+        adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error,
+                           std::bind_front(&ConnectionImpl::afterReadError,
+                                           this, shared_from_this()));
     }
 
-    void setupRead()
+    void afterReadError(const std::shared_ptr<Connection>& /*self*/,
+                        const boost::system::error_code& ec)
     {
-        std::weak_ptr<Connection> weakSelf = weak_from_this();
-
-        boost::beast::http::async_read_some(
-            adaptor, outputBuffer, *parser,
-            std::bind_front(&ConnectionImpl::setupReadCallback, this,
-                            weak_from_this()));
-    }
-
-    void setupReadCallback(const std::weak_ptr<Connection>& weakSelf,
-                           const boost::system::error_code& ec,
-                           size_t bytesRead)
-    {
-        std::shared_ptr<Connection> self = weakSelf.lock();
-        BMCWEB_LOG_DEBUG << "async_read_some: Read " << bytesRead << " bytes";
+        if (ec == boost::asio::error::operation_aborted)
+        {
+            return;
+        }
         if (ec)
         {
             BMCWEB_LOG_ERROR << "Read error: " << ec;
         }
 
-        // After establishing SSE stream, Reading data on this
-        // stream means client is disobeys the SSE protocol.
-        // Read the data to avoid buffer attacks and close connection.
-
-        self->close("Close SSE connection");
+        close("Close SSE connection");
     }
 
     void doWrite()
     {
-        onTimeout();
-
         if (doingWrite)
         {
             return;
@@ -192,18 +155,25 @@
             BMCWEB_LOG_DEBUG << "inputBuffer is empty... Bailing out";
             return;
         }
+        startTimeout();
         doingWrite = true;
 
         adaptor.async_write_some(
             inputBuffer.data(),
             std::bind_front(&ConnectionImpl::doWriteCallback, this,
-                            shared_from_this()));
+                            weak_from_this()));
     }
 
-    void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
+    void doWriteCallback(const std::weak_ptr<Connection>& weak,
                          const boost::beast::error_code& ec,
-                         const size_t bytesTransferred)
+                         size_t bytesTransferred)
     {
+        auto self = weak.lock();
+        if (self == nullptr)
+        {
+            return;
+        }
+        timer.cancel();
         doingWrite = false;
         inputBuffer.consume(bytesTransferred);
 
@@ -226,48 +196,6 @@
         doWrite();
     }
 
-    void completeRequest(crow::Response& thisRes) override
-    {
-        auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
-        asyncResp->res = std::move(thisRes);
-
-        if (asyncResp->res.body().empty() && !asyncResp->res.jsonValue.empty())
-        {
-            asyncResp->res.addHeader(boost::beast::http::field::content_type,
-                                     "application/json");
-            asyncResp->res.body() = asyncResp->res.jsonValue.dump(
-                2, ' ', true, nlohmann::json::error_handler_t::replace);
-        }
-
-        asyncResp->res.preparePayload();
-
-        serializer.emplace(*asyncResp->res.stringResponse);
-
-        boost::beast::http::async_write_some(
-            adaptor, *serializer,
-            std::bind_front(&ConnectionImpl::completeRequestCallback, this,
-                            shared_from_this()));
-    }
-
-    void completeRequestCallback(const std::shared_ptr<Connection>& /*self*/,
-                                 const boost::system::error_code& ec,
-                                 std::size_t bytesTransferred)
-    {
-        auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
-        BMCWEB_LOG_DEBUG << this << " async_write " << bytesTransferred
-                         << " bytes";
-        if (ec)
-        {
-            BMCWEB_LOG_DEBUG << this << " from async_write failed";
-            return;
-        }
-
-        BMCWEB_LOG_DEBUG << this << " Closing SSE connection - Request invalid";
-        serializer.reset();
-        close("Request invalid");
-        asyncResp->res.releaseCompleteRequestHandler();
-    }
-
     void sendEvent(std::string_view id, std::string_view msg) override
     {
         if (msg.empty())
@@ -276,19 +204,18 @@
             return;
         }
 
-        dataFormat(id);
+        dataFormat(id, msg);
 
         doWrite();
     }
 
-    void dataFormat(std::string_view id)
+    void dataFormat(std::string_view id, std::string_view msg)
     {
-        std::string_view msg;
         std::string rawData;
         if (!id.empty())
         {
             rawData += "id: ";
-            rawData.append(id.begin(), id.end());
+            rawData.append(id);
             rawData += "\n";
         }
 
@@ -308,9 +235,8 @@
         inputBuffer.commit(rawData.size());
     }
 
-    void onTimeout()
+    void startTimeout()
     {
-        boost::asio::steady_timer timer(ioc);
         std::weak_ptr<Connection> weakSelf = weak_from_this();
         timer.expires_after(std::chrono::seconds(30));
         timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
@@ -318,7 +244,7 @@
     }
 
     void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
-                           const boost::system::error_code ec)
+                           const boost::system::error_code& ec)
     {
         std::shared_ptr<Connection> self = weakSelf.lock();
         if (!self)
@@ -346,23 +272,18 @@
   private:
     Adaptor adaptor;
 
-    boost::beast::multi_buffer outputBuffer;
     boost::beast::multi_buffer inputBuffer;
 
     std::optional<boost::beast::http::response_serializer<
-        boost::beast::http::string_body>>
+        boost::beast::http::buffer_body>>
         serializer;
     boost::asio::io_context& ioc =
         crow::connections::systemBus->get_io_context();
+    boost::asio::steady_timer timer;
     bool doingWrite = false;
-    std::optional<
-        boost::beast::http::request_parser<boost::beast::http::string_body>>
-        parser;
 
-    std::function<void(std::shared_ptr<Connection>&, const crow::Request&,
-                       const std::shared_ptr<bmcweb::AsyncResp>&)>
-        openHandler;
-    std::function<void(std::shared_ptr<Connection>&)> closeHandler;
+    std::function<void(Connection&)> openHandler;
+    std::function<void(Connection&)> closeHandler;
 };
 } // namespace sse_socket
 } // namespace crow
diff --git a/include/http_utility.hpp b/include/http_utility.hpp
index 8f2478f..d18ac4b 100644
--- a/include/http_utility.hpp
+++ b/include/http_utility.hpp
@@ -26,6 +26,7 @@
     HTML,
     JSON,
     OctetStream,
+    EventStream,
 };
 
 struct ContentTypePair
@@ -34,11 +35,12 @@
     ContentType contentTypeEnum;
 };
 
-constexpr std::array<ContentTypePair, 4> contentTypes{{
+constexpr std::array<ContentTypePair, 5> contentTypes{{
     {"application/cbor", ContentType::CBOR},
     {"application/json", ContentType::JSON},
     {"application/octet-stream", ContentType::OctetStream},
     {"text/html", ContentType::HTML},
+    {"text/event-stream", ContentType::EventStream},
 }};
 
 inline ContentType