Move websocket implementation to boost beast
Boost beast is already in much better use, and gives more confidence in
the security model. This change keeps the existing crow interfaces,
and simply replaces the backend with beast. Calling code remains
largely unchanged, with the exception of having to explicitly cast to
string (to obtain a string view) when sending messages.
Change-Id: I90edad505faf2d4465b4888f1f2c4b12cc9e77d0
Signed-off-by: Ed Tanous <ed.tanous@intel.com>
diff --git a/crow/include/crow/http_request.h b/crow/include/crow/http_request.h
index 301aa59..7e95bc6 100644
--- a/crow/include/crow/http_request.h
+++ b/crow/include/crow/http_request.h
@@ -42,7 +42,6 @@
bool keepAlive() { return req.keep_alive(); }
- private:
boost::beast::http::request<boost::beast::http::string_body>& req;
};
diff --git a/crow/include/crow/routing.h b/crow/include/crow/routing.h
index 12daeb4..ddf307a 100644
--- a/crow/include/crow/routing.h
+++ b/crow/include/crow/routing.h
@@ -264,9 +264,11 @@
#ifdef BMCWEB_ENABLE_SSL
void handleUpgrade(const Request& req, Response&,
SSLAdaptor&& adaptor) override {
- new crow::websocket::ConnectionImpl<SSLAdaptor>(req, std::move(adaptor),
- openHandler, messageHandler,
- closeHandler, errorHandler);
+ std::shared_ptr<crow::websocket::ConnectionImpl<SSLAdaptor>> myConnection =
+ std::make_shared<crow::websocket::ConnectionImpl<SSLAdaptor>>(
+ req, std::move(adaptor), openHandler, messageHandler, closeHandler,
+ errorHandler);
+ myConnection->start();
}
#endif
diff --git a/crow/include/crow/socket_adaptors.h b/crow/include/crow/socket_adaptors.h
index 4b379db..1d43ca2 100644
--- a/crow/include/crow/socket_adaptors.h
+++ b/crow/include/crow/socket_adaptors.h
@@ -12,6 +12,7 @@
using tcp = asio::ip::tcp;
struct SocketAdaptor {
+ using streamType = tcp::socket;
using secure = std::false_type;
using context = void;
SocketAdaptor(boost::asio::io_service& ioService, context* /*unused*/)
@@ -38,7 +39,8 @@
template <typename F>
void start(F f) {
- f(boost::system::error_code());
+ boost::system::error_code ec;
+ f(ec);
}
tcp::socket socketCls;
@@ -72,6 +74,7 @@
#ifdef BMCWEB_ENABLE_SSL
struct SSLAdaptor {
+ using streamType = boost::asio::ssl::stream<tcp::socket>;
using secure = std::true_type;
using context = boost::asio::ssl::context;
using ssl_socket_t = boost::asio::ssl::stream<tcp::socket>;
@@ -104,9 +107,9 @@
fail, because the adapter is gone. As is, doRead believes the parse
failed, because isOpen now returns False (which could also mean the client
disconnected during parse)
- UPdate: The parser does in fact have an "isUpgrade" method that is intended
- for exactly this purpose. Todo is now to make doRead obey the flag
- appropriately so this code can be changed back.
+ UPdate: The parser does in fact have an "isUpgrade" method that is
+ intended for exactly this purpose. Todo is now to make doRead obey the
+ flag appropriately so this code can be changed back.
*/
if (sslSocket != nullptr) {
return sslSocket->lowest_layer().is_open();
diff --git a/crow/include/crow/websocket.h b/crow/include/crow/websocket.h
index 2272500..82c6db8 100644
--- a/crow/include/crow/websocket.h
+++ b/crow/include/crow/websocket.h
@@ -1,28 +1,27 @@
#pragma once
#include <array>
+#include <functional>
#include "crow/http_request.h"
#include "crow/socket_adaptors.h"
#include <boost/algorithm/string/predicate.hpp>
-#include <boost/uuid/detail/sha1.hpp>
+#include <boost/beast/websocket.hpp>
+
+#ifdef BMCWEB_ENABLE_SSL
+#include <boost/beast/websocket/ssl.hpp>
+#endif
namespace crow {
namespace websocket {
-enum class WebSocketReadState {
- MiniHeader,
- Len16,
- Len64,
- Mask,
- Payload,
-};
-
-struct Connection {
+struct Connection : std::enable_shared_from_this<Connection> {
public:
explicit Connection(const crow::Request& req)
: req(req), userdataPtr(nullptr){};
- virtual void sendBinary(const std::string& msg) = 0;
- virtual void sendText(const std::string& msg) = 0;
- virtual void close(const std::string& msg = "quit") = 0;
+ virtual void sendBinary(const boost::beast::string_view msg) = 0;
+ virtual void sendBinary(std::string&& msg) = 0;
+ virtual void sendText(const boost::beast::string_view msg) = 0;
+ virtual void sendText(std::string&& msg) = 0;
+ virtual void close(const boost::beast::string_view msg = "quit") = 0;
virtual boost::asio::io_service& getIoService() = 0;
virtual ~Connection() = default;
@@ -39,134 +38,90 @@
class ConnectionImpl : public Connection {
public:
ConnectionImpl(
- const crow::Request& req, Adaptor&& adaptor,
+ const crow::Request& req, Adaptor&& adaptorIn,
std::function<void(Connection&)> open_handler,
std::function<void(Connection&, const std::string&, bool)>
message_handler,
std::function<void(Connection&, const std::string&)> close_handler,
std::function<void(Connection&)> error_handler)
- : adaptor(std::move(adaptor)),
+ : adaptor(std::move(adaptorIn)),
+ ws(adaptor.socket()),
Connection(req),
openHandler(std::move(open_handler)),
messageHandler(std::move(message_handler)),
closeHandler(std::move(close_handler)),
errorHandler(std::move(error_handler)) {
- if (!boost::iequals(req.getHeaderValue("upgrade"), "websocket")) {
- adaptor.close();
- delete this;
- return;
- }
- // Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
- // Sec-WebSocket-Version: 13
- std::string magic(req.getHeaderValue("Sec-WebSocket-Key"));
- magic += "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
- boost::uuids::detail::sha1 s;
- s.process_bytes(magic.data(), magic.size());
-
- // sha1 digests are 20 bytes long
- uint32_t digest[20 / sizeof(uint32_t)];
- s.get_digest(digest);
- for (int i = 0; i < 5; i++) {
- digest[i] = htonl(digest[i]);
- }
- start(crow::utility::base64encode(reinterpret_cast<char*>(digest), 20));
- }
-
- template <typename CompletionHandler>
- void dispatch(CompletionHandler handler) {
- adaptor.getIoService().dispatch(handler);
- }
-
- template <typename CompletionHandler>
- void post(CompletionHandler handler) {
- adaptor.getIoService().post(handler);
+ BMCWEB_LOG_DEBUG << "Creating new connection " << this;
}
boost::asio::io_service& getIoService() override {
return adaptor.getIoService();
}
- void sendPong(const std::string& msg) {
- dispatch([this, msg] {
- char buf[3] = "\x8A\x00";
- buf[1] += msg.size();
- writeBuffers.emplace_back(buf, buf + 2);
- writeBuffers.emplace_back(msg);
- doWrite();
- });
+ void start() {
+ BMCWEB_LOG_DEBUG << "starting connection " << this;
+
+ boost::string_view protocol =
+ req.getHeaderValue(boost::beast::http::field::sec_websocket_protocol);
+
+ // Perform the websocket upgrade
+ ws.async_accept_ex(
+ req.req,
+ [protocol{std::string(protocol)}](
+ boost::beast::websocket::response_type & m) {
+ if (!protocol.empty()) {
+ m.insert(boost::beast::http::field::sec_websocket_protocol,
+ protocol);
+ }
+ },
+ [ this, self(shared_from_this()) ](boost::system::error_code ec) {
+ if (ec) {
+ BMCWEB_LOG_ERROR << "Error in ws.async_accept " << ec;
+ return;
+ }
+ acceptDone();
+ });
}
- void sendBinary(const std::string& msg) override {
- dispatch([this, msg] {
- auto header = buildHeader(2, msg.size());
- writeBuffers.emplace_back(std::move(header));
- writeBuffers.emplace_back(msg);
- doWrite();
- });
- }
-
- void sendText(const std::string& msg) override {
- dispatch([this, msg] {
- auto header = buildHeader(1, msg.size());
- writeBuffers.emplace_back(std::move(header));
- writeBuffers.emplace_back(msg);
- doWrite();
- });
- }
-
- void close(const std::string& msg) override {
- dispatch([this, msg] {
- hasSentClose = true;
- if (hasRecvClose && !isCloseHandlerCalled) {
- isCloseHandlerCalled = true;
- if (closeHandler) {
- closeHandler(*this, msg);
- }
- }
- auto header = buildHeader(0x8, msg.size());
- writeBuffers.emplace_back(std::move(header));
- writeBuffers.emplace_back(msg);
- doWrite();
- });
- }
-
- protected:
- std::string buildHeader(int opcode, uint64_t size) {
- char buf[2 + 8] = "\x80\x00";
- buf[0] += opcode;
- if (size < 126) {
- buf[1] += size;
- return {buf, buf + 2};
- } else if (size < 0x10000) {
- buf[1] += 126;
- *reinterpret_cast<uint16_t*>(buf + 2) =
- htons(static_cast<uint16_t>(size));
- return {buf, buf + 4};
- } else {
- buf[1] += 127;
- *reinterpret_cast<uint64_t*>(buf + 2) =
- ((1 == htonl(1))
- ? size
- : (static_cast<uint64_t>(htonl((size)&0xFFFFFFFF)) << 32) |
- htonl((size) >> 32));
- return {buf, buf + 10};
- }
- }
-
- void start(std::string&& hello) {
- static std::string header =
- "HTTP/1.1 101 Switching Protocols\r\n"
- "Upgrade: websocket\r\n"
- "Connection: Upgrade\r\n"
- //"Sec-WebSocket-Protocol: binary\r\n" // TODO(ed): this hardcodes
- // binary mode find a better way
- "Sec-WebSocket-Accept: ";
- static std::string crlf = "\r\n";
- writeBuffers.emplace_back(header);
- writeBuffers.emplace_back(std::move(hello));
- writeBuffers.emplace_back(crlf);
- writeBuffers.emplace_back(crlf);
+ void sendBinary(const boost::beast::string_view msg) override {
+ ws.binary(true);
+ outBuffer.emplace_back(msg);
doWrite();
+ }
+
+ void sendBinary(std::string&& msg) override {
+ ws.binary(true);
+ outBuffer.emplace_back(std::move(msg));
+ doWrite();
+ }
+
+ void sendText(const boost::beast::string_view msg) override {
+ ws.text(true);
+ outBuffer.emplace_back(msg);
+ doWrite();
+ }
+
+ void sendText(std::string&& msg) override {
+ ws.text(true);
+ outBuffer.emplace_back(std::move(msg));
+ doWrite();
+ }
+
+ void close(const boost::beast::string_view msg) override {
+ ws.async_close(
+ boost::beast::websocket::close_code::normal,
+ [ this, self(shared_from_this()) ](boost::system::error_code ec) {
+ if (ec) {
+ BMCWEB_LOG_ERROR << "Error closing websocket " << ec;
+ return;
+ }
+ adaptor.close();
+ });
+ }
+
+ void acceptDone() {
+ BMCWEB_LOG_DEBUG << "Websocket accepted connection";
+
if (openHandler) {
openHandler(*this);
}
@@ -174,292 +129,73 @@
}
void doRead() {
- isReading = true;
- switch (state) {
- case WebSocketReadState::MiniHeader: {
- // boost::asio::async_read(adaptor.socket(),
- // boost::asio::buffer(&miniHeader, 1),
- adaptor.socket().async_read_some(
- boost::asio::buffer(&miniHeader, 2),
- [this](const boost::system::error_code& ec,
- std::size_t bytes_transferred) {
- isReading = false;
- miniHeader = htons(miniHeader);
-#ifdef BMCWEB_ENABLE_DEBUG
-
- if (!ec && bytes_transferred != 2) {
- throw std::runtime_error(
- "WebSocket:MiniHeader:async_read fail:asio bug?");
- }
-#endif
-
- if (!ec && ((miniHeader & 0x80) == 0x80)) {
- if ((miniHeader & 0x7f) == 127) {
- state = WebSocketReadState::Len64;
- } else if ((miniHeader & 0x7f) == 126) {
- state = WebSocketReadState::Len16;
- } else {
- remainingLength = miniHeader & 0x7f;
- state = WebSocketReadState::Mask;
- }
- doRead();
- } else {
- closeConnection = true;
- adaptor.close();
- if (errorHandler) {
- errorHandler(*this);
- }
- checkDestroy();
- }
- });
- } break;
- case WebSocketReadState::Len16: {
- remainingLength = 0;
- boost::asio::async_read(
- adaptor.socket(), boost::asio::buffer(&remainingLength, 2),
- [this](const boost::system::error_code& ec,
- std::size_t bytes_transferred) {
- isReading = false;
- remainingLength = ntohs(*(uint16_t*)&remainingLength);
-#ifdef BMCWEB_ENABLE_DEBUG
- if (!ec && bytes_transferred != 2) {
- throw std::runtime_error(
- "WebSocket:Len16:async_read fail:asio bug?");
- }
-#endif
-
- if (!ec) {
- state = WebSocketReadState::Mask;
- doRead();
- } else {
- closeConnection = true;
- adaptor.close();
- if (errorHandler) {
- errorHandler(*this);
- }
- checkDestroy();
- }
- });
- } break;
- case WebSocketReadState::Len64: {
- boost::asio::async_read(
- adaptor.socket(), boost::asio::buffer(&remainingLength, 8),
- [this](const boost::system::error_code& ec,
- std::size_t bytes_transferred) {
- isReading = false;
- remainingLength =
- ((1 == ntohl(1))
- ? (remainingLength)
- : ((uint64_t)ntohl((remainingLength)&0xFFFFFFFF) << 32) |
- ntohl((remainingLength) >> 32));
-#ifdef BMCWEB_ENABLE_DEBUG
- if (!ec && bytes_transferred != 8) {
- throw std::runtime_error(
- "WebSocket:Len16:async_read fail:asio bug?");
- }
-#endif
-
- if (!ec) {
- state = WebSocketReadState::Mask;
- doRead();
- } else {
- closeConnection = true;
- adaptor.close();
- if (errorHandler) {
- errorHandler(*this);
- }
- checkDestroy();
- }
- });
- } break;
- case WebSocketReadState::Mask:
- boost::asio::async_read(
- adaptor.socket(), boost::asio::buffer((char*)&mask, 4),
- [this](const boost::system::error_code& ec,
- std::size_t bytes_transferred) {
- isReading = false;
-#ifdef BMCWEB_ENABLE_DEBUG
- if (!ec && bytes_transferred != 4) {
- throw std::runtime_error(
- "WebSocket:Mask:async_read fail:asio bug?");
- }
-#endif
-
- if (!ec) {
- state = WebSocketReadState::Payload;
- doRead();
- } else {
- closeConnection = true;
- if (errorHandler) {
- errorHandler(*this);
- }
- adaptor.close();
- }
- });
- break;
- case WebSocketReadState::Payload: {
- size_t toRead = buffer.size();
- if (remainingLength < toRead) {
- toRead = remainingLength;
- }
- adaptor.socket().async_read_some(
- boost::asio::buffer(buffer, toRead),
- [this](const boost::system::error_code& ec,
- std::size_t bytes_transferred) {
- isReading = false;
-
- if (!ec) {
- fragment.insert(fragment.end(), buffer.begin(),
- buffer.begin() + bytes_transferred);
- remainingLength -= bytes_transferred;
- if (remainingLength == 0) {
- handleFragment();
- state = WebSocketReadState::MiniHeader;
- doRead();
- }
- } else {
- closeConnection = true;
- if (errorHandler) {
- errorHandler(*this);
- }
- adaptor.close();
- }
- });
- } break;
- }
- }
-
- bool isFin() { return miniHeader & 0x8000; }
-
- int opcode() { return (miniHeader & 0x0f00) >> 8; }
-
- void handleFragment() {
- for (decltype(fragment.length()) i = 0; i < fragment.length(); i++) {
- fragment[i] ^= ((char*)&mask)[i % 4];
- }
- switch (opcode()) {
- case 0: // Continuation
- {
- message += fragment;
- if (isFin()) {
- if (messageHandler) {
- messageHandler(*this, message, isBinary);
- }
- message.clear();
- }
- }
- case 1: // Text
- {
- isBinary = false;
- message += fragment;
- if (isFin()) {
- if (messageHandler) {
- messageHandler(*this, message, isBinary);
- }
- message.clear();
- }
- } break;
- case 2: // Binary
- {
- isBinary = true;
- message += fragment;
- if (isFin()) {
- if (messageHandler) {
- messageHandler(*this, message, isBinary);
- }
- message.clear();
- }
- } break;
- case 0x8: // Close
- {
- hasRecvClose = true;
- if (!hasSentClose) {
- close(fragment);
- } else {
- adaptor.close();
- closeConnection = true;
- if (!isCloseHandlerCalled) {
- if (closeHandler) {
- closeHandler(*this, fragment);
+ ws.async_read(
+ inBuffer, [ this, self(shared_from_this()) ](
+ boost::beast::error_code ec, std::size_t bytes_read) {
+ if (ec) {
+ if (ec != boost::beast::websocket::error::closed) {
+ BMCWEB_LOG_ERROR << "doRead error " << ec;
}
- isCloseHandlerCalled = true;
+ if (closeHandler) {
+ boost::beast::string_view reason = ws.reason().reason;
+ closeHandler(*this, std::string(reason));
+ }
+ return;
}
- checkDestroy();
- }
- } break;
- case 0x9: // Ping
- {
- sendPong(fragment);
- } break;
- case 0xA: // Pong
- {
- pongReceived = true;
- } break;
- }
-
- fragment.clear();
+ if (messageHandler) {
+ // TODO(Ed) There must be a more direct way to do this conversion,
+ // but I can't find it at the moment. It should get optimized away
+ boost::asio::const_buffer cb =
+ boost::beast::buffers_front(inBuffer.data());
+ boost::beast::string_view message(
+ reinterpret_cast<char const*>(cb.data()), cb.size());
+ messageHandler(*this, std::string(message), ws.got_text());
+ }
+ doRead();
+ });
}
void doWrite() {
- if (sendingBuffers.empty()) {
- sendingBuffers.swap(writeBuffers);
- std::vector<boost::asio::const_buffer> buffers;
- buffers.reserve(sendingBuffers.size());
- for (auto& s : sendingBuffers) {
- buffers.emplace_back(boost::asio::buffer(s));
- }
- boost::asio::async_write(adaptor.socket(), buffers,
- [&](const boost::system::error_code& ec,
- std::size_t /*bytes_transferred*/) {
- sendingBuffers.clear();
- if (!ec && !closeConnection) {
- if (!writeBuffers.empty()) {
- doWrite();
- }
- if (hasSentClose) {
- closeConnection = true;
- }
- } else {
- closeConnection = true;
- checkDestroy();
- }
- });
+ // If we're already doing a write, ignore the request, it will be picked up
+ // when the current write is complete
+ if (doingWrite) {
+ return;
}
- }
- void checkDestroy() {
- // if (hasSentClose && hasRecvClose)
- if (!isCloseHandlerCalled) {
- if (closeHandler) {
- closeHandler(*this, "uncleanly");
- }
+ if (outBuffer.empty()) {
+ // Done for now
+ return;
}
- if (sendingBuffers.empty() && !isReading) {
- delete this;
- }
+ doingWrite = true;
+ ws.async_write(boost::asio::buffer(outBuffer.front()),
+ [ this, self(shared_from_this()) ](
+ boost::beast::error_code ec, std::size_t bytes_written) {
+ doingWrite = false;
+ outBuffer.erase(outBuffer.begin());
+ if (ec == boost::beast::websocket::error::closed) {
+ // Do nothing here. doRead handler will call the
+ // closeHandler.
+ close("Write error");
+ return;
+ }
+ if (ec) {
+ BMCWEB_LOG_ERROR << "Error in ws.async_write " << ec;
+ return;
+ }
+ doWrite();
+ });
}
private:
Adaptor adaptor;
- std::vector<std::string> sendingBuffers;
- std::vector<std::string> writeBuffers;
+ boost::beast::websocket::stream<
+ std::add_lvalue_reference_t<typename Adaptor::streamType>>
+ ws;
- std::array<char, 4096> buffer{};
- bool isBinary{};
- std::string message;
- std::string fragment;
- WebSocketReadState state{WebSocketReadState::MiniHeader};
- uint64_t remainingLength{0};
- bool closeConnection{false};
- bool isReading{false};
- uint32_t mask{};
- uint16_t miniHeader{};
- bool hasSentClose{false};
- bool hasRecvClose{false};
- bool errorOccured{false};
- bool pongReceived{false};
- bool isCloseHandlerCalled{false};
+ boost::beast::flat_static_buffer<4096> inBuffer;
+ std::vector<std::string> outBuffer;
+ bool doingWrite = false;
std::function<void(Connection&)> openHandler;
std::function<void(Connection&, const std::string&, bool)> messageHandler;
diff --git a/include/dbus_monitor.hpp b/include/dbus_monitor.hpp
index 43385e6..f0b2167 100644
--- a/include/dbus_monitor.hpp
+++ b/include/dbus_monitor.hpp
@@ -50,7 +50,8 @@
std::string pathNamespace(conn.req.urlParams.get("path_namespace"));
if (pathNamespace.empty()) {
conn.sendText(
- nlohmann::json({"error", "Did not specify path_namespace"}));
+ nlohmann::json({"error", "Did not specify path_namespace"})
+ .dump());
conn.close("error");
}
sessions[&conn] = DbusWebsocketSession();
@@ -62,7 +63,6 @@
sessions[&conn].matches.emplace_back(
std::make_unique<sdbusplus::bus::match::match>(
*crow::connections::systemBus, matchString, onPropertyUpdate));
-
})
.onclose([&](crow::websocket::Connection& conn,
const std::string& reason) { sessions.erase(&conn); })