Break out sse into a compile unit
Verify similar to beb96b0 Break out websockets
Break out the SSE functions into a separate compile unit. This allows
the SSE sockets in beast to be compiled separately, which significantly
reduces the overall compile time by a few seconds. Code is identical
with the exceptions of minor header definitions to convert header-only
to compile unit.
Change-Id: I5aae4f17cbd2badf75b3e0bb644a2309f6300663
Signed-off-by: Ed Tanous <etanous@nvidia.com>
diff --git a/http/server_sent_event_impl.hpp b/http/server_sent_event_impl.hpp
new file mode 100644
index 0000000..fe9ed2e
--- /dev/null
+++ b/http/server_sent_event_impl.hpp
@@ -0,0 +1,276 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright OpenBMC Authors
+#pragma once
+#include "boost_formatters.hpp"
+#include "http_body.hpp"
+#include "http_request.hpp"
+#include "io_context_singleton.hpp"
+#include "logging.hpp"
+#include "server_sent_event.hpp"
+
+#include <boost/asio/buffer.hpp>
+#include <boost/asio/error.hpp>
+#include <boost/asio/steady_timer.hpp>
+#include <boost/beast/core/error.hpp>
+#include <boost/beast/core/multi_buffer.hpp>
+#include <boost/beast/http/field.hpp>
+#include <boost/beast/http/serializer.hpp>
+#include <boost/beast/http/write.hpp>
+
+#include <array>
+#include <chrono>
+#include <cstddef>
+#include <functional>
+#include <memory>
+#include <optional>
+#include <string>
+#include <string_view>
+#include <utility>
+
+namespace crow
+{
+
+namespace sse_socket
+{
+
+template <typename Adaptor>
+class ConnectionImpl : public Connection
+{
+ public:
+ ConnectionImpl(
+ Adaptor&& adaptorIn,
+ std::function<void(Connection&, const Request&)> openHandlerIn,
+ std::function<void(Connection&)> closeHandlerIn) :
+ adaptor(std::move(adaptorIn)), timer(getIoContext()),
+ openHandler(std::move(openHandlerIn)),
+ closeHandler(std::move(closeHandlerIn))
+
+ {
+ BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
+ }
+
+ ConnectionImpl(const ConnectionImpl&) = delete;
+ ConnectionImpl(const ConnectionImpl&&) = delete;
+ ConnectionImpl& operator=(const ConnectionImpl&) = delete;
+ ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
+
+ ~ConnectionImpl() override
+ {
+ BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
+ }
+
+ void start(const Request& req)
+ {
+ BMCWEB_LOG_DEBUG("Starting SSE connection");
+
+ res.set(boost::beast::http::field::content_type, "text/event-stream");
+ boost::beast::http::response_serializer<BodyType>& serial =
+ serializer.emplace(res);
+
+ boost::beast::http::async_write_header(
+ adaptor, serial,
+ std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
+ shared_from_this(), req));
+ }
+
+ void close(const std::string_view msg) override
+ {
+ BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
+ // send notification to handler for cleanup
+ if (closeHandler)
+ {
+ closeHandler(*this);
+ }
+ BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
+ boost::beast::get_lowest_layer(adaptor).close();
+ }
+
+ void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
+ const Request& req,
+ const boost::system::error_code& ec,
+ size_t /*bytesSent*/)
+ {
+ serializer.reset();
+ if (ec)
+ {
+ BMCWEB_LOG_ERROR("Error sending header{}", ec);
+ close("async_write_header failed");
+ return;
+ }
+ BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
+ if (!openHandler)
+ {
+ BMCWEB_LOG_CRITICAL("No open handler???");
+ return;
+ }
+ openHandler(*this, req);
+
+ // SSE stream header sent, So let us setup monitor.
+ // Any read data on this stream will be error in case of SSE.
+ adaptor.async_read_some(boost::asio::buffer(buffer),
+ std::bind_front(&ConnectionImpl::afterReadError,
+ this, shared_from_this()));
+ }
+
+ void afterReadError(const std::shared_ptr<Connection>& /*self*/,
+ const boost::system::error_code& ec, size_t bytesRead)
+ {
+ BMCWEB_LOG_DEBUG("Read {}", bytesRead);
+ if (ec == boost::asio::error::operation_aborted)
+ {
+ return;
+ }
+ if (ec)
+ {
+ BMCWEB_LOG_ERROR("Read error: {}", ec);
+ }
+
+ close("Close SSE connection");
+ }
+
+ void doWrite()
+ {
+ if (doingWrite)
+ {
+ return;
+ }
+ if (inputBuffer.size() == 0)
+ {
+ BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
+ return;
+ }
+ startTimeout();
+ doingWrite = true;
+
+ adaptor.async_write_some(
+ inputBuffer.data(),
+ std::bind_front(&ConnectionImpl::doWriteCallback, this,
+ shared_from_this()));
+ }
+
+ void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
+ const boost::beast::error_code& ec,
+ size_t bytesTransferred)
+ {
+ timer.cancel();
+ doingWrite = false;
+ inputBuffer.consume(bytesTransferred);
+
+ if (ec == boost::asio::error::eof)
+ {
+ BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
+ close("SSE stream closed");
+ return;
+ }
+
+ if (ec)
+ {
+ BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
+ close("async_write_some failed");
+ return;
+ }
+ BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
+ bytesTransferred);
+
+ doWrite();
+ }
+
+ void sendSseEvent(std::string_view id, std::string_view msg) override
+ {
+ if (msg.empty())
+ {
+ BMCWEB_LOG_DEBUG("Empty data, bailing out.");
+ return;
+ }
+
+ dataFormat(id, msg);
+
+ doWrite();
+ }
+
+ void dataFormat(std::string_view id, std::string_view msg)
+ {
+ constexpr size_t bufferLimit = 10485760U; // 10MB
+ if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
+ {
+ BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
+ close("Buffer overflow");
+ return;
+ }
+ std::string rawData;
+ if (!id.empty())
+ {
+ rawData += "id: ";
+ rawData.append(id);
+ rawData += "\n";
+ }
+
+ rawData += "data: ";
+ for (char character : msg)
+ {
+ rawData += character;
+ if (character == '\n')
+ {
+ rawData += "data: ";
+ }
+ }
+ rawData += "\n\n";
+
+ size_t copied = boost::asio::buffer_copy(
+ inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData));
+ inputBuffer.commit(copied);
+ }
+
+ void startTimeout()
+ {
+ std::weak_ptr<Connection> weakSelf = weak_from_this();
+ timer.expires_after(std::chrono::seconds(30));
+ timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
+ this, weak_from_this()));
+ }
+
+ void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
+ const boost::system::error_code& ec)
+ {
+ std::shared_ptr<Connection> self = weakSelf.lock();
+ if (!self)
+ {
+ BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
+ logPtr(self.get()));
+ return;
+ }
+
+ if (ec == boost::asio::error::operation_aborted)
+ {
+ BMCWEB_LOG_DEBUG("Timer operation aborted");
+ // Canceled wait means the path succeeded.
+ return;
+ }
+ if (ec)
+ {
+ BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
+ }
+
+ BMCWEB_LOG_WARNING("{} Connection timed out, closing",
+ logPtr(self.get()));
+
+ self->close("closing connection");
+ }
+
+ private:
+ std::array<char, 1> buffer{};
+ boost::beast::multi_buffer inputBuffer;
+
+ Adaptor adaptor;
+
+ using BodyType = bmcweb::HttpBody;
+ boost::beast::http::response<BodyType> res;
+ std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
+ boost::asio::steady_timer timer;
+ bool doingWrite = false;
+
+ std::function<void(Connection&, const Request&)> openHandler;
+ std::function<void(Connection&)> closeHandler;
+};
+} // namespace sse_socket
+} // namespace crow