| Ed Tanous | e60300a | 2025-02-23 12:31:53 -0800 | [diff] [blame] | 1 | // SPDX-License-Identifier: Apache-2.0 | 
|  | 2 | // SPDX-FileCopyrightText: Copyright OpenBMC Authors | 
|  | 3 | #pragma once | 
|  | 4 | #include "boost_formatters.hpp" | 
|  | 5 | #include "http_body.hpp" | 
|  | 6 | #include "http_request.hpp" | 
|  | 7 | #include "io_context_singleton.hpp" | 
|  | 8 | #include "logging.hpp" | 
|  | 9 | #include "server_sent_event.hpp" | 
|  | 10 |  | 
|  | 11 | #include <boost/asio/buffer.hpp> | 
|  | 12 | #include <boost/asio/error.hpp> | 
|  | 13 | #include <boost/asio/steady_timer.hpp> | 
|  | 14 | #include <boost/beast/core/error.hpp> | 
|  | 15 | #include <boost/beast/core/multi_buffer.hpp> | 
|  | 16 | #include <boost/beast/http/field.hpp> | 
|  | 17 | #include <boost/beast/http/serializer.hpp> | 
|  | 18 | #include <boost/beast/http/write.hpp> | 
|  | 19 |  | 
|  | 20 | #include <array> | 
|  | 21 | #include <chrono> | 
|  | 22 | #include <cstddef> | 
|  | 23 | #include <functional> | 
|  | 24 | #include <memory> | 
|  | 25 | #include <optional> | 
|  | 26 | #include <string> | 
|  | 27 | #include <string_view> | 
|  | 28 | #include <utility> | 
|  | 29 |  | 
|  | 30 | namespace crow | 
|  | 31 | { | 
|  | 32 |  | 
|  | 33 | namespace sse_socket | 
|  | 34 | { | 
|  | 35 |  | 
|  | 36 | template <typename Adaptor> | 
|  | 37 | class ConnectionImpl : public Connection | 
|  | 38 | { | 
|  | 39 | public: | 
|  | 40 | ConnectionImpl( | 
|  | 41 | Adaptor&& adaptorIn, | 
|  | 42 | std::function<void(Connection&, const Request&)> openHandlerIn, | 
|  | 43 | std::function<void(Connection&)> closeHandlerIn) : | 
|  | 44 | adaptor(std::move(adaptorIn)), timer(getIoContext()), | 
|  | 45 | openHandler(std::move(openHandlerIn)), | 
|  | 46 | closeHandler(std::move(closeHandlerIn)) | 
|  | 47 |  | 
|  | 48 | { | 
|  | 49 | BMCWEB_LOG_DEBUG("SseConnectionImpl: SSE constructor {}", logPtr(this)); | 
|  | 50 | } | 
|  | 51 |  | 
|  | 52 | ConnectionImpl(const ConnectionImpl&) = delete; | 
|  | 53 | ConnectionImpl(const ConnectionImpl&&) = delete; | 
|  | 54 | ConnectionImpl& operator=(const ConnectionImpl&) = delete; | 
|  | 55 | ConnectionImpl& operator=(const ConnectionImpl&&) = delete; | 
|  | 56 |  | 
|  | 57 | ~ConnectionImpl() override | 
|  | 58 | { | 
|  | 59 | BMCWEB_LOG_DEBUG("SSE ConnectionImpl: SSE destructor {}", logPtr(this)); | 
|  | 60 | } | 
|  | 61 |  | 
|  | 62 | void start(const Request& req) | 
|  | 63 | { | 
|  | 64 | BMCWEB_LOG_DEBUG("Starting SSE connection"); | 
|  | 65 |  | 
|  | 66 | res.set(boost::beast::http::field::content_type, "text/event-stream"); | 
|  | 67 | boost::beast::http::response_serializer<BodyType>& serial = | 
|  | 68 | serializer.emplace(res); | 
|  | 69 |  | 
|  | 70 | boost::beast::http::async_write_header( | 
|  | 71 | adaptor, serial, | 
|  | 72 | std::bind_front(&ConnectionImpl::sendSSEHeaderCallback, this, | 
| Ed Tanous | 608ad2c | 2024-05-20 19:14:50 -0700 | [diff] [blame] | 73 | shared_from_this(), req.copy())); | 
| Ed Tanous | e60300a | 2025-02-23 12:31:53 -0800 | [diff] [blame] | 74 | } | 
|  | 75 |  | 
|  | 76 | void close(const std::string_view msg) override | 
|  | 77 | { | 
|  | 78 | BMCWEB_LOG_DEBUG("Closing connection with reason {}", msg); | 
|  | 79 | // send notification to handler for cleanup | 
|  | 80 | if (closeHandler) | 
|  | 81 | { | 
|  | 82 | closeHandler(*this); | 
|  | 83 | } | 
|  | 84 | BMCWEB_LOG_DEBUG("Closing SSE connection {} - {}", logPtr(this), msg); | 
|  | 85 | boost::beast::get_lowest_layer(adaptor).close(); | 
|  | 86 | } | 
|  | 87 |  | 
|  | 88 | void sendSSEHeaderCallback(const std::shared_ptr<Connection>& /*self*/, | 
|  | 89 | const Request& req, | 
|  | 90 | const boost::system::error_code& ec, | 
|  | 91 | size_t /*bytesSent*/) | 
|  | 92 | { | 
|  | 93 | serializer.reset(); | 
|  | 94 | if (ec) | 
|  | 95 | { | 
|  | 96 | BMCWEB_LOG_ERROR("Error sending header{}", ec); | 
|  | 97 | close("async_write_header failed"); | 
|  | 98 | return; | 
|  | 99 | } | 
|  | 100 | BMCWEB_LOG_DEBUG("SSE header sent - Connection established"); | 
|  | 101 | if (!openHandler) | 
|  | 102 | { | 
|  | 103 | BMCWEB_LOG_CRITICAL("No open handler???"); | 
|  | 104 | return; | 
|  | 105 | } | 
|  | 106 | openHandler(*this, req); | 
|  | 107 |  | 
|  | 108 | // SSE stream header sent, So let us setup monitor. | 
|  | 109 | // Any read data on this stream will be error in case of SSE. | 
|  | 110 | adaptor.async_read_some(boost::asio::buffer(buffer), | 
|  | 111 | std::bind_front(&ConnectionImpl::afterReadError, | 
|  | 112 | this, shared_from_this())); | 
|  | 113 | } | 
|  | 114 |  | 
|  | 115 | void afterReadError(const std::shared_ptr<Connection>& /*self*/, | 
|  | 116 | const boost::system::error_code& ec, size_t bytesRead) | 
|  | 117 | { | 
|  | 118 | BMCWEB_LOG_DEBUG("Read {}", bytesRead); | 
|  | 119 | if (ec == boost::asio::error::operation_aborted) | 
|  | 120 | { | 
|  | 121 | return; | 
|  | 122 | } | 
|  | 123 | if (ec) | 
|  | 124 | { | 
|  | 125 | BMCWEB_LOG_ERROR("Read error: {}", ec); | 
|  | 126 | } | 
|  | 127 |  | 
|  | 128 | close("Close SSE connection"); | 
|  | 129 | } | 
|  | 130 |  | 
|  | 131 | void doWrite() | 
|  | 132 | { | 
|  | 133 | if (doingWrite) | 
|  | 134 | { | 
|  | 135 | return; | 
|  | 136 | } | 
|  | 137 | if (inputBuffer.size() == 0) | 
|  | 138 | { | 
|  | 139 | BMCWEB_LOG_DEBUG("inputBuffer is empty... Bailing out"); | 
|  | 140 | return; | 
|  | 141 | } | 
|  | 142 | startTimeout(); | 
|  | 143 | doingWrite = true; | 
|  | 144 |  | 
|  | 145 | adaptor.async_write_some( | 
|  | 146 | inputBuffer.data(), | 
|  | 147 | std::bind_front(&ConnectionImpl::doWriteCallback, this, | 
|  | 148 | shared_from_this())); | 
|  | 149 | } | 
|  | 150 |  | 
|  | 151 | void doWriteCallback(const std::shared_ptr<Connection>& /*self*/, | 
|  | 152 | const boost::beast::error_code& ec, | 
|  | 153 | size_t bytesTransferred) | 
|  | 154 | { | 
|  | 155 | timer.cancel(); | 
|  | 156 | doingWrite = false; | 
|  | 157 | inputBuffer.consume(bytesTransferred); | 
|  | 158 |  | 
|  | 159 | if (ec == boost::asio::error::eof) | 
|  | 160 | { | 
|  | 161 | BMCWEB_LOG_ERROR("async_write_some() SSE stream closed"); | 
|  | 162 | close("SSE stream closed"); | 
|  | 163 | return; | 
|  | 164 | } | 
|  | 165 |  | 
|  | 166 | if (ec) | 
|  | 167 | { | 
|  | 168 | BMCWEB_LOG_ERROR("async_write_some() failed: {}", ec.message()); | 
|  | 169 | close("async_write_some failed"); | 
|  | 170 | return; | 
|  | 171 | } | 
|  | 172 | BMCWEB_LOG_DEBUG("async_write_some() bytes transferred: {}", | 
|  | 173 | bytesTransferred); | 
|  | 174 |  | 
|  | 175 | doWrite(); | 
|  | 176 | } | 
|  | 177 |  | 
|  | 178 | void sendSseEvent(std::string_view id, std::string_view msg) override | 
|  | 179 | { | 
|  | 180 | if (msg.empty()) | 
|  | 181 | { | 
|  | 182 | BMCWEB_LOG_DEBUG("Empty data, bailing out."); | 
|  | 183 | return; | 
|  | 184 | } | 
|  | 185 |  | 
|  | 186 | dataFormat(id, msg); | 
|  | 187 |  | 
|  | 188 | doWrite(); | 
|  | 189 | } | 
|  | 190 |  | 
|  | 191 | void dataFormat(std::string_view id, std::string_view msg) | 
|  | 192 | { | 
|  | 193 | constexpr size_t bufferLimit = 10485760U; // 10MB | 
|  | 194 | if (id.size() + msg.size() + inputBuffer.size() >= bufferLimit) | 
|  | 195 | { | 
|  | 196 | BMCWEB_LOG_ERROR("SSE Buffer overflow while waiting for client"); | 
|  | 197 | close("Buffer overflow"); | 
|  | 198 | return; | 
|  | 199 | } | 
|  | 200 | std::string rawData; | 
|  | 201 | if (!id.empty()) | 
|  | 202 | { | 
|  | 203 | rawData += "id: "; | 
|  | 204 | rawData.append(id); | 
|  | 205 | rawData += "\n"; | 
|  | 206 | } | 
|  | 207 |  | 
|  | 208 | rawData += "data: "; | 
|  | 209 | for (char character : msg) | 
|  | 210 | { | 
|  | 211 | rawData += character; | 
|  | 212 | if (character == '\n') | 
|  | 213 | { | 
|  | 214 | rawData += "data: "; | 
|  | 215 | } | 
|  | 216 | } | 
|  | 217 | rawData += "\n\n"; | 
|  | 218 |  | 
|  | 219 | size_t copied = boost::asio::buffer_copy( | 
|  | 220 | inputBuffer.prepare(rawData.size()), boost::asio::buffer(rawData)); | 
|  | 221 | inputBuffer.commit(copied); | 
|  | 222 | } | 
|  | 223 |  | 
|  | 224 | void startTimeout() | 
|  | 225 | { | 
|  | 226 | std::weak_ptr<Connection> weakSelf = weak_from_this(); | 
|  | 227 | timer.expires_after(std::chrono::seconds(30)); | 
|  | 228 | timer.async_wait(std::bind_front(&ConnectionImpl::onTimeoutCallback, | 
|  | 229 | this, weak_from_this())); | 
|  | 230 | } | 
|  | 231 |  | 
|  | 232 | void onTimeoutCallback(const std::weak_ptr<Connection>& weakSelf, | 
|  | 233 | const boost::system::error_code& ec) | 
|  | 234 | { | 
|  | 235 | std::shared_ptr<Connection> self = weakSelf.lock(); | 
|  | 236 | if (!self) | 
|  | 237 | { | 
|  | 238 | BMCWEB_LOG_CRITICAL("{} Failed to capture connection", | 
|  | 239 | logPtr(self.get())); | 
|  | 240 | return; | 
|  | 241 | } | 
|  | 242 |  | 
|  | 243 | if (ec == boost::asio::error::operation_aborted) | 
|  | 244 | { | 
|  | 245 | BMCWEB_LOG_DEBUG("Timer operation aborted"); | 
|  | 246 | // Canceled wait means the path succeeded. | 
|  | 247 | return; | 
|  | 248 | } | 
|  | 249 | if (ec) | 
|  | 250 | { | 
|  | 251 | BMCWEB_LOG_CRITICAL("{} timer failed {}", logPtr(self.get()), ec); | 
|  | 252 | } | 
|  | 253 |  | 
|  | 254 | BMCWEB_LOG_WARNING("{} Connection timed out, closing", | 
|  | 255 | logPtr(self.get())); | 
|  | 256 |  | 
|  | 257 | self->close("closing connection"); | 
|  | 258 | } | 
|  | 259 |  | 
|  | 260 | private: | 
|  | 261 | std::array<char, 1> buffer{}; | 
|  | 262 | boost::beast::multi_buffer inputBuffer; | 
|  | 263 |  | 
|  | 264 | Adaptor adaptor; | 
|  | 265 |  | 
|  | 266 | using BodyType = bmcweb::HttpBody; | 
|  | 267 | boost::beast::http::response<BodyType> res; | 
|  | 268 | std::optional<boost::beast::http::response_serializer<BodyType>> serializer; | 
|  | 269 | boost::asio::steady_timer timer; | 
|  | 270 | bool doingWrite = false; | 
|  | 271 |  | 
|  | 272 | std::function<void(Connection&, const Request&)> openHandler; | 
|  | 273 | std::function<void(Connection&)> closeHandler; | 
|  | 274 | }; | 
|  | 275 | } // namespace sse_socket | 
|  | 276 | } // namespace crow |