Break out sse into a compile unit
Verify similar to beb96b0 Break out websockets
Break out the SSE functions into a separate compile unit. This allows
the SSE sockets in beast to be compiled separately, which significantly
reduces the overall compile time by a few seconds. Code is identical
with the exceptions of minor header definitions to convert header-only
to compile unit.
Change-Id: I5aae4f17cbd2badf75b3e0bb644a2309f6300663
Signed-off-by: Ed Tanous <etanous@nvidia.com>
diff --git a/http/routing/sserule.cpp b/http/routing/sserule.cpp
new file mode 100644
index 0000000..2b71caf
--- /dev/null
+++ b/http/routing/sserule.cpp
@@ -0,0 +1,66 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright OpenBMC Authors
+
+#include "sserule.hpp"
+
+#include "async_resp.hpp"
+#include "baserule.hpp"
+#include "http_request.hpp"
+#include "http_response.hpp"
+#include "logging.hpp"
+#include "server_sent_event_impl.hpp"
+
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/ssl/stream.hpp>
+#include <boost/beast/http/status.hpp>
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+namespace crow
+{
+
+SseSocketRule::SseSocketRule(const std::string& ruleIn) : BaseRule(ruleIn)
+{
+ isUpgrade = true;
+ // Clear GET handler
+ methodsBitfield = 0;
+}
+
+void SseSocketRule::validate() {}
+
+void SseSocketRule::handle(const Request& /*req*/,
+ const std::shared_ptr<bmcweb::AsyncResp>& asyncResp,
+ const std::vector<std::string>& /*params*/)
+{
+ BMCWEB_LOG_ERROR(
+ "Handle called on websocket rule. This should never happen");
+ asyncResp->res.result(boost::beast::http::status::internal_server_error);
+}
+
+void SseSocketRule::handleUpgrade(
+ const Request& req, const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
+ boost::asio::ip::tcp::socket&& adaptor)
+{
+ std::shared_ptr<
+ crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>
+ myConnection = std::make_shared<
+ crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>(
+ std::move(adaptor), openHandler, closeHandler);
+ myConnection->start(req);
+}
+void SseSocketRule::handleUpgrade(
+ const Request& req, const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
+ boost::asio::ssl::stream<boost::asio::ip::tcp::socket>&& adaptor)
+{
+ std::shared_ptr<crow::sse_socket::ConnectionImpl<
+ boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>>
+ myConnection = std::make_shared<crow::sse_socket::ConnectionImpl<
+ boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>>(
+ std::move(adaptor), openHandler, closeHandler);
+ myConnection->start(req);
+}
+
+} // namespace crow
diff --git a/http/routing/sserule.hpp b/http/routing/sserule.hpp
index e32af57..9fb6ba3 100644
--- a/http/routing/sserule.hpp
+++ b/http/routing/sserule.hpp
@@ -5,18 +5,14 @@
#include "async_resp.hpp"
#include "baserule.hpp"
#include "http_request.hpp"
-#include "http_response.hpp"
-#include "logging.hpp"
#include "server_sent_event.hpp"
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/stream.hpp>
-#include <boost/beast/http/status.hpp>
#include <functional>
#include <memory>
#include <string>
-#include <utility>
#include <vector>
namespace crow
@@ -27,48 +23,21 @@
using self_t = SseSocketRule;
public:
- explicit SseSocketRule(const std::string& ruleIn) : BaseRule(ruleIn)
- {
- isUpgrade = true;
- // Clear GET handler
- methodsBitfield = 0;
- }
+ explicit SseSocketRule(const std::string& ruleIn);
- void validate() override {}
+ void validate() override;
void handle(const Request& /*req*/,
const std::shared_ptr<bmcweb::AsyncResp>& asyncResp,
- const std::vector<std::string>& /*params*/) override
- {
- BMCWEB_LOG_ERROR(
- "Handle called on websocket rule. This should never happen");
- asyncResp->res.result(
- boost::beast::http::status::internal_server_error);
- }
+ const std::vector<std::string>& /*params*/) override;
void handleUpgrade(const Request& req,
const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
- boost::asio::ip::tcp::socket&& adaptor) override
- {
- std::shared_ptr<
- crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>
- myConnection = std::make_shared<
- crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>(
- std::move(adaptor), openHandler, closeHandler);
- myConnection->start(req);
- }
+ boost::asio::ip::tcp::socket&& adaptor) override;
void handleUpgrade(const Request& req,
const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
boost::asio::ssl::stream<boost::asio::ip::tcp::socket>&&
- adaptor) override
- {
- std::shared_ptr<crow::sse_socket::ConnectionImpl<
- boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>>
- myConnection = std::make_shared<crow::sse_socket::ConnectionImpl<
- boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>>(
- std::move(adaptor), openHandler, closeHandler);
- myConnection->start(req);
- }
+ adaptor) override;
template <typename Func>
self_t& onopen(Func f)
diff --git a/http/server_sent_event.hpp b/http/server_sent_event.hpp
index 31db6ff..deeb1a9 100644
--- a/http/server_sent_event.hpp
+++ b/http/server_sent_event.hpp
@@ -2,29 +2,11 @@
// SPDX-FileCopyrightText: Copyright OpenBMC Authors
#pragma once
#include "boost_formatters.hpp"
-#include "http_body.hpp"
-#include "http_request.hpp"
-#include "io_context_singleton.hpp"
-#include "logging.hpp"
-#include <boost/asio/buffer.hpp>
-#include <boost/asio/error.hpp>
-#include <boost/asio/steady_timer.hpp>
-#include <boost/beast/core/error.hpp>
-#include <boost/beast/core/multi_buffer.hpp>
-#include <boost/beast/http/field.hpp>
-#include <boost/beast/http/serializer.hpp>
#include <boost/beast/http/write.hpp>
-#include <array>
-#include <chrono>
-#include <cstddef>
-#include <functional>
#include <memory>
-#include <optional>
-#include <string>
#include <string_view>
-#include <utility>
namespace crow
{
@@ -45,245 +27,5 @@
virtual void close(std::string_view msg = "quit") = 0;
virtual void sendSseEvent(std::string_view id, std::string_view msg) = 0;
};
-
-template <typename Adaptor>
-class ConnectionImpl : public Connection
-{
- public:
- ConnectionImpl(
- Adaptor&& adaptorIn,
- std::function<void(Connection&, const Request&)> openHandlerIn,
- std::function<void(Connection&)> closeHandlerIn) :
- adaptor(std::move(adaptorIn)), timer(getIoContext()),
- openHandler(std::move(openHandlerIn)),
- closeHandler(std::move(closeHandlerIn))
-
- {
- BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
- }
-
- ConnectionImpl(const ConnectionImpl&) = delete;
- ConnectionImpl(const ConnectionImpl&&) = delete;
- ConnectionImpl& operator=(const ConnectionImpl&) = delete;
- ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
-
- ~ConnectionImpl() override
- {
- BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
- }
-
- void start(const Request& req)
- {
- BMCWEB_LOG_DEBUG("Starting SSE connection");
-
- res.set(boost::beast::http::field::content_type, "text/event-stream");
- boost::beast::http::response_serializer<BodyType>& serial =
- serializer.emplace(res);
-
- boost::beast::http::async_write_header(
- adaptor, serial,
- std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
- shared_from_this(), req));
- }
-
- void close(const std::string_view msg) override
- {
- BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
- // send notification to handler for cleanup
- if (closeHandler)
- {
- closeHandler(*this);
- }
- BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
- boost::beast::get_lowest_layer(adaptor).close();
- }
-
- void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
- const Request& req,
- const boost::system::error_code& ec,
- size_t /*bytesSent*/)
- {
- serializer.reset();
- if (ec)
- {
- BMCWEB_LOG_ERROR("Error sending header{}", ec);
- close("async_write_header failed");
- return;
- }
- BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
- if (!openHandler)
- {
- BMCWEB_LOG_CRITICAL("No open handler???");
- return;
- }
- openHandler(*this, req);
-
- // SSE stream header sent, So let us setup monitor.
- // Any read data on this stream will be error in case of SSE.
- adaptor.async_read_some(boost::asio::buffer(buffer),
- std::bind_front(&ConnectionImpl::afterReadError,
- this, shared_from_this()));
- }
-
- void afterReadError(const std::shared_ptr<Connection>& /*self*/,
- const boost::system::error_code& ec, size_t bytesRead)
- {
- BMCWEB_LOG_DEBUG("Read {}", bytesRead);
- if (ec == boost::asio::error::operation_aborted)
- {
- return;
- }
- if (ec)
- {
- BMCWEB_LOG_ERROR("Read error: {}", ec);
- }
-
- close("Close SSE connection");
- }
-
- void doWrite()
- {
- if (doingWrite)
- {
- return;
- }
- if (inputBuffer.size() == 0)
- {
- BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
- return;
- }
- startTimeout();
- doingWrite = true;
-
- adaptor.async_write_some(
- inputBuffer.data(),
- std::bind_front(&ConnectionImpl::doWriteCallback, this,
- shared_from_this()));
- }
-
- void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
- const boost::beast::error_code& ec,
- size_t bytesTransferred)
- {
- timer.cancel();
- doingWrite = false;
- inputBuffer.consume(bytesTransferred);
-
- if (ec == boost::asio::error::eof)
- {
- BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
- close("SSE stream closed");
- return;
- }
-
- if (ec)
- {
- BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
- close("async_write_some failed");
- return;
- }
- BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
- bytesTransferred);
-
- doWrite();
- }
-
- void sendSseEvent(std::string_view id, std::string_view msg) override
- {
- if (msg.empty())
- {
- BMCWEB_LOG_DEBUG("Empty data, bailing out.");
- return;
- }
-
- dataFormat(id, msg);
-
- doWrite();
- }
-
- void dataFormat(std::string_view id, std::string_view msg)
- {
- constexpr size_t bufferLimit = 10485760U; // 10MB
- if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
- {
- BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
- close("Buffer overflow");
- return;
- }
- std::string rawData;
- if (!id.empty())
- {
- rawData += "id: ";
- rawData.append(id);
- rawData += "\n";
- }
-
- rawData += "data: ";
- for (char character : msg)
- {
- rawData += character;
- if (character == '\n')
- {
- rawData += "data: ";
- }
- }
- rawData += "\n\n";
-
- size_t copied = boost::asio::buffer_copy(
- inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData));
- inputBuffer.commit(copied);
- }
-
- void startTimeout()
- {
- std::weak_ptr<Connection> weakSelf = weak_from_this();
- timer.expires_after(std::chrono::seconds(30));
- timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
- this, weak_from_this()));
- }
-
- void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
- const boost::system::error_code& ec)
- {
- std::shared_ptr<Connection> self = weakSelf.lock();
- if (!self)
- {
- BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
- logPtr(self.get()));
- return;
- }
-
- if (ec == boost::asio::error::operation_aborted)
- {
- BMCWEB_LOG_DEBUG("Timer operation aborted");
- // Canceled wait means the path succeeded.
- return;
- }
- if (ec)
- {
- BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
- }
-
- BMCWEB_LOG_WARNING("{} Connection timed out, closing",
- logPtr(self.get()));
-
- self->close("closing connection");
- }
-
- private:
- std::array<char, 1> buffer{};
- boost::beast::multi_buffer inputBuffer;
-
- Adaptor adaptor;
-
- using BodyType = bmcweb::HttpBody;
- boost::beast::http::response<BodyType> res;
- std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
- boost::asio::steady_timer timer;
- bool doingWrite = false;
-
- std::function<void(Connection&, const Request&)> openHandler;
- std::function<void(Connection&)> closeHandler;
-};
} // namespace sse_socket
} // namespace crow
diff --git a/http/server_sent_event_impl.hpp b/http/server_sent_event_impl.hpp
new file mode 100644
index 0000000..fe9ed2e
--- /dev/null
+++ b/http/server_sent_event_impl.hpp
@@ -0,0 +1,276 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright OpenBMC Authors
+#pragma once
+#include "boost_formatters.hpp"
+#include "http_body.hpp"
+#include "http_request.hpp"
+#include "io_context_singleton.hpp"
+#include "logging.hpp"
+#include "server_sent_event.hpp"
+
+#include <boost/asio/buffer.hpp>
+#include <boost/asio/error.hpp>
+#include <boost/asio/steady_timer.hpp>
+#include <boost/beast/core/error.hpp>
+#include <boost/beast/core/multi_buffer.hpp>
+#include <boost/beast/http/field.hpp>
+#include <boost/beast/http/serializer.hpp>
+#include <boost/beast/http/write.hpp>
+
+#include <array>
+#include <chrono>
+#include <cstddef>
+#include <functional>
+#include <memory>
+#include <optional>
+#include <string>
+#include <string_view>
+#include <utility>
+
+namespace crow
+{
+
+namespace sse_socket
+{
+
+template <typename Adaptor>
+class ConnectionImpl : public Connection
+{
+ public:
+ ConnectionImpl(
+ Adaptor&& adaptorIn,
+ std::function<void(Connection&, const Request&)> openHandlerIn,
+ std::function<void(Connection&)> closeHandlerIn) :
+ adaptor(std::move(adaptorIn)), timer(getIoContext()),
+ openHandler(std::move(openHandlerIn)),
+ closeHandler(std::move(closeHandlerIn))
+
+ {
+ BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
+ }
+
+ ConnectionImpl(const ConnectionImpl&) = delete;
+ ConnectionImpl(const ConnectionImpl&&) = delete;
+ ConnectionImpl& operator=(const ConnectionImpl&) = delete;
+ ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
+
+ ~ConnectionImpl() override
+ {
+ BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
+ }
+
+ void start(const Request& req)
+ {
+ BMCWEB_LOG_DEBUG("Starting SSE connection");
+
+ res.set(boost::beast::http::field::content_type, "text/event-stream");
+ boost::beast::http::response_serializer<BodyType>& serial =
+ serializer.emplace(res);
+
+ boost::beast::http::async_write_header(
+ adaptor, serial,
+ std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
+ shared_from_this(), req));
+ }
+
+ void close(const std::string_view msg) override
+ {
+ BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
+ // send notification to handler for cleanup
+ if (closeHandler)
+ {
+ closeHandler(*this);
+ }
+ BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
+ boost::beast::get_lowest_layer(adaptor).close();
+ }
+
+ void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
+ const Request& req,
+ const boost::system::error_code& ec,
+ size_t /*bytesSent*/)
+ {
+ serializer.reset();
+ if (ec)
+ {
+ BMCWEB_LOG_ERROR("Error sending header{}", ec);
+ close("async_write_header failed");
+ return;
+ }
+ BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
+ if (!openHandler)
+ {
+ BMCWEB_LOG_CRITICAL("No open handler???");
+ return;
+ }
+ openHandler(*this, req);
+
+ // SSE stream header sent, So let us setup monitor.
+ // Any read data on this stream will be error in case of SSE.
+ adaptor.async_read_some(boost::asio::buffer(buffer),
+ std::bind_front(&ConnectionImpl::afterReadError,
+ this, shared_from_this()));
+ }
+
+ void afterReadError(const std::shared_ptr<Connection>& /*self*/,
+ const boost::system::error_code& ec, size_t bytesRead)
+ {
+ BMCWEB_LOG_DEBUG("Read {}", bytesRead);
+ if (ec == boost::asio::error::operation_aborted)
+ {
+ return;
+ }
+ if (ec)
+ {
+ BMCWEB_LOG_ERROR("Read error: {}", ec);
+ }
+
+ close("Close SSE connection");
+ }
+
+ void doWrite()
+ {
+ if (doingWrite)
+ {
+ return;
+ }
+ if (inputBuffer.size() == 0)
+ {
+ BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
+ return;
+ }
+ startTimeout();
+ doingWrite = true;
+
+ adaptor.async_write_some(
+ inputBuffer.data(),
+ std::bind_front(&ConnectionImpl::doWriteCallback, this,
+ shared_from_this()));
+ }
+
+ void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
+ const boost::beast::error_code& ec,
+ size_t bytesTransferred)
+ {
+ timer.cancel();
+ doingWrite = false;
+ inputBuffer.consume(bytesTransferred);
+
+ if (ec == boost::asio::error::eof)
+ {
+ BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
+ close("SSE stream closed");
+ return;
+ }
+
+ if (ec)
+ {
+ BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
+ close("async_write_some failed");
+ return;
+ }
+ BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
+ bytesTransferred);
+
+ doWrite();
+ }
+
+ void sendSseEvent(std::string_view id, std::string_view msg) override
+ {
+ if (msg.empty())
+ {
+ BMCWEB_LOG_DEBUG("Empty data, bailing out.");
+ return;
+ }
+
+ dataFormat(id, msg);
+
+ doWrite();
+ }
+
+ void dataFormat(std::string_view id, std::string_view msg)
+ {
+ constexpr size_t bufferLimit = 10485760U; // 10MB
+ if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
+ {
+ BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
+ close("Buffer overflow");
+ return;
+ }
+ std::string rawData;
+ if (!id.empty())
+ {
+ rawData += "id: ";
+ rawData.append(id);
+ rawData += "\n";
+ }
+
+ rawData += "data: ";
+ for (char character : msg)
+ {
+ rawData += character;
+ if (character == '\n')
+ {
+ rawData += "data: ";
+ }
+ }
+ rawData += "\n\n";
+
+ size_t copied = boost::asio::buffer_copy(
+ inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData));
+ inputBuffer.commit(copied);
+ }
+
+ void startTimeout()
+ {
+ std::weak_ptr<Connection> weakSelf = weak_from_this();
+ timer.expires_after(std::chrono::seconds(30));
+ timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
+ this, weak_from_this()));
+ }
+
+ void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
+ const boost::system::error_code& ec)
+ {
+ std::shared_ptr<Connection> self = weakSelf.lock();
+ if (!self)
+ {
+ BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
+ logPtr(self.get()));
+ return;
+ }
+
+ if (ec == boost::asio::error::operation_aborted)
+ {
+ BMCWEB_LOG_DEBUG("Timer operation aborted");
+ // Canceled wait means the path succeeded.
+ return;
+ }
+ if (ec)
+ {
+ BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
+ }
+
+ BMCWEB_LOG_WARNING("{} Connection timed out, closing",
+ logPtr(self.get()));
+
+ self->close("closing connection");
+ }
+
+ private:
+ std::array<char, 1> buffer{};
+ boost::beast::multi_buffer inputBuffer;
+
+ Adaptor adaptor;
+
+ using BodyType = bmcweb::HttpBody;
+ boost::beast::http::response<BodyType> res;
+ std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
+ boost::asio::steady_timer timer;
+ bool doingWrite = false;
+
+ std::function<void(Connection&, const Request&)> openHandler;
+ std::function<void(Connection&)> closeHandler;
+};
+} // namespace sse_socket
+} // namespace crow
diff --git a/meson.build b/meson.build
index 9eb5488..54f7cde 100644
--- a/meson.build
+++ b/meson.build
@@ -367,6 +367,7 @@
srcfiles_bmcweb = files(
'http/mutual_tls.cpp',
+ 'http/routing/sserule.cpp',
'http/routing/websocketrule.cpp',
'redfish-core/src/dbus_log_watcher.cpp',
'redfish-core/src/error_message_utils.cpp',
diff --git a/test/http/server_sent_event_test.cpp b/test/http/server_sent_event_test.cpp
index e18686e..18d9545 100644
--- a/test/http/server_sent_event_test.cpp
+++ b/test/http/server_sent_event_test.cpp
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright OpenBMC Authors
#include "http/server_sent_event.hpp"
+#include "http/server_sent_event_impl.hpp"
#include "http_request.hpp"
#include "test_stream.hpp"