nbd proxy and websocket cleanups
As-written, the nbd (and all websocket daemons) suffer from a problem
where there is no way to apply socket backpressure, so in certain
conditions, it's trivial to run the BMC out of memory on a given
message. This is a problem.
This commit implements the idea of an incremental callback handler, that
accepts a callback function to be run when the processing of the message
is complete. This allows applying backpressure on the socket, which in
turn, should provide pressure back to the client, and prevent buffering
crashes on slow connections, or connections with high latency.
Tested: NBD proxy not upstream, no way to test. No changes made to
normal websocket flow.
Signed-off-by: Michal Orzel <michalx.orzel@intel.com>
Signed-off-by: Ed Tanous <edtanous@google.com>
Change-Id: I3f116cc91eeadc949579deacbeb2d9f5e0f4fa53
diff --git a/http/routing.hpp b/http/routing.hpp
index e41ce93..613b54d 100644
--- a/http/routing.hpp
+++ b/http/routing.hpp
@@ -360,7 +360,7 @@
myConnection = std::make_shared<
crow::websocket::ConnectionImpl<boost::asio::ip::tcp::socket>>(
req, std::move(adaptor), openHandler, messageHandler,
- closeHandler, errorHandler);
+ messageExHandler, closeHandler, errorHandler);
myConnection->start();
}
#else
@@ -375,7 +375,7 @@
myConnection = std::make_shared<crow::websocket::ConnectionImpl<
boost::beast::ssl_stream<boost::asio::ip::tcp::socket>>>(
req, std::move(adaptor), openHandler, messageHandler,
- closeHandler, errorHandler);
+ messageExHandler, closeHandler, errorHandler);
myConnection->start();
}
#endif
@@ -395,6 +395,13 @@
}
template <typename Func>
+ self_t& onmessageex(Func f)
+ {
+ messageExHandler = f;
+ return *this;
+ }
+
+ template <typename Func>
self_t& onclose(Func f)
{
closeHandler = f;
@@ -412,6 +419,10 @@
std::function<void(crow::websocket::Connection&)> openHandler;
std::function<void(crow::websocket::Connection&, const std::string&, bool)>
messageHandler;
+ std::function<void(crow::websocket::Connection&, std::string_view,
+ crow::websocket::MessageType type,
+ std::function<void()>&& whenComplete)>
+ messageExHandler;
std::function<void(crow::websocket::Connection&, const std::string&)>
closeHandler;
std::function<void(crow::websocket::Connection&)> errorHandler;
diff --git a/http/websocket.hpp b/http/websocket.hpp
index 216e96f..9a5aa29 100644
--- a/http/websocket.hpp
+++ b/http/websocket.hpp
@@ -3,6 +3,7 @@
#include "http_request.hpp"
#include <boost/asio/buffer.hpp>
+#include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/websocket.hpp>
#include <array>
@@ -17,6 +18,12 @@
namespace websocket
{
+enum class MessageType
+{
+ Binary,
+ Text,
+};
+
struct Connection : std::enable_shared_from_this<Connection>
{
public:
@@ -30,9 +37,13 @@
virtual void sendBinary(std::string_view msg) = 0;
virtual void sendBinary(std::string&& msg) = 0;
+ virtual void sendEx(MessageType type, std::string_view msg,
+ std::function<void()>&& onDone) = 0;
virtual void sendText(std::string_view msg) = 0;
virtual void sendText(std::string&& msg) = 0;
virtual void close(std::string_view msg = "quit") = 0;
+ virtual void deferRead() = 0;
+ virtual void resumeRead() = 0;
virtual boost::asio::io_context& getIoContext() = 0;
virtual ~Connection() = default;
@@ -48,12 +59,17 @@
std::function<void(Connection&)> openHandlerIn,
std::function<void(Connection&, const std::string&, bool)>
messageHandlerIn,
+ std::function<void(crow::websocket::Connection&, std::string_view,
+ crow::websocket::MessageType type,
+ std::function<void()>&& whenComplete)>
+ messageExHandlerIn,
std::function<void(Connection&, const std::string&)> closeHandlerIn,
std::function<void(Connection&)> errorHandlerIn) :
Connection(reqIn),
ws(std::move(adaptorIn)), inBuffer(inString, 131088),
openHandler(std::move(openHandlerIn)),
messageHandler(std::move(messageHandlerIn)),
+ messageExHandler(std::move(messageExHandlerIn)),
closeHandler(std::move(closeHandlerIn)),
errorHandler(std::move(errorHandlerIn)), session(reqIn.session)
{
@@ -126,28 +142,61 @@
void sendBinary(std::string_view msg) override
{
ws.binary(true);
- outBuffer.emplace_back(msg);
+ outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()),
+ boost::asio::buffer(msg)));
doWrite();
}
+ void sendEx(MessageType type, std::string_view msg,
+ std::function<void()>&& onDone) override
+ {
+ if (doingWrite)
+ {
+ BMCWEB_LOG_CRITICAL
+ << "Cannot mix sendEx usage with sendBinary or sendText";
+ onDone();
+ return;
+ }
+ ws.binary(type == MessageType::Binary);
+
+ ws.async_write(boost::asio::buffer(msg),
+ [weak(weak_from_this()), onDone{std::move(onDone)}](
+ const boost::beast::error_code& ec, size_t) {
+ std::shared_ptr<Connection> self = weak.lock();
+
+ // Call the done handler regardless of whether we
+ // errored, but before we close things out
+ onDone();
+
+ if (ec)
+ {
+ BMCWEB_LOG_ERROR << "Error in ws.async_write " << ec;
+ self->close("write error");
+ }
+ });
+ }
+
void sendBinary(std::string&& msg) override
{
ws.binary(true);
- outBuffer.emplace_back(std::move(msg));
+ outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()),
+ boost::asio::buffer(msg)));
doWrite();
}
void sendText(std::string_view msg) override
{
ws.text(true);
- outBuffer.emplace_back(msg);
+ outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()),
+ boost::asio::buffer(msg)));
doWrite();
}
void sendText(std::string&& msg) override
{
ws.text(true);
- outBuffer.emplace_back(std::move(msg));
+ outBuffer.commit(boost::asio::buffer_copy(outBuffer.prepare(msg.size()),
+ boost::asio::buffer(msg)));
doWrite();
}
@@ -172,19 +221,41 @@
{
BMCWEB_LOG_DEBUG << "Websocket accepted connection";
- doRead();
-
if (openHandler)
{
openHandler(*this);
}
+ doRead();
+ }
+
+ void deferRead() override
+ {
+ readingDefered = true;
+
+ // If we're not actively reading, we need to take ownership of
+ // ourselves for a small portion of time, do that, and clear when we
+ // resume.
+ selfOwned = shared_from_this();
+ }
+
+ void resumeRead() override
+ {
+ readingDefered = false;
+ doRead();
+
+ // No longer need to keep ourselves alive now that read is active.
+ selfOwned.reset();
}
void doRead()
{
- ws.async_read(inBuffer,
- [this, self(shared_from_this())](
- boost::beast::error_code ec, std::size_t bytesRead) {
+ if (readingDefered)
+ {
+ return;
+ }
+ ws.async_read(inBuffer, [this, self(shared_from_this())](
+ const boost::beast::error_code& ec,
+ size_t bytesRead) {
if (ec)
{
if (ec != boost::beast::websocket::error::closed)
@@ -198,16 +269,10 @@
}
return;
}
- if (messageHandler)
- {
- messageHandler(*this, inString, ws.got_text());
- }
- inBuffer.consume(bytesRead);
- inString.clear();
- doRead();
+
+ handleMessage(bytesRead);
});
}
-
void doWrite()
{
// If we're already doing a write, ignore the request, it will be picked
@@ -217,17 +282,17 @@
return;
}
- if (outBuffer.empty())
+ if (outBuffer.size() == 0)
{
// Done for now
return;
}
doingWrite = true;
- ws.async_write(boost::asio::buffer(outBuffer.front()),
- [this, self(shared_from_this())](
- boost::beast::error_code ec, std::size_t) {
+ ws.async_write(outBuffer.data(), [this, self(shared_from_this())](
+ const boost::beast::error_code& ec,
+ size_t bytesSent) {
doingWrite = false;
- outBuffer.erase(outBuffer.begin());
+ outBuffer.consume(bytesSent);
if (ec == boost::beast::websocket::error::closed)
{
// Do nothing here. doRead handler will call the
@@ -245,21 +310,59 @@
}
private:
+ void handleMessage(size_t bytesRead)
+ {
+ if (messageExHandler)
+ {
+ // Note, because of the interactions with the read buffers,
+ // this message handler overrides the normal message handler
+ messageExHandler(*this, inString, MessageType::Binary,
+ [this, self(shared_from_this()), bytesRead]() {
+ if (self == nullptr)
+ {
+ return;
+ }
+
+ inBuffer.consume(bytesRead);
+ inString.clear();
+
+ doRead();
+ });
+ return;
+ }
+
+ if (messageHandler)
+ {
+ messageHandler(*this, inString, ws.got_text());
+ }
+ inBuffer.consume(bytesRead);
+ inString.clear();
+ doRead();
+ }
+
boost::beast::websocket::stream<Adaptor, false> ws;
+ bool readingDefered = false;
std::string inString;
boost::asio::dynamic_string_buffer<std::string::value_type,
std::string::traits_type,
std::string::allocator_type>
inBuffer;
- std::vector<std::string> outBuffer;
+
+ boost::beast::multi_buffer outBuffer;
bool doingWrite = false;
std::function<void(Connection&)> openHandler;
std::function<void(Connection&, const std::string&, bool)> messageHandler;
+ std::function<void(crow::websocket::Connection&, std::string_view,
+ crow::websocket::MessageType type,
+ std::function<void()>&& whenComplete)>
+ messageExHandler;
std::function<void(Connection&, const std::string&)> closeHandler;
std::function<void(Connection&)> errorHandler;
std::shared_ptr<persistent_data::UserSession> session;
+
+ std::shared_ptr<Connection> selfOwned;
};
} // namespace websocket
} // namespace crow