blob: cfc5dfb33a912b0f67ae8c64c4a39c2b94ab0066 [file] [log] [blame]
V-Sanjana88ada3b2023-04-13 15:18:31 +05301#pragma once
Ed Tanousb2896142024-01-31 15:25:47 -08002#include "http_body.hpp"
V-Sanjana88ada3b2023-04-13 15:18:31 +05303#include "http_request.hpp"
4#include "http_response.hpp"
5
V-Sanjana88ada3b2023-04-13 15:18:31 +05306#include <boost/asio/buffer.hpp>
7#include <boost/asio/steady_timer.hpp>
8#include <boost/beast/core/multi_buffer.hpp>
V-Sanjana88ada3b2023-04-13 15:18:31 +05309#include <boost/beast/websocket.hpp>
10
11#include <array>
Ed Tanous8f79c5b2024-01-30 15:56:37 -080012#include <cstddef>
V-Sanjana88ada3b2023-04-13 15:18:31 +053013#include <functional>
Ed Tanous8f79c5b2024-01-30 15:56:37 -080014#include <optional>
V-Sanjana88ada3b2023-04-13 15:18:31 +053015
16namespace crow
17{
18
19namespace sse_socket
20{
Ed Tanous93cf0ac2024-03-28 00:35:13 -070021struct Connection : public std::enable_shared_from_this<Connection>
V-Sanjana88ada3b2023-04-13 15:18:31 +053022{
23 public:
Ed Tanous6fde95f2023-06-01 07:33:34 -070024 Connection() = default;
V-Sanjana88ada3b2023-04-13 15:18:31 +053025
26 Connection(const Connection&) = delete;
27 Connection(Connection&&) = delete;
28 Connection& operator=(const Connection&) = delete;
29 Connection& operator=(const Connection&&) = delete;
30 virtual ~Connection() = default;
31
32 virtual boost::asio::io_context& getIoContext() = 0;
V-Sanjana88ada3b2023-04-13 15:18:31 +053033 virtual void close(std::string_view msg = "quit") = 0;
34 virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
V-Sanjana88ada3b2023-04-13 15:18:31 +053035};
36
37template <typename Adaptor>
38class ConnectionImpl : public Connection
39{
40 public:
Ed Tanousf80a87f2024-06-16 12:10:33 -070041 ConnectionImpl(
42 Adaptor&& adaptorIn,
43 std::function<void(Connection&, const Request&)> openHandlerIn,
44 std::function<void(Connection&)> closeHandlerIn) :
Ed Tanous6fde95f2023-06-01 07:33:34 -070045 adaptor(std::move(adaptorIn)),
Ed Tanous93cf0ac2024-03-28 00:35:13 -070046 timer(static_cast<boost::asio::io_context&>(
47 adaptor.get_executor().context())),
48 openHandler(std::move(openHandlerIn)),
V-Sanjana88ada3b2023-04-13 15:18:31 +053049 closeHandler(std::move(closeHandlerIn))
Ed Tanous8f79c5b2024-01-30 15:56:37 -080050
V-Sanjana88ada3b2023-04-13 15:18:31 +053051 {
Ed Tanous62598e32023-07-17 17:06:25 -070052 BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
V-Sanjana88ada3b2023-04-13 15:18:31 +053053 }
54
55 ConnectionImpl(const ConnectionImpl&) = delete;
56 ConnectionImpl(const ConnectionImpl&&) = delete;
57 ConnectionImpl& operator=(const ConnectionImpl&) = delete;
58 ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
59
60 ~ConnectionImpl() override
61 {
Ed Tanous62598e32023-07-17 17:06:25 -070062 BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
V-Sanjana88ada3b2023-04-13 15:18:31 +053063 }
64
65 boost::asio::io_context& getIoContext() override
66 {
67 return static_cast<boost::asio::io_context&>(
68 adaptor.get_executor().context());
69 }
70
Ed Tanousf80a87f2024-06-16 12:10:33 -070071 void start(const Request& req)
V-Sanjana88ada3b2023-04-13 15:18:31 +053072 {
Ed Tanous6fde95f2023-06-01 07:33:34 -070073 if (!openHandler)
V-Sanjana88ada3b2023-04-13 15:18:31 +053074 {
Ed Tanous62598e32023-07-17 17:06:25 -070075 BMCWEB_LOG_CRITICAL("No open handler???");
Ed Tanous6fde95f2023-06-01 07:33:34 -070076 return;
V-Sanjana88ada3b2023-04-13 15:18:31 +053077 }
Ed Tanousf80a87f2024-06-16 12:10:33 -070078 openHandler(*this, req);
Ed Tanous8f79c5b2024-01-30 15:56:37 -080079 sendSSEHeader();
V-Sanjana88ada3b2023-04-13 15:18:31 +053080 }
81
82 void close(const std::string_view msg) override
83 {
Ed Tanous8f79c5b2024-01-30 15:56:37 -080084 BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
V-Sanjana88ada3b2023-04-13 15:18:31 +053085 // send notification to handler for cleanup
86 if (closeHandler)
87 {
Ed Tanous6fde95f2023-06-01 07:33:34 -070088 closeHandler(*this);
V-Sanjana88ada3b2023-04-13 15:18:31 +053089 }
Ed Tanous62598e32023-07-17 17:06:25 -070090 BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
Ed Tanous6fde95f2023-06-01 07:33:34 -070091 boost::beast::get_lowest_layer(adaptor).close();
V-Sanjana88ada3b2023-04-13 15:18:31 +053092 }
93
Ed Tanous6fde95f2023-06-01 07:33:34 -070094 void sendSSEHeader()
V-Sanjana88ada3b2023-04-13 15:18:31 +053095 {
Ed Tanous62598e32023-07-17 17:06:25 -070096 BMCWEB_LOG_DEBUG("Starting SSE connection");
Ed Tanous8f79c5b2024-01-30 15:56:37 -080097
Ed Tanous6fde95f2023-06-01 07:33:34 -070098 res.set(boost::beast::http::field::content_type, "text/event-stream");
Ed Tanous8ece0e42024-01-02 13:16:50 -080099 boost::beast::http::response_serializer<BodyType>& serial =
Ed Tanous8f79c5b2024-01-30 15:56:37 -0800100 serializer.emplace(res);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530101
102 boost::beast::http::async_write_header(
Ed Tanous8ece0e42024-01-02 13:16:50 -0800103 adaptor, serial,
V-Sanjana88ada3b2023-04-13 15:18:31 +0530104 std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
105 shared_from_this()));
106 }
107
108 void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
Ed Tanous6fde95f2023-06-01 07:33:34 -0700109 const boost::system::error_code& ec,
110 size_t /*bytesSent*/)
V-Sanjana88ada3b2023-04-13 15:18:31 +0530111 {
Ed Tanous6fde95f2023-06-01 07:33:34 -0700112 serializer.reset();
V-Sanjana88ada3b2023-04-13 15:18:31 +0530113 if (ec)
114 {
Ed Tanous62598e32023-07-17 17:06:25 -0700115 BMCWEB_LOG_ERROR("Error sending header{}", ec);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530116 close("async_write_header failed");
117 return;
118 }
Ed Tanous62598e32023-07-17 17:06:25 -0700119 BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
V-Sanjana88ada3b2023-04-13 15:18:31 +0530120
V-Sanjana88ada3b2023-04-13 15:18:31 +0530121 // SSE stream header sent, So let us setup monitor.
122 // Any read data on this stream will be error in case of SSE.
Ed Tanous8f79c5b2024-01-30 15:56:37 -0800123 adaptor.async_read_some(boost::asio::buffer(buffer),
124 std::bind_front(&ConnectionImpl::afterReadError,
125 this, shared_from_this()));
V-Sanjana88ada3b2023-04-13 15:18:31 +0530126 }
127
Ed Tanous6fde95f2023-06-01 07:33:34 -0700128 void afterReadError(const std::shared_ptr<Connection>& /*self*/,
Ed Tanous8f79c5b2024-01-30 15:56:37 -0800129 const boost::system::error_code& ec, size_t bytesRead)
V-Sanjana88ada3b2023-04-13 15:18:31 +0530130 {
Ed Tanous8f79c5b2024-01-30 15:56:37 -0800131 BMCWEB_LOG_DEBUG("Read {}", bytesRead);
Ed Tanous6fde95f2023-06-01 07:33:34 -0700132 if (ec == boost::asio::error::operation_aborted)
133 {
134 return;
135 }
V-Sanjana88ada3b2023-04-13 15:18:31 +0530136 if (ec)
137 {
Ed Tanous62598e32023-07-17 17:06:25 -0700138 BMCWEB_LOG_ERROR("Read error: {}", ec);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530139 }
140
Ed Tanous6fde95f2023-06-01 07:33:34 -0700141 close("Close SSE connection");
V-Sanjana88ada3b2023-04-13 15:18:31 +0530142 }
143
144 void doWrite()
145 {
V-Sanjana88ada3b2023-04-13 15:18:31 +0530146 if (doingWrite)
147 {
148 return;
149 }
150 if (inputBuffer.size() == 0)
151 {
Ed Tanous62598e32023-07-17 17:06:25 -0700152 BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
V-Sanjana88ada3b2023-04-13 15:18:31 +0530153 return;
154 }
Ed Tanous6fde95f2023-06-01 07:33:34 -0700155 startTimeout();
V-Sanjana88ada3b2023-04-13 15:18:31 +0530156 doingWrite = true;
157
158 adaptor.async_write_some(
159 inputBuffer.data(),
160 std::bind_front(&ConnectionImpl::doWriteCallback, this,
Ed Tanous8f79c5b2024-01-30 15:56:37 -0800161 shared_from_this()));
V-Sanjana88ada3b2023-04-13 15:18:31 +0530162 }
163
Ed Tanous8f79c5b2024-01-30 15:56:37 -0800164 void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
V-Sanjana88ada3b2023-04-13 15:18:31 +0530165 const boost::beast::error_code& ec,
Ed Tanous6fde95f2023-06-01 07:33:34 -0700166 size_t bytesTransferred)
V-Sanjana88ada3b2023-04-13 15:18:31 +0530167 {
Ed Tanous6fde95f2023-06-01 07:33:34 -0700168 timer.cancel();
V-Sanjana88ada3b2023-04-13 15:18:31 +0530169 doingWrite = false;
170 inputBuffer.consume(bytesTransferred);
171
172 if (ec == boost::asio::error::eof)
173 {
Ed Tanous62598e32023-07-17 17:06:25 -0700174 BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
V-Sanjana88ada3b2023-04-13 15:18:31 +0530175 close("SSE stream closed");
176 return;
177 }
178
179 if (ec)
180 {
Ed Tanous62598e32023-07-17 17:06:25 -0700181 BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
V-Sanjana88ada3b2023-04-13 15:18:31 +0530182 close("async_write_some failed");
183 return;
184 }
Ed Tanous62598e32023-07-17 17:06:25 -0700185 BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
186 bytesTransferred);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530187
188 doWrite();
189 }
190
V-Sanjana88ada3b2023-04-13 15:18:31 +0530191 void sendEvent(std::string_view id, std::string_view msg) override
192 {
193 if (msg.empty())
194 {
Ed Tanous62598e32023-07-17 17:06:25 -0700195 BMCWEB_LOG_DEBUG("Empty data, bailing out.");
V-Sanjana88ada3b2023-04-13 15:18:31 +0530196 return;
197 }
198
Ed Tanous6fde95f2023-06-01 07:33:34 -0700199 dataFormat(id, msg);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530200
201 doWrite();
202 }
203
Ed Tanous6fde95f2023-06-01 07:33:34 -0700204 void dataFormat(std::string_view id, std::string_view msg)
V-Sanjana88ada3b2023-04-13 15:18:31 +0530205 {
Ed Tanous8f79c5b2024-01-30 15:56:37 -0800206 constexpr size_t bufferLimit = 10485760U; // 10MB
207 if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
208 {
209 BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
210 close("Buffer overflow");
211 return;
212 }
V-Sanjana88ada3b2023-04-13 15:18:31 +0530213 std::string rawData;
214 if (!id.empty())
215 {
216 rawData += "id: ";
Ed Tanous6fde95f2023-06-01 07:33:34 -0700217 rawData.append(id);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530218 rawData += "\n";
219 }
220
221 rawData += "data: ";
222 for (char character : msg)
223 {
224 rawData += character;
225 if (character == '\n')
226 {
227 rawData += "data: ";
228 }
229 }
230 rawData += "\n\n";
231
Ed Tanous44106f32024-04-06 13:48:50 -0700232 size_t copied = boost::asio::buffer_copy(
233 inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData));
234 inputBuffer.commit(copied);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530235 }
236
Ed Tanous6fde95f2023-06-01 07:33:34 -0700237 void startTimeout()
V-Sanjana88ada3b2023-04-13 15:18:31 +0530238 {
V-Sanjana88ada3b2023-04-13 15:18:31 +0530239 std::weak_ptr<Connection> weakSelf = weak_from_this();
240 timer.expires_after(std::chrono::seconds(30));
241 timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
242 this, weak_from_this()));
243 }
244
245 void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
Ed Tanous6fde95f2023-06-01 07:33:34 -0700246 const boost::system::error_code& ec)
V-Sanjana88ada3b2023-04-13 15:18:31 +0530247 {
248 std::shared_ptr<Connection> self = weakSelf.lock();
249 if (!self)
250 {
Ed Tanous62598e32023-07-17 17:06:25 -0700251 BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
252 logPtr(self.get()));
V-Sanjana88ada3b2023-04-13 15:18:31 +0530253 return;
254 }
255
256 if (ec == boost::asio::error::operation_aborted)
257 {
Ed Tanous8f79c5b2024-01-30 15:56:37 -0800258 BMCWEB_LOG_DEBUG("Timer operation aborted");
Ed Tanous8ece0e42024-01-02 13:16:50 -0800259 // Canceled wait means the path succeeded.
V-Sanjana88ada3b2023-04-13 15:18:31 +0530260 return;
261 }
262 if (ec)
263 {
Ed Tanous62598e32023-07-17 17:06:25 -0700264 BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530265 }
266
Ed Tanous8f79c5b2024-01-30 15:56:37 -0800267 BMCWEB_LOG_WARNING("{} Connection timed out, closing",
Ed Tanous62598e32023-07-17 17:06:25 -0700268 logPtr(self.get()));
V-Sanjana88ada3b2023-04-13 15:18:31 +0530269
270 self->close("closing connection");
271 }
272
273 private:
Ed Tanous8f79c5b2024-01-30 15:56:37 -0800274 std::array<char, 1> buffer{};
V-Sanjana88ada3b2023-04-13 15:18:31 +0530275 boost::beast::multi_buffer inputBuffer;
276
Ed Tanous8f79c5b2024-01-30 15:56:37 -0800277 Adaptor adaptor;
278
Ed Tanousb2896142024-01-31 15:25:47 -0800279 using BodyType = bmcweb::HttpBody;
Ed Tanous8f79c5b2024-01-30 15:56:37 -0800280 boost::beast::http::response<BodyType> res;
281 std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
Ed Tanous6fde95f2023-06-01 07:33:34 -0700282 boost::asio::steady_timer timer;
V-Sanjana88ada3b2023-04-13 15:18:31 +0530283 bool doingWrite = false;
V-Sanjana88ada3b2023-04-13 15:18:31 +0530284
Ed Tanousf80a87f2024-06-16 12:10:33 -0700285 std::function<void(Connection&, const Request&)> openHandler;
Ed Tanous6fde95f2023-06-01 07:33:34 -0700286 std::function<void(Connection&)> closeHandler;
V-Sanjana88ada3b2023-04-13 15:18:31 +0530287};
288} // namespace sse_socket
289} // namespace crow