// blob: 58659c84a025ccf8331516b77fa710ccbda2e427
// blame: V-Sanjana 88ada3b 2023-04-13 15:18:31 +0530
#pragma once
#include "async_resolve.hpp"
#include "async_resp.hpp"
#include "http_request.hpp"
#include "http_response.hpp"

#include <boost/algorithm/string/predicate.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/http/buffer_body.hpp>
#include <boost/beast/http/read.hpp>
#include <boost/beast/http/string_body.hpp>
#include <boost/beast/http/write.hpp>
#include <boost/beast/websocket.hpp>

#include <array>
#include <chrono>
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <utility>

#ifdef BMCWEB_ENABLE_SSL
#include <boost/beast/websocket/ssl.hpp>
#endif

namespace crow
{

namespace sse_socket
{
/// URL paths that are served as Server-Sent-Events streams.
/// `inline` gives the header a single definition shared by every translation
/// unit instead of one private copy per includer.
inline constexpr std::array<const char*, 1> sseRoutes = {
    "/redfish/v1/EventService/SSE"};

29struct Connection : std::enable_shared_from_this<Connection>
30{
31 public:
32 explicit Connection(const crow::Request& reqIn) : req(reqIn) {}
33
34 Connection(const Connection&) = delete;
35 Connection(Connection&&) = delete;
36 Connection& operator=(const Connection&) = delete;
37 Connection& operator=(const Connection&&) = delete;
38 virtual ~Connection() = default;
39
40 virtual boost::asio::io_context& getIoContext() = 0;
41 virtual void sendSSEHeader() = 0;
42 virtual void completeRequest(crow::Response& thisRes) = 0;
43 virtual void close(std::string_view msg = "quit") = 0;
44 virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
45
46 crow::Request req;
47};
48
49template <typename Adaptor>
50class ConnectionImpl : public Connection
51{
52 public:
53 ConnectionImpl(
54 const crow::Request& reqIn, Adaptor adaptorIn,
55 std::function<void(std::shared_ptr<Connection>&, const crow::Request&,
56 const std::shared_ptr<bmcweb::AsyncResp>&)>
57 openHandlerIn,
58 std::function<void(std::shared_ptr<Connection>&)> closeHandlerIn) :
59 Connection(reqIn),
60 adaptor(std::move(adaptorIn)), openHandler(std::move(openHandlerIn)),
61 closeHandler(std::move(closeHandlerIn))
62 {
63 BMCWEB_LOG_DEBUG << "SseConnectionImpl: SSE constructor " << this;
64 }
65
66 ConnectionImpl(const ConnectionImpl&) = delete;
67 ConnectionImpl(const ConnectionImpl&&) = delete;
68 ConnectionImpl& operator=(const ConnectionImpl&) = delete;
69 ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
70
71 ~ConnectionImpl() override
72 {
73 BMCWEB_LOG_DEBUG << "SSE ConnectionImpl: SSE destructor " << this;
74 }
75
76 boost::asio::io_context& getIoContext() override
77 {
78 return static_cast<boost::asio::io_context&>(
79 adaptor.get_executor().context());
80 }
81
82 void start()
83 {
84 if (openHandler)
85 {
86 auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
87 std::shared_ptr<Connection> self = this->shared_from_this();
88
89 asyncResp->res.setCompleteRequestHandler(
90 [self(shared_from_this())](crow::Response& thisRes) {
91 if (thisRes.resultInt() != 200)
92 {
93 self->completeRequest(thisRes);
94 }
95 });
96
97 openHandler(self, req, asyncResp);
98 }
99 }
100
101 void close(const std::string_view msg) override
102 {
103 BMCWEB_LOG_DEBUG << "Closing SSE connection " << this << " - " << msg;
104 boost::beast::get_lowest_layer(adaptor).close();
105
106 // send notification to handler for cleanup
107 if (closeHandler)
108 {
109 std::shared_ptr<Connection> self = shared_from_this();
110 closeHandler(self);
111 }
112 }
113
114 void sendSSEHeader() override
115 {
116 BMCWEB_LOG_DEBUG << "Starting SSE connection";
117 auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
118 using BodyType = boost::beast::http::buffer_body;
119 auto response =
120 std::make_shared<boost::beast::http::response<BodyType>>(
121 boost::beast::http::status::ok, 11);
122
123 serializer.emplace(*asyncResp->res.stringResponse);
124
125 response->set(boost::beast::http::field::content_type,
126 "text/event-stream");
127 response->body().more = true;
128
129 boost::beast::http::async_write_header(
130 adaptor, *serializer,
131 std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
132 shared_from_this()));
133 }
134
135 void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
136 const boost::beast::error_code& ec,
137 const std::size_t& /*unused*/)
138 {
139 if (ec)
140 {
141 BMCWEB_LOG_ERROR << "Error sending header" << ec;
142 close("async_write_header failed");
143 return;
144 }
145 BMCWEB_LOG_DEBUG << "SSE header sent - Connection established";
146
147 serializer.reset();
148
149 // SSE stream header sent, So let us setup monitor.
150 // Any read data on this stream will be error in case of SSE.
151 setupRead();
152 }
153
154 void setupRead()
155 {
156 std::weak_ptr<Connection> weakSelf = weak_from_this();
157
158 boost::beast::http::async_read_some(
159 adaptor, outputBuffer, *parser,
160 std::bind_front(&ConnectionImpl::setupReadCallback, this,
161 weak_from_this()));
162 }
163
164 void setupReadCallback(const std::weak_ptr<Connection>& weakSelf,
165 const boost::system::error_code& ec,
166 size_t bytesRead)
167 {
168 std::shared_ptr<Connection> self = weakSelf.lock();
169 BMCWEB_LOG_DEBUG << "async_read_some: Read " << bytesRead << " bytes";
170 if (ec)
171 {
172 BMCWEB_LOG_ERROR << "Read error: " << ec;
173 }
174
175 // After establishing SSE stream, Reading data on this
176 // stream means client is disobeys the SSE protocol.
177 // Read the data to avoid buffer attacks and close connection.
178
179 self->close("Close SSE connection");
180 }
181
182 void doWrite()
183 {
184 onTimeout();
185
186 if (doingWrite)
187 {
188 return;
189 }
190 if (inputBuffer.size() == 0)
191 {
192 BMCWEB_LOG_DEBUG << "inputBuffer is empty... Bailing out";
193 return;
194 }
195 doingWrite = true;
196
197 adaptor.async_write_some(
198 inputBuffer.data(),
199 std::bind_front(&ConnectionImpl::doWriteCallback, this,
200 shared_from_this()));
201 }
202
203 void doWriteCallback(const std::shared_ptr<Connection>& /*self*/,
204 const boost::beast::error_code& ec,
205 const size_t bytesTransferred)
206 {
207 doingWrite = false;
208 inputBuffer.consume(bytesTransferred);
209
210 if (ec == boost::asio::error::eof)
211 {
212 BMCWEB_LOG_ERROR << "async_write_some() SSE stream closed";
213 close("SSE stream closed");
214 return;
215 }
216
217 if (ec)
218 {
219 BMCWEB_LOG_ERROR << "async_write_some() failed: " << ec.message();
220 close("async_write_some failed");
221 return;
222 }
223 BMCWEB_LOG_DEBUG << "async_write_some() bytes transferred: "
224 << bytesTransferred;
225
226 doWrite();
227 }
228
229 void completeRequest(crow::Response& thisRes) override
230 {
231 auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
232 asyncResp->res = std::move(thisRes);
233
234 if (asyncResp->res.body().empty() && !asyncResp->res.jsonValue.empty())
235 {
236 asyncResp->res.addHeader(boost::beast::http::field::content_type,
237 "application/json");
238 asyncResp->res.body() = asyncResp->res.jsonValue.dump(
239 2, ' ', true, nlohmann::json::error_handler_t::replace);
240 }
241
242 asyncResp->res.preparePayload();
243
244 serializer.emplace(*asyncResp->res.stringResponse);
245
246 boost::beast::http::async_write_some(
247 adaptor, *serializer,
248 std::bind_front(&ConnectionImpl::completeRequestCallback, this,
249 shared_from_this()));
250 }
251
252 void completeRequestCallback(const std::shared_ptr<Connection>& /*self*/,
253 const boost::system::error_code& ec,
254 std::size_t bytesTransferred)
255 {
256 auto asyncResp = std::make_shared<bmcweb::AsyncResp>();
257 BMCWEB_LOG_DEBUG << this << " async_write " << bytesTransferred
258 << " bytes";
259 if (ec)
260 {
261 BMCWEB_LOG_DEBUG << this << " from async_write failed";
262 return;
263 }
264
265 BMCWEB_LOG_DEBUG << this << " Closing SSE connection - Request invalid";
266 serializer.reset();
267 close("Request invalid");
268 asyncResp->res.releaseCompleteRequestHandler();
269 }
270
271 void sendEvent(std::string_view id, std::string_view msg) override
272 {
273 if (msg.empty())
274 {
275 BMCWEB_LOG_DEBUG << "Empty data, bailing out.";
276 return;
277 }
278
279 dataFormat(id);
280
281 doWrite();
282 }
283
284 void dataFormat(std::string_view id)
285 {
286 std::string_view msg;
287 std::string rawData;
288 if (!id.empty())
289 {
290 rawData += "id: ";
291 rawData.append(id.begin(), id.end());
292 rawData += "\n";
293 }
294
295 rawData += "data: ";
296 for (char character : msg)
297 {
298 rawData += character;
299 if (character == '\n')
300 {
301 rawData += "data: ";
302 }
303 }
304 rawData += "\n\n";
305
306 boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()),
307 boost::asio::buffer(rawData));
308 inputBuffer.commit(rawData.size());
309 }
310
311 void onTimeout()
312 {
313 boost::asio::steady_timer timer(ioc);
314 std::weak_ptr<Connection> weakSelf = weak_from_this();
315 timer.expires_after(std::chrono::seconds(30));
316 timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
317 this, weak_from_this()));
318 }
319
320 void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
321 const boost::system::error_code ec)
322 {
323 std::shared_ptr<Connection> self = weakSelf.lock();
324 if (!self)
325 {
326 BMCWEB_LOG_CRITICAL << self << " Failed to capture connection";
327 return;
328 }
329
330 if (ec == boost::asio::error::operation_aborted)
331 {
332 BMCWEB_LOG_DEBUG << "operation aborted";
333 // Canceled wait means the path succeeeded.
334 return;
335 }
336 if (ec)
337 {
338 BMCWEB_LOG_CRITICAL << self << " timer failed " << ec;
339 }
340
341 BMCWEB_LOG_WARNING << self << "Connection timed out, closing";
342
343 self->close("closing connection");
344 }
345
346 private:
347 Adaptor adaptor;
348
349 boost::beast::multi_buffer outputBuffer;
350 boost::beast::multi_buffer inputBuffer;
351
352 std::optional<boost::beast::http::response_serializer<
353 boost::beast::http::string_body>>
354 serializer;
355 boost::asio::io_context& ioc =
356 crow::connections::systemBus->get_io_context();
357 bool doingWrite = false;
358 std::optional<
359 boost::beast::http::request_parser<boost::beast::http::string_body>>
360 parser;
361
362 std::function<void(std::shared_ptr<Connection>&, const crow::Request&,
363 const std::shared_ptr<bmcweb::AsyncResp>&)>
364 openHandler;
365 std::function<void(std::shared_ptr<Connection>&)> closeHandler;
366};
} // namespace sse_socket
} // namespace crow