blob: f4ad21a69f018343bae6611e9649b5dccb5707a7 [file] [log] [blame]
V-Sanjana88ada3b2023-04-13 15:18:31 +05301#pragma once
Ed Tanous6fde95f2023-06-01 07:33:34 -07002#include "dbus_singleton.hpp"
V-Sanjana88ada3b2023-04-13 15:18:31 +05303#include "http_request.hpp"
4#include "http_response.hpp"
5
V-Sanjana88ada3b2023-04-13 15:18:31 +05306#include <boost/asio/buffer.hpp>
7#include <boost/asio/steady_timer.hpp>
8#include <boost/beast/core/multi_buffer.hpp>
9#include <boost/beast/http/buffer_body.hpp>
10#include <boost/beast/websocket.hpp>
11
12#include <array>
13#include <functional>
14
15#ifdef BMCWEB_ENABLE_SSL
16#include <boost/beast/websocket/ssl.hpp>
17#endif
18
19namespace crow
20{
21
22namespace sse_socket
23{
V-Sanjana88ada3b2023-04-13 15:18:31 +053024struct Connection : std::enable_shared_from_this<Connection>
25{
26 public:
Ed Tanous6fde95f2023-06-01 07:33:34 -070027 Connection() = default;
V-Sanjana88ada3b2023-04-13 15:18:31 +053028
29 Connection(const Connection&) = delete;
30 Connection(Connection&&) = delete;
31 Connection& operator=(const Connection&) = delete;
32 Connection& operator=(const Connection&&) = delete;
33 virtual ~Connection() = default;
34
35 virtual boost::asio::io_context& getIoContext() = 0;
V-Sanjana88ada3b2023-04-13 15:18:31 +053036 virtual void close(std::string_view msg = "quit") = 0;
37 virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
V-Sanjana88ada3b2023-04-13 15:18:31 +053038};
39
40template <typename Adaptor>
41class ConnectionImpl : public Connection
42{
43 public:
Ed Tanous6fde95f2023-06-01 07:33:34 -070044 ConnectionImpl(Adaptor&& adaptorIn,
45 std::function<void(Connection&)> openHandlerIn,
46 std::function<void(Connection&)> closeHandlerIn) :
47 adaptor(std::move(adaptorIn)),
48 timer(ioc), openHandler(std::move(openHandlerIn)),
V-Sanjana88ada3b2023-04-13 15:18:31 +053049 closeHandler(std::move(closeHandlerIn))
50 {
Ed Tanous62598e32023-07-17 17:06:25 -070051 BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
V-Sanjana88ada3b2023-04-13 15:18:31 +053052 }
53
54 ConnectionImpl(const ConnectionImpl&) = delete;
55 ConnectionImpl(const ConnectionImpl&&) = delete;
56 ConnectionImpl& operator=(const ConnectionImpl&) = delete;
57 ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
58
59 ~ConnectionImpl() override
60 {
Ed Tanous62598e32023-07-17 17:06:25 -070061 BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
V-Sanjana88ada3b2023-04-13 15:18:31 +053062 }
63
64 boost::asio::io_context& getIoContext() override
65 {
66 return static_cast<boost::asio::io_context&>(
67 adaptor.get_executor().context());
68 }
69
70 void start()
71 {
Ed Tanous6fde95f2023-06-01 07:33:34 -070072 if (!openHandler)
V-Sanjana88ada3b2023-04-13 15:18:31 +053073 {
Ed Tanous62598e32023-07-17 17:06:25 -070074 BMCWEB_LOG_CRITICAL("No open handler???");
Ed Tanous6fde95f2023-06-01 07:33:34 -070075 return;
V-Sanjana88ada3b2023-04-13 15:18:31 +053076 }
Ed Tanous6fde95f2023-06-01 07:33:34 -070077 openHandler(*this);
V-Sanjana88ada3b2023-04-13 15:18:31 +053078 }
79
80 void close(const std::string_view msg) override
81 {
V-Sanjana88ada3b2023-04-13 15:18:31 +053082 // send notification to handler for cleanup
83 if (closeHandler)
84 {
Ed Tanous6fde95f2023-06-01 07:33:34 -070085 closeHandler(*this);
V-Sanjana88ada3b2023-04-13 15:18:31 +053086 }
Ed Tanous62598e32023-07-17 17:06:25 -070087 BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
Ed Tanous6fde95f2023-06-01 07:33:34 -070088 boost::beast::get_lowest_layer(adaptor).close();
V-Sanjana88ada3b2023-04-13 15:18:31 +053089 }
90
Ed Tanous6fde95f2023-06-01 07:33:34 -070091 void sendSSEHeader()
V-Sanjana88ada3b2023-04-13 15:18:31 +053092 {
Ed Tanous62598e32023-07-17 17:06:25 -070093 BMCWEB_LOG_DEBUG("Starting SSE connection");
V-Sanjana88ada3b2023-04-13 15:18:31 +053094 using BodyType = boost::beast::http::buffer_body;
Ed Tanous6fde95f2023-06-01 07:33:34 -070095 boost::beast::http::response<BodyType> res(
96 boost::beast::http::status::ok, 11, BodyType{});
97 res.set(boost::beast::http::field::content_type, "text/event-stream");
98 res.body().more = true;
Ed Tanous8ece0e42024-01-02 13:16:50 -080099 boost::beast::http::response_serializer<BodyType>& serial =
Ed Tanous6fde95f2023-06-01 07:33:34 -0700100 serializer.emplace(std::move(res));
V-Sanjana88ada3b2023-04-13 15:18:31 +0530101
102 boost::beast::http::async_write_header(
Ed Tanous8ece0e42024-01-02 13:16:50 -0800103 adaptor, serial,
V-Sanjana88ada3b2023-04-13 15:18:31 +0530104 std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
105 shared_from_this()));
106 }
107
108 void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
Ed Tanous6fde95f2023-06-01 07:33:34 -0700109 const boost::system::error_code& ec,
110 size_t /*bytesSent*/)
V-Sanjana88ada3b2023-04-13 15:18:31 +0530111 {
Ed Tanous6fde95f2023-06-01 07:33:34 -0700112 serializer.reset();
V-Sanjana88ada3b2023-04-13 15:18:31 +0530113 if (ec)
114 {
Ed Tanous62598e32023-07-17 17:06:25 -0700115 BMCWEB_LOG_ERROR("Error sending header{}", ec);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530116 close("async_write_header failed");
117 return;
118 }
Ed Tanous62598e32023-07-17 17:06:25 -0700119 BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
V-Sanjana88ada3b2023-04-13 15:18:31 +0530120
121 serializer.reset();
122
123 // SSE stream header sent, So let us setup monitor.
124 // Any read data on this stream will be error in case of SSE.
Ed Tanous6fde95f2023-06-01 07:33:34 -0700125
126 adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error,
127 std::bind_front(&ConnectionImpl::afterReadError,
128 this, shared_from_this()));
V-Sanjana88ada3b2023-04-13 15:18:31 +0530129 }
130
Ed Tanous6fde95f2023-06-01 07:33:34 -0700131 void afterReadError(const std::shared_ptr<Connection>& /*self*/,
132 const boost::system::error_code& ec)
V-Sanjana88ada3b2023-04-13 15:18:31 +0530133 {
Ed Tanous6fde95f2023-06-01 07:33:34 -0700134 if (ec == boost::asio::error::operation_aborted)
135 {
136 return;
137 }
V-Sanjana88ada3b2023-04-13 15:18:31 +0530138 if (ec)
139 {
Ed Tanous62598e32023-07-17 17:06:25 -0700140 BMCWEB_LOG_ERROR("Read error: {}", ec);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530141 }
142
Ed Tanous6fde95f2023-06-01 07:33:34 -0700143 close("Close SSE connection");
V-Sanjana88ada3b2023-04-13 15:18:31 +0530144 }
145
146 void doWrite()
147 {
V-Sanjana88ada3b2023-04-13 15:18:31 +0530148 if (doingWrite)
149 {
150 return;
151 }
152 if (inputBuffer.size() == 0)
153 {
Ed Tanous62598e32023-07-17 17:06:25 -0700154 BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
V-Sanjana88ada3b2023-04-13 15:18:31 +0530155 return;
156 }
Ed Tanous6fde95f2023-06-01 07:33:34 -0700157 startTimeout();
V-Sanjana88ada3b2023-04-13 15:18:31 +0530158 doingWrite = true;
159
160 adaptor.async_write_some(
161 inputBuffer.data(),
162 std::bind_front(&ConnectionImpl::doWriteCallback, this,
Ed Tanous6fde95f2023-06-01 07:33:34 -0700163 weak_from_this()));
V-Sanjana88ada3b2023-04-13 15:18:31 +0530164 }
165
Ed Tanous6fde95f2023-06-01 07:33:34 -0700166 void doWriteCallback(const std::weak_ptr<Connection>& weak,
V-Sanjana88ada3b2023-04-13 15:18:31 +0530167 const boost::beast::error_code& ec,
Ed Tanous6fde95f2023-06-01 07:33:34 -0700168 size_t bytesTransferred)
V-Sanjana88ada3b2023-04-13 15:18:31 +0530169 {
Ed Tanous6fde95f2023-06-01 07:33:34 -0700170 auto self = weak.lock();
171 if (self == nullptr)
172 {
173 return;
174 }
175 timer.cancel();
V-Sanjana88ada3b2023-04-13 15:18:31 +0530176 doingWrite = false;
177 inputBuffer.consume(bytesTransferred);
178
179 if (ec == boost::asio::error::eof)
180 {
Ed Tanous62598e32023-07-17 17:06:25 -0700181 BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
V-Sanjana88ada3b2023-04-13 15:18:31 +0530182 close("SSE stream closed");
183 return;
184 }
185
186 if (ec)
187 {
Ed Tanous62598e32023-07-17 17:06:25 -0700188 BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
V-Sanjana88ada3b2023-04-13 15:18:31 +0530189 close("async_write_some failed");
190 return;
191 }
Ed Tanous62598e32023-07-17 17:06:25 -0700192 BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
193 bytesTransferred);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530194
195 doWrite();
196 }
197
V-Sanjana88ada3b2023-04-13 15:18:31 +0530198 void sendEvent(std::string_view id, std::string_view msg) override
199 {
200 if (msg.empty())
201 {
Ed Tanous62598e32023-07-17 17:06:25 -0700202 BMCWEB_LOG_DEBUG("Empty data, bailing out.");
V-Sanjana88ada3b2023-04-13 15:18:31 +0530203 return;
204 }
205
Ed Tanous6fde95f2023-06-01 07:33:34 -0700206 dataFormat(id, msg);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530207
208 doWrite();
209 }
210
Ed Tanous6fde95f2023-06-01 07:33:34 -0700211 void dataFormat(std::string_view id, std::string_view msg)
V-Sanjana88ada3b2023-04-13 15:18:31 +0530212 {
V-Sanjana88ada3b2023-04-13 15:18:31 +0530213 std::string rawData;
214 if (!id.empty())
215 {
216 rawData += "id: ";
Ed Tanous6fde95f2023-06-01 07:33:34 -0700217 rawData.append(id);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530218 rawData += "\n";
219 }
220
221 rawData += "data: ";
222 for (char character : msg)
223 {
224 rawData += character;
225 if (character == '\n')
226 {
227 rawData += "data: ";
228 }
229 }
230 rawData += "\n\n";
231
232 boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()),
233 boost::asio::buffer(rawData));
234 inputBuffer.commit(rawData.size());
235 }
236
Ed Tanous6fde95f2023-06-01 07:33:34 -0700237 void startTimeout()
V-Sanjana88ada3b2023-04-13 15:18:31 +0530238 {
V-Sanjana88ada3b2023-04-13 15:18:31 +0530239 std::weak_ptr<Connection> weakSelf = weak_from_this();
240 timer.expires_after(std::chrono::seconds(30));
241 timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
242 this, weak_from_this()));
243 }
244
245 void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
Ed Tanous6fde95f2023-06-01 07:33:34 -0700246 const boost::system::error_code& ec)
V-Sanjana88ada3b2023-04-13 15:18:31 +0530247 {
248 std::shared_ptr<Connection> self = weakSelf.lock();
249 if (!self)
250 {
Ed Tanous62598e32023-07-17 17:06:25 -0700251 BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
252 logPtr(self.get()));
V-Sanjana88ada3b2023-04-13 15:18:31 +0530253 return;
254 }
255
256 if (ec == boost::asio::error::operation_aborted)
257 {
Ed Tanous62598e32023-07-17 17:06:25 -0700258 BMCWEB_LOG_DEBUG("operation aborted");
Ed Tanous8ece0e42024-01-02 13:16:50 -0800259 // Canceled wait means the path succeeded.
V-Sanjana88ada3b2023-04-13 15:18:31 +0530260 return;
261 }
262 if (ec)
263 {
Ed Tanous62598e32023-07-17 17:06:25 -0700264 BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530265 }
266
Ed Tanous62598e32023-07-17 17:06:25 -0700267 BMCWEB_LOG_WARNING("{}Connection timed out, closing",
268 logPtr(self.get()));
V-Sanjana88ada3b2023-04-13 15:18:31 +0530269
270 self->close("closing connection");
271 }
272
273 private:
274 Adaptor adaptor;
275
V-Sanjana88ada3b2023-04-13 15:18:31 +0530276 boost::beast::multi_buffer inputBuffer;
277
278 std::optional<boost::beast::http::response_serializer<
Ed Tanous6fde95f2023-06-01 07:33:34 -0700279 boost::beast::http::buffer_body>>
V-Sanjana88ada3b2023-04-13 15:18:31 +0530280 serializer;
281 boost::asio::io_context& ioc =
282 crow::connections::systemBus->get_io_context();
Ed Tanous6fde95f2023-06-01 07:33:34 -0700283 boost::asio::steady_timer timer;
V-Sanjana88ada3b2023-04-13 15:18:31 +0530284 bool doingWrite = false;
V-Sanjana88ada3b2023-04-13 15:18:31 +0530285
Ed Tanous6fde95f2023-06-01 07:33:34 -0700286 std::function<void(Connection&)> openHandler;
287 std::function<void(Connection&)> closeHandler;
V-Sanjana88ada3b2023-04-13 15:18:31 +0530288};
289} // namespace sse_socket
290} // namespace crow