Add unit test for SSE
Writing this test exposed some bugs in SSE that got merged.
sendSSEHeader was never called, leading to a connection that starts
and immediately closes with no error code.
This issue has been corrected in code, such that the sockets start.
To allow for unit tests, the io_service needs to be passed into the
class, previously, the SSE connection was pulling the io_context from
the DBus connection, which is odd, given that the SSE connection has
no other dependencies on DBus.
Unit tests should help keep it working.
Tested: Unit tests pass.
Change-Id: I48080d2a94b6349989f556cd1c7b103bad498526
Signed-off-by: Ed Tanous <ed@tanous.net>
diff --git a/http/routing/sserule.hpp b/http/routing/sserule.hpp
index c0a4e50..1a422a8 100644
--- a/http/routing/sserule.hpp
+++ b/http/routing/sserule.hpp
@@ -31,7 +31,7 @@
}
#ifndef BMCWEB_ENABLE_SSL
- void handleUpgrade(const Request& /*req*/,
+ void handleUpgrade(const Request& req,
const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
boost::asio::ip::tcp::socket&& adaptor) override
{
@@ -39,11 +39,11 @@
crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>
myConnection = std::make_shared<
crow::sse_socket::ConnectionImpl<boost::asio::ip::tcp::socket>>(
- std::move(adaptor), openHandler, closeHandler);
+ *req.ioService, std::move(adaptor), openHandler, closeHandler);
myConnection->start();
}
#else
- void handleUpgrade(const Request& /*req*/,
+ void handleUpgrade(const Request& req,
const std::shared_ptr<bmcweb::AsyncResp>& /*asyncResp*/,
boost::beast::ssl_stream<boost::asio::ip::tcp::socket>&&
adaptor) override
@@ -52,7 +52,7 @@
boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>
myConnection = std::make_shared<crow::sse_socket::ConnectionImpl<
boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>(
- std::move(adaptor), openHandler, closeHandler);
+ *req.ioService, std::move(adaptor), openHandler, closeHandler);
myConnection->start();
}
#endif
diff --git a/http/server_sent_event.hpp b/http/server_sent_event.hpp
index f4ad21a..730bdce 100644
--- a/http/server_sent_event.hpp
+++ b/http/server_sent_event.hpp
@@ -1,20 +1,16 @@
#pragma once
-#include "dbus_singleton.hpp"
#include "http_request.hpp"
#include "http_response.hpp"
#include <boost/asio/buffer.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/beast/core/multi_buffer.hpp>
-#include <boost/beast/http/buffer_body.hpp>
#include <boost/beast/websocket.hpp>
#include <array>
+#include <cstddef>
#include <functional>
-
-#ifdef BMCWEB_ENABLE_SSL
-#include <boost/beast/websocket/ssl.hpp>
-#endif
+#include <optional>
namespace crow
{
@@ -41,12 +37,13 @@
class ConnectionImpl : public Connection
{
public:
- ConnectionImpl(Adaptor&& adaptorIn,
+ ConnectionImpl(boost::asio::io_context& ioIn, Adaptor&& adaptorIn,
std::function<void(Connection&)> openHandlerIn,
std::function<void(Connection&)> closeHandlerIn) :
adaptor(std::move(adaptorIn)),
- timer(ioc), openHandler(std::move(openHandlerIn)),
+ ioc(ioIn), timer(ioc), openHandler(std::move(openHandlerIn)),
closeHandler(std::move(closeHandlerIn))
+
{
BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
}
@@ -75,10 +72,12 @@
return;
}
openHandler(*this);
+ sendSSEHeader();
}
void close(const std::string_view msg) override
{
+ BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
// send notification to handler for cleanup
if (closeHandler)
{
@@ -91,13 +90,10 @@
void sendSSEHeader()
{
BMCWEB_LOG_DEBUG("Starting SSE connection");
- using BodyType = boost::beast::http::buffer_body;
- boost::beast::http::response<BodyType> res(
- boost::beast::http::status::ok, 11, BodyType{});
+
res.set(boost::beast::http::field::content_type, "text/event-stream");
- res.body().more = true;
boost::beast::http::response_serializer<BodyType>& serial =
- serializer.emplace(std::move(res));
+ serializer.emplace(res);
boost::beast::http::async_write_header(
adaptor, serial,
@@ -118,19 +114,17 @@
}
BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
- serializer.reset();
-
// SSE stream header sent, So let us setup monitor.
// Any read data on this stream will be error in case of SSE.
-
- adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error,
- std::bind_front(&ConnectionImpl::afterReadError,
- this, shared_from_this()));
+ adaptor.async_read_some(boost::asio::buffer(buffer),
+ std::bind_front(&ConnectionImpl::afterReadError,
+ this, shared_from_this()));
}
void afterReadError(const std::shared_ptr<Connection>& /*self*/,
- const boost::system::error_code& ec)
+ const boost::system::error_code& ec, size_t bytesRead)
{
+ BMCWEB_LOG_DEBUG("Read {}", bytesRead);
if (ec == boost::asio::error::operation_aborted)
{
return;
@@ -160,18 +154,13 @@
adaptor.async_write_some(
inputBuffer.data(),
std::bind_front(&ConnectionImpl::doWriteCallback, this,
- weak_from_this()));
+ shared_from_this()));
}
- void doWriteCallback(const std::weak_ptr<Connection>& weak,
+ void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
const boost::beast::error_code& ec,
size_t bytesTransferred)
{
- auto self = weak.lock();
- if (self == nullptr)
- {
- return;
- }
timer.cancel();
doingWrite = false;
inputBuffer.consume(bytesTransferred);
@@ -210,6 +199,13 @@
void dataFormat(std::string_view id, std::string_view msg)
{
+ constexpr size_t bufferLimit = 10485760U; // 10MB
+ if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
+ {
+ BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
+ close("Buffer overflow");
+ return;
+ }
std::string rawData;
if (!id.empty())
{
@@ -255,7 +251,7 @@
if (ec == boost::asio::error::operation_aborted)
{
- BMCWEB_LOG_DEBUG("operation aborted");
+ BMCWEB_LOG_DEBUG("Timer operation aborted");
// Canceled wait means the path succeeded.
return;
}
@@ -264,22 +260,22 @@
BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
}
- BMCWEB_LOG_WARNING("{}Connection timed out, closing",
+ BMCWEB_LOG_WARNING("{} Connection timed out, closing",
logPtr(self.get()));
self->close("closing connection");
}
private:
- Adaptor adaptor;
-
+ std::array<char, 1> buffer{};
boost::beast::multi_buffer inputBuffer;
- std::optional<boost::beast::http::response_serializer<
- boost::beast::http::buffer_body>>
- serializer;
- boost::asio::io_context& ioc =
- crow::connections::systemBus->get_io_context();
+ Adaptor adaptor;
+
+ using BodyType = bmcweb::FileBody;
+ boost::beast::http::response<BodyType> res;
+ std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
+ boost::asio::io_context& ioc;
boost::asio::steady_timer timer;
bool doingWrite = false;
diff --git a/meson.build b/meson.build
index 33c8b12..d644b79 100644
--- a/meson.build
+++ b/meson.build
@@ -429,6 +429,7 @@
'test/http/http_response_test.cpp',
'test/http/utility_test.cpp',
'test/http/verb_test.cpp',
+ 'test/http/server_sent_event_test.cpp',
'test/http/mutual_tls_meta.cpp',
'test/http/http_file_body_test.cpp',
'test/include/dbus_utility_test.cpp',
diff --git a/test/http/server_sent_event_test.cpp b/test/http/server_sent_event_test.cpp
new file mode 100644
index 0000000..d3f9403
--- /dev/null
+++ b/test/http/server_sent_event_test.cpp
@@ -0,0 +1,111 @@
+#include "boost/asio/read.hpp"
+#include "boost/asio/read_until.hpp"
+#include "http/http_request.hpp"
+#include "http/http_response.hpp"
+#include "http/server_sent_event.hpp"
+
+#include <boost/asio/steady_timer.hpp>
+#include <boost/beast/_experimental/test/stream.hpp>
+
+#include <memory>
+#include <string>
+#include <string_view>
+
+#include "gtest/gtest.h"
+namespace crow
+{
+namespace sse_socket
+{
+
+namespace
+{
+
+TEST(ServerSentEvent, SseWorks)
+{
+ boost::asio::io_context io;
+ boost::beast::test::stream stream(io);
+ boost::beast::test::stream out(io);
+ stream.connect(out);
+
+ bool openCalled = false;
+ auto openHandler = [&openCalled](Connection&) { openCalled = true; };
+ bool closeCalled = false;
+ auto closeHandler = [&closeCalled](Connection&) { closeCalled = true; };
+
+ std::shared_ptr<ConnectionImpl<boost::beast::test::stream>> conn =
+ std::make_shared<ConnectionImpl<boost::beast::test::stream>>(
+ io, std::move(stream), openHandler, closeHandler);
+ conn->start();
+ // Connect
+ {
+ constexpr std::string_view expected =
+ "HTTP/1.1 200 OK\r\n"
+ "Content-Type: text/event-stream\r\n"
+ "\r\n";
+
+ while (out.str().size() != expected.size())
+ {
+ io.run_for(std::chrono::milliseconds(1));
+ }
+
+ std::string eventContent;
+ eventContent.resize(expected.size());
+ boost::asio::read(out, boost::asio::buffer(eventContent));
+
+ EXPECT_EQ(eventContent, expected);
+ EXPECT_TRUE(openCalled);
+ EXPECT_FALSE(closeCalled);
+ EXPECT_TRUE(out.str().empty());
+ }
+ // Send one event
+ {
+ conn->sendEvent("TestEventId", "TestEventContent");
+ std::string_view expected = "id: TestEventId\n"
+ "data: TestEventContent\n"
+ "\n";
+
+ while (out.str().size() < expected.size())
+ {
+ io.run_for(std::chrono::milliseconds(1));
+ }
+ EXPECT_EQ(out.str(), expected);
+
+ std::string eventContent;
+ eventContent.resize(expected.size());
+ boost::asio::read(out, boost::asio::buffer(eventContent));
+
+ EXPECT_EQ(eventContent, expected);
+ EXPECT_TRUE(out.str().empty());
+ }
+ // Send second event
+ {
+ conn->sendEvent("TestEventId2", "TestEvent\nContent2");
+ constexpr std::string_view expected = "id: TestEventId2\n"
+ "data: TestEvent\n"
+ "data: Content2\n"
+ "\n";
+
+ while (out.str().size() < expected.size())
+ {
+ io.run_for(std::chrono::milliseconds(1));
+ }
+
+ std::string eventContent;
+ eventContent.resize(expected.size());
+ boost::asio::read(out, boost::asio::buffer(eventContent));
+ EXPECT_EQ(eventContent, expected);
+ EXPECT_TRUE(out.str().empty());
+ }
+ // close the remote
+ {
+ out.close();
+ while (!closeCalled)
+ {
+ io.run_for(std::chrono::milliseconds(1));
+ }
+ }
+}
+} // namespace
+
+} // namespace sse_socket
+} // namespace crow