blob: 5da3a8e04e4ecac7d167541ab02c3488ba4b28ba [file] [log] [blame]
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright OpenBMC Authors
3#pragma once
4#include "boost_formatters.hpp"
5#include "http_body.hpp"
6#include "http_request.hpp"
7#include "io_context_singleton.hpp"
8#include "logging.hpp"
9#include "server_sent_event.hpp"
10
11#include <boost/asio/buffer.hpp>
12#include <boost/asio/error.hpp>
13#include <boost/asio/steady_timer.hpp>
14#include <boost/beast/core/error.hpp>
15#include <boost/beast/core/multi_buffer.hpp>
16#include <boost/beast/http/field.hpp>
17#include <boost/beast/http/serializer.hpp>
18#include <boost/beast/http/write.hpp>
19
20#include <array>
21#include <chrono>
22#include <cstddef>
23#include <functional>
24#include <memory>
25#include <optional>
26#include <string>
27#include <string_view>
28#include <utility>
29
30namespace crow
31{
32
33namespace sse_socket
34{
35
36template <typename Adaptor>
37class ConnectionImpl : public Connection
38{
39 public:
40 ConnectionImpl(
41 Adaptor&& adaptorIn,
42 std::function<void(Connection&, const Request&)> openHandlerIn,
43 std::function<void(Connection&)> closeHandlerIn) :
44 adaptor(std::move(adaptorIn)), timer(getIoContext()),
45 openHandler(std::move(openHandlerIn)),
46 closeHandler(std::move(closeHandlerIn))
47
48 {
49 BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
50 }
51
52 ConnectionImpl(const ConnectionImpl&) = delete;
53 ConnectionImpl(const ConnectionImpl&&) = delete;
54 ConnectionImpl& operator=(const ConnectionImpl&) = delete;
55 ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
56
57 ~ConnectionImpl() override
58 {
59 BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
60 }
61
62 void start(const Request& req)
63 {
64 BMCWEB_LOG_DEBUG("Starting SSE connection");
65
66 res.set(boost::beast::http::field::content_type, "text/event-stream");
67 boost::beast::http::response_serializer<BodyType>& serial =
68 serializer.emplace(res);
69
70 boost::beast::http::async_write_header(
71 adaptor, serial,
72 std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
Ed Tanous608ad2c2024-05-20 19:14:50 -070073 shared_from_this(), req.copy()));
Ed Tanouse60300a2025-02-23 12:31:53 -080074 }
75
76 void close(const std::string_view msg) override
77 {
78 BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg);
79 // send notification to handler for cleanup
80 if (closeHandler)
81 {
82 closeHandler(*this);
83 }
84 BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
85 boost::beast::get_lowest_layer(adaptor).close();
86 }
87
88 void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
89 const Request& req,
90 const boost::system::error_code& ec,
91 size_t /*bytesSent*/)
92 {
93 serializer.reset();
94 if (ec)
95 {
96 BMCWEB_LOG_ERROR("Error sending header{}", ec);
97 close("async_write_header failed");
98 return;
99 }
100 BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
101 if (!openHandler)
102 {
103 BMCWEB_LOG_CRITICAL("No open handler???");
104 return;
105 }
106 openHandler(*this, req);
107
108 // SSE stream header sent, So let us setup monitor.
109 // Any read data on this stream will be error in case of SSE.
110 adaptor.async_read_some(boost::asio::buffer(buffer),
111 std::bind_front(&ConnectionImpl::afterReadError,
112 this, shared_from_this()));
113 }
114
115 void afterReadError(const std::shared_ptr<Connection>& /*self*/,
116 const boost::system::error_code& ec, size_t bytesRead)
117 {
118 BMCWEB_LOG_DEBUG("Read {}", bytesRead);
119 if (ec == boost::asio::error::operation_aborted)
120 {
121 return;
122 }
123 if (ec)
124 {
125 BMCWEB_LOG_ERROR("Read error: {}", ec);
126 }
127
128 close("Close SSE connection");
129 }
130
131 void doWrite()
132 {
133 if (doingWrite)
134 {
135 return;
136 }
137 if (inputBuffer.size() == 0)
138 {
139 BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
140 return;
141 }
142 startTimeout();
143 doingWrite = true;
144
145 adaptor.async_write_some(
146 inputBuffer.data(),
147 std::bind_front(&ConnectionImpl::doWriteCallback, this,
148 shared_from_this()));
149 }
150
151 void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
152 const boost::beast::error_code& ec,
153 size_t bytesTransferred)
154 {
155 timer.cancel();
156 doingWrite = false;
157 inputBuffer.consume(bytesTransferred);
158
159 if (ec == boost::asio::error::eof)
160 {
161 BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
162 close("SSE stream closed");
163 return;
164 }
165
166 if (ec)
167 {
168 BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
169 close("async_write_some failed");
170 return;
171 }
172 BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
173 bytesTransferred);
174
175 doWrite();
176 }
177
178 void sendSseEvent(std::string_view id, std::string_view msg) override
179 {
180 if (msg.empty())
181 {
182 BMCWEB_LOG_DEBUG("Empty data, bailing out.");
183 return;
184 }
185
186 dataFormat(id, msg);
187
188 doWrite();
189 }
190
191 void dataFormat(std::string_view id, std::string_view msg)
192 {
193 constexpr size_t bufferLimit = 10485760U; // 10MB
194 if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit)
195 {
196 BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client");
197 close("Buffer overflow");
198 return;
199 }
200 std::string rawData;
201 if (!id.empty())
202 {
203 rawData += "id: ";
204 rawData.append(id);
205 rawData += "\n";
206 }
207
208 rawData += "data: ";
209 for (char character : msg)
210 {
211 rawData += character;
212 if (character == '\n')
213 {
214 rawData += "data: ";
215 }
216 }
217 rawData += "\n\n";
218
219 size_t copied = boost::asio::buffer_copy(
220 inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData));
221 inputBuffer.commit(copied);
222 }
223
224 void startTimeout()
225 {
226 std::weak_ptr<Connection> weakSelf = weak_from_this();
227 timer.expires_after(std::chrono::seconds(30));
228 timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
229 this, weak_from_this()));
230 }
231
232 void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
233 const boost::system::error_code& ec)
234 {
235 std::shared_ptr<Connection> self = weakSelf.lock();
236 if (!self)
237 {
238 BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
239 logPtr(self.get()));
240 return;
241 }
242
243 if (ec == boost::asio::error::operation_aborted)
244 {
245 BMCWEB_LOG_DEBUG("Timer operation aborted");
246 // Canceled wait means the path succeeded.
247 return;
248 }
249 if (ec)
250 {
251 BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
252 }
253
254 BMCWEB_LOG_WARNING("{} Connection timed out, closing",
255 logPtr(self.get()));
256
257 self->close("closing connection");
258 }
259
260 private:
261 std::array<char, 1> buffer{};
262 boost::beast::multi_buffer inputBuffer;
263
264 Adaptor adaptor;
265
266 using BodyType = bmcweb::HttpBody;
267 boost::beast::http::response<BodyType> res;
268 std::optional<boost::beast::http::response_serializer<BodyType>> serializer;
269 boost::asio::steady_timer timer;
270 bool doingWrite = false;
271
272 std::function<void(Connection&, const Request&)> openHandler;
273 std::function<void(Connection&)> closeHandler;
274};
275} // namespace sse_socket
276} // namespace crow