blob: 773394836f7810b3cf54e3d42c37619f48b6003c [file] [log] [blame]
V-Sanjana88ada3b2023-04-13 15:18:31 +05301#pragma once
Ed Tanous6fde95f2023-06-01 07:33:34 -07002#include "dbus_singleton.hpp"
V-Sanjana88ada3b2023-04-13 15:18:31 +05303#include "http_request.hpp"
4#include "http_response.hpp"
5
6#include <boost/algorithm/string/predicate.hpp>
7#include <boost/asio/buffer.hpp>
8#include <boost/asio/steady_timer.hpp>
9#include <boost/beast/core/multi_buffer.hpp>
10#include <boost/beast/http/buffer_body.hpp>
11#include <boost/beast/websocket.hpp>
12
13#include <array>
14#include <functional>
15
16#ifdef BMCWEB_ENABLE_SSL
17#include <boost/beast/websocket/ssl.hpp>
18#endif
19
20namespace crow
21{
22
23namespace sse_socket
24{
V-Sanjana88ada3b2023-04-13 15:18:31 +053025struct Connection : std::enable_shared_from_this<Connection>
26{
27 public:
Ed Tanous6fde95f2023-06-01 07:33:34 -070028 Connection() = default;
V-Sanjana88ada3b2023-04-13 15:18:31 +053029
30 Connection(const Connection&) = delete;
31 Connection(Connection&&) = delete;
32 Connection& operator=(const Connection&) = delete;
33 Connection& operator=(const Connection&&) = delete;
34 virtual ~Connection() = default;
35
36 virtual boost::asio::io_context& getIoContext() = 0;
V-Sanjana88ada3b2023-04-13 15:18:31 +053037 virtual void close(std::string_view msg = "quit") = 0;
38 virtual void sendEvent(std::string_view id, std::string_view msg) = 0;
V-Sanjana88ada3b2023-04-13 15:18:31 +053039};
40
41template <typename Adaptor>
42class ConnectionImpl : public Connection
43{
44 public:
Ed Tanous6fde95f2023-06-01 07:33:34 -070045 ConnectionImpl(Adaptor&& adaptorIn,
46 std::function<void(Connection&)> openHandlerIn,
47 std::function<void(Connection&)> closeHandlerIn) :
48 adaptor(std::move(adaptorIn)),
49 timer(ioc), openHandler(std::move(openHandlerIn)),
V-Sanjana88ada3b2023-04-13 15:18:31 +053050 closeHandler(std::move(closeHandlerIn))
51 {
Ed Tanous62598e32023-07-17 17:06:25 -070052 BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this));
V-Sanjana88ada3b2023-04-13 15:18:31 +053053 }
54
55 ConnectionImpl(const ConnectionImpl&) = delete;
56 ConnectionImpl(const ConnectionImpl&&) = delete;
57 ConnectionImpl& operator=(const ConnectionImpl&) = delete;
58 ConnectionImpl& operator=(const ConnectionImpl&&) = delete;
59
60 ~ConnectionImpl() override
61 {
Ed Tanous62598e32023-07-17 17:06:25 -070062 BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this));
V-Sanjana88ada3b2023-04-13 15:18:31 +053063 }
64
65 boost::asio::io_context& getIoContext() override
66 {
67 return static_cast<boost::asio::io_context&>(
68 adaptor.get_executor().context());
69 }
70
71 void start()
72 {
Ed Tanous6fde95f2023-06-01 07:33:34 -070073 if (!openHandler)
V-Sanjana88ada3b2023-04-13 15:18:31 +053074 {
Ed Tanous62598e32023-07-17 17:06:25 -070075 BMCWEB_LOG_CRITICAL("No open handler???");
Ed Tanous6fde95f2023-06-01 07:33:34 -070076 return;
V-Sanjana88ada3b2023-04-13 15:18:31 +053077 }
Ed Tanous6fde95f2023-06-01 07:33:34 -070078 openHandler(*this);
V-Sanjana88ada3b2023-04-13 15:18:31 +053079 }
80
81 void close(const std::string_view msg) override
82 {
V-Sanjana88ada3b2023-04-13 15:18:31 +053083 // send notification to handler for cleanup
84 if (closeHandler)
85 {
Ed Tanous6fde95f2023-06-01 07:33:34 -070086 closeHandler(*this);
V-Sanjana88ada3b2023-04-13 15:18:31 +053087 }
Ed Tanous62598e32023-07-17 17:06:25 -070088 BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg);
Ed Tanous6fde95f2023-06-01 07:33:34 -070089 boost::beast::get_lowest_layer(adaptor).close();
V-Sanjana88ada3b2023-04-13 15:18:31 +053090 }
91
Ed Tanous6fde95f2023-06-01 07:33:34 -070092 void sendSSEHeader()
V-Sanjana88ada3b2023-04-13 15:18:31 +053093 {
Ed Tanous62598e32023-07-17 17:06:25 -070094 BMCWEB_LOG_DEBUG("Starting SSE connection");
V-Sanjana88ada3b2023-04-13 15:18:31 +053095 using BodyType = boost::beast::http::buffer_body;
Ed Tanous6fde95f2023-06-01 07:33:34 -070096 boost::beast::http::response<BodyType> res(
97 boost::beast::http::status::ok, 11, BodyType{});
98 res.set(boost::beast::http::field::content_type, "text/event-stream");
99 res.body().more = true;
100 boost::beast::http::response_serializer<BodyType>& ser =
101 serializer.emplace(std::move(res));
V-Sanjana88ada3b2023-04-13 15:18:31 +0530102
103 boost::beast::http::async_write_header(
Ed Tanous6fde95f2023-06-01 07:33:34 -0700104 adaptor, ser,
V-Sanjana88ada3b2023-04-13 15:18:31 +0530105 std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this,
106 shared_from_this()));
107 }
108
109 void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/,
Ed Tanous6fde95f2023-06-01 07:33:34 -0700110 const boost::system::error_code& ec,
111 size_t /*bytesSent*/)
V-Sanjana88ada3b2023-04-13 15:18:31 +0530112 {
Ed Tanous6fde95f2023-06-01 07:33:34 -0700113 serializer.reset();
V-Sanjana88ada3b2023-04-13 15:18:31 +0530114 if (ec)
115 {
Ed Tanous62598e32023-07-17 17:06:25 -0700116 BMCWEB_LOG_ERROR("Error sending header{}", ec);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530117 close("async_write_header failed");
118 return;
119 }
Ed Tanous62598e32023-07-17 17:06:25 -0700120 BMCWEB_LOG_DEBUG("SSE header sent - Connection established");
V-Sanjana88ada3b2023-04-13 15:18:31 +0530121
122 serializer.reset();
123
124 // SSE stream header sent, So let us setup monitor.
125 // Any read data on this stream will be error in case of SSE.
Ed Tanous6fde95f2023-06-01 07:33:34 -0700126
127 adaptor.async_wait(boost::asio::ip::tcp::socket::wait_error,
128 std::bind_front(&ConnectionImpl::afterReadError,
129 this, shared_from_this()));
V-Sanjana88ada3b2023-04-13 15:18:31 +0530130 }
131
Ed Tanous6fde95f2023-06-01 07:33:34 -0700132 void afterReadError(const std::shared_ptr<Connection>& /*self*/,
133 const boost::system::error_code& ec)
V-Sanjana88ada3b2023-04-13 15:18:31 +0530134 {
Ed Tanous6fde95f2023-06-01 07:33:34 -0700135 if (ec == boost::asio::error::operation_aborted)
136 {
137 return;
138 }
V-Sanjana88ada3b2023-04-13 15:18:31 +0530139 if (ec)
140 {
Ed Tanous62598e32023-07-17 17:06:25 -0700141 BMCWEB_LOG_ERROR("Read error: {}", ec);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530142 }
143
Ed Tanous6fde95f2023-06-01 07:33:34 -0700144 close("Close SSE connection");
V-Sanjana88ada3b2023-04-13 15:18:31 +0530145 }
146
147 void doWrite()
148 {
V-Sanjana88ada3b2023-04-13 15:18:31 +0530149 if (doingWrite)
150 {
151 return;
152 }
153 if (inputBuffer.size() == 0)
154 {
Ed Tanous62598e32023-07-17 17:06:25 -0700155 BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out");
V-Sanjana88ada3b2023-04-13 15:18:31 +0530156 return;
157 }
Ed Tanous6fde95f2023-06-01 07:33:34 -0700158 startTimeout();
V-Sanjana88ada3b2023-04-13 15:18:31 +0530159 doingWrite = true;
160
161 adaptor.async_write_some(
162 inputBuffer.data(),
163 std::bind_front(&ConnectionImpl::doWriteCallback, this,
Ed Tanous6fde95f2023-06-01 07:33:34 -0700164 weak_from_this()));
V-Sanjana88ada3b2023-04-13 15:18:31 +0530165 }
166
Ed Tanous6fde95f2023-06-01 07:33:34 -0700167 void doWriteCallback(const std::weak_ptr<Connection>& weak,
V-Sanjana88ada3b2023-04-13 15:18:31 +0530168 const boost::beast::error_code& ec,
Ed Tanous6fde95f2023-06-01 07:33:34 -0700169 size_t bytesTransferred)
V-Sanjana88ada3b2023-04-13 15:18:31 +0530170 {
Ed Tanous6fde95f2023-06-01 07:33:34 -0700171 auto self = weak.lock();
172 if (self == nullptr)
173 {
174 return;
175 }
176 timer.cancel();
V-Sanjana88ada3b2023-04-13 15:18:31 +0530177 doingWrite = false;
178 inputBuffer.consume(bytesTransferred);
179
180 if (ec == boost::asio::error::eof)
181 {
Ed Tanous62598e32023-07-17 17:06:25 -0700182 BMCWEB_LOG_ERROR("async_write_some() SSE stream closed");
V-Sanjana88ada3b2023-04-13 15:18:31 +0530183 close("SSE stream closed");
184 return;
185 }
186
187 if (ec)
188 {
Ed Tanous62598e32023-07-17 17:06:25 -0700189 BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message());
V-Sanjana88ada3b2023-04-13 15:18:31 +0530190 close("async_write_some failed");
191 return;
192 }
Ed Tanous62598e32023-07-17 17:06:25 -0700193 BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}",
194 bytesTransferred);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530195
196 doWrite();
197 }
198
V-Sanjana88ada3b2023-04-13 15:18:31 +0530199 void sendEvent(std::string_view id, std::string_view msg) override
200 {
201 if (msg.empty())
202 {
Ed Tanous62598e32023-07-17 17:06:25 -0700203 BMCWEB_LOG_DEBUG("Empty data, bailing out.");
V-Sanjana88ada3b2023-04-13 15:18:31 +0530204 return;
205 }
206
Ed Tanous6fde95f2023-06-01 07:33:34 -0700207 dataFormat(id, msg);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530208
209 doWrite();
210 }
211
Ed Tanous6fde95f2023-06-01 07:33:34 -0700212 void dataFormat(std::string_view id, std::string_view msg)
V-Sanjana88ada3b2023-04-13 15:18:31 +0530213 {
V-Sanjana88ada3b2023-04-13 15:18:31 +0530214 std::string rawData;
215 if (!id.empty())
216 {
217 rawData += "id: ";
Ed Tanous6fde95f2023-06-01 07:33:34 -0700218 rawData.append(id);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530219 rawData += "\n";
220 }
221
222 rawData += "data: ";
223 for (char character : msg)
224 {
225 rawData += character;
226 if (character == '\n')
227 {
228 rawData += "data: ";
229 }
230 }
231 rawData += "\n\n";
232
233 boost::asio::buffer_copy(inputBuffer.prepare(rawData.size()),
234 boost::asio::buffer(rawData));
235 inputBuffer.commit(rawData.size());
236 }
237
Ed Tanous6fde95f2023-06-01 07:33:34 -0700238 void startTimeout()
V-Sanjana88ada3b2023-04-13 15:18:31 +0530239 {
V-Sanjana88ada3b2023-04-13 15:18:31 +0530240 std::weak_ptr<Connection> weakSelf = weak_from_this();
241 timer.expires_after(std::chrono::seconds(30));
242 timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback,
243 this, weak_from_this()));
244 }
245
246 void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf,
Ed Tanous6fde95f2023-06-01 07:33:34 -0700247 const boost::system::error_code& ec)
V-Sanjana88ada3b2023-04-13 15:18:31 +0530248 {
249 std::shared_ptr<Connection> self = weakSelf.lock();
250 if (!self)
251 {
Ed Tanous62598e32023-07-17 17:06:25 -0700252 BMCWEB_LOG_CRITICAL("{} Failed to capture connection",
253 logPtr(self.get()));
V-Sanjana88ada3b2023-04-13 15:18:31 +0530254 return;
255 }
256
257 if (ec == boost::asio::error::operation_aborted)
258 {
Ed Tanous62598e32023-07-17 17:06:25 -0700259 BMCWEB_LOG_DEBUG("operation aborted");
V-Sanjana88ada3b2023-04-13 15:18:31 +0530260 // Canceled wait means the path succeeeded.
261 return;
262 }
263 if (ec)
264 {
Ed Tanous62598e32023-07-17 17:06:25 -0700265 BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec);
V-Sanjana88ada3b2023-04-13 15:18:31 +0530266 }
267
Ed Tanous62598e32023-07-17 17:06:25 -0700268 BMCWEB_LOG_WARNING("{}Connection timed out, closing",
269 logPtr(self.get()));
V-Sanjana88ada3b2023-04-13 15:18:31 +0530270
271 self->close("closing connection");
272 }
273
274 private:
275 Adaptor adaptor;
276
V-Sanjana88ada3b2023-04-13 15:18:31 +0530277 boost::beast::multi_buffer inputBuffer;
278
279 std::optional<boost::beast::http::response_serializer<
Ed Tanous6fde95f2023-06-01 07:33:34 -0700280 boost::beast::http::buffer_body>>
V-Sanjana88ada3b2023-04-13 15:18:31 +0530281 serializer;
282 boost::asio::io_context& ioc =
283 crow::connections::systemBus->get_io_context();
Ed Tanous6fde95f2023-06-01 07:33:34 -0700284 boost::asio::steady_timer timer;
V-Sanjana88ada3b2023-04-13 15:18:31 +0530285 bool doingWrite = false;
V-Sanjana88ada3b2023-04-13 15:18:31 +0530286
Ed Tanous6fde95f2023-06-01 07:33:34 -0700287 std::function<void(Connection&)> openHandler;
288 std::function<void(Connection&)> closeHandler;
V-Sanjana88ada3b2023-04-13 15:18:31 +0530289};
290} // namespace sse_socket
291} // namespace crow