vm_websocket: Add websocket handler

On receiving a websocket request on endpoint /vm/0/0,
connect to the nbd-proxy app and send/receive stdio.

Tested: Verified that the host could see the virtual
media usb device, mounted it manually and checked
the contents of the iso file used for the test were
there. To test, used the html and js script:
https://github.com/openbmc/jsnbd/tree/master/web
and an Ubuntu iso image file.
Verified that it worked after closing the websocket
(using the stop function from the html file), to
check that the processes were cleaned up and freed
up for a subsequent request.

Change-Id: I0b070310b070c086d67d0ae3e2c165551d6b87cc
Signed-off-by: Adriana Kobylak <anoo@us.ibm.com>
diff --git a/include/vm_websocket.hpp b/include/vm_websocket.hpp
new file mode 100644
index 0000000..3f229e6
--- /dev/null
+++ b/include/vm_websocket.hpp
@@ -0,0 +1,218 @@
+#pragma once
+
+#include <crow/app.h>
+#include <crow/websocket.h>
+#include <signal.h>
+
+#include <boost/beast/core/flat_static_buffer.hpp>
+#include <boost/process.hpp>
+#include <webserver_common.hpp>
+
+namespace crow
+{
+namespace obmc_vm
+{
+
+static crow::websocket::Connection* session = nullptr;
+
+// The max network block device buffer size is 128kb plus 16bytes
+// for the message header:
+// https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md#simple-reply-message
+static constexpr auto nbdBufferSize = 131088;
+
+class Handler : public std::enable_shared_from_this<Handler>
+{
+  public:
+    Handler(const std::string& media, boost::asio::io_service& ios) :
+        pipeOut(ios), pipeIn(ios), media(media), doingWrite(false),
+        outputBuffer(new boost::beast::flat_static_buffer<nbdBufferSize>),
+        inputBuffer(new boost::beast::flat_static_buffer<nbdBufferSize>)
+    {
+    }
+
+    ~Handler()
+    {
+    }
+
+    void doClose()
+    {
+        // boost::process::child::terminate uses SIGKILL, need to send SIGTERM
+        // to allow the proxy to stop nbd-client and the USB device gadget.
+        int rc = kill(proxy.id(), SIGTERM);
+        if (rc)
+        {
+            return;
+        }
+        proxy.wait();
+    }
+
+    void connect()
+    {
+        std::error_code ec;
+        proxy = boost::process::child("/usr/sbin/nbd-proxy", media,
+                                      boost::process::std_out > pipeOut,
+                                      boost::process::std_in < pipeIn, ec);
+        if (ec)
+        {
+            BMCWEB_LOG_ERROR << "Couldn't connect to nbd-proxy: "
+                             << ec.message();
+            if (session != nullptr)
+            {
+                session->close("Error connecting to nbd-proxy");
+            }
+            return;
+        }
+        doWrite();
+        doRead();
+    }
+
+    void doWrite()
+    {
+        if (doingWrite)
+        {
+            BMCWEB_LOG_DEBUG << "Already writing.  Bailing out";
+            return;
+        }
+
+        if (inputBuffer->size() == 0)
+        {
+            BMCWEB_LOG_DEBUG << "inputBuffer empty.  Bailing out";
+            return;
+        }
+
+        doingWrite = true;
+        pipeIn.async_write_some(
+            inputBuffer->data(),
+            [this, self(shared_from_this())](boost::beast::error_code ec,
+                                             std::size_t bytesWritten) {
+                BMCWEB_LOG_DEBUG << "Wrote " << bytesWritten << "bytes";
+                doingWrite = false;
+                inputBuffer->consume(bytesWritten);
+
+                if (session == nullptr)
+                {
+                    return;
+                }
+                if (ec == boost::asio::error::eof)
+                {
+                    session->close("VM socket port closed");
+                    return;
+                }
+                if (ec)
+                {
+                    session->close("Error in writing to proxy port");
+                    BMCWEB_LOG_ERROR << "Error in VM socket write " << ec;
+                    return;
+                }
+                doWrite();
+            });
+    }
+
+    void doRead()
+    {
+        std::size_t bytes = outputBuffer->capacity() - outputBuffer->size();
+
+        pipeOut.async_read_some(
+            outputBuffer->prepare(bytes),
+            [this, self(shared_from_this())](
+                const boost::system::error_code& ec, std::size_t bytesRead) {
+                BMCWEB_LOG_DEBUG << "Read done.  Read " << bytesRead
+                                 << " bytes";
+                if (ec)
+                {
+                    BMCWEB_LOG_ERROR << "Couldn't read from VM port: " << ec;
+                    if (session != nullptr)
+                    {
+                        session->close("Error in connecting to VM port");
+                    }
+                    return;
+                }
+                if (session == nullptr)
+                {
+                    return;
+                }
+
+                outputBuffer->commit(bytesRead);
+                std::string_view payload(
+                    static_cast<const char*>(outputBuffer->data().data()),
+                    bytesRead);
+                session->sendBinary(payload);
+                outputBuffer->consume(bytesRead);
+
+                doRead();
+            });
+    }
+
+    boost::process::async_pipe pipeOut;
+    boost::process::async_pipe pipeIn;
+    boost::process::child proxy;
+    std::string media;
+    bool doingWrite;
+
+    std::unique_ptr<boost::beast::flat_static_buffer<nbdBufferSize>>
+        outputBuffer;
+    std::unique_ptr<boost::beast::flat_static_buffer<nbdBufferSize>>
+        inputBuffer;
+};
+
+static std::shared_ptr<Handler> handler;
+
+template <typename... Middlewares> void requestRoutes(Crow<Middlewares...>& app)
+{
+    BMCWEB_ROUTE(app, "/vm/0/0")
+        .websocket()
+        .onopen([](crow::websocket::Connection& conn) {
+            BMCWEB_LOG_DEBUG << "Connection " << &conn << " opened";
+
+            if (session != nullptr)
+            {
+                conn.close("Session already connected");
+                return;
+            }
+
+            if (handler != nullptr)
+            {
+                conn.close("Handler already running");
+                return;
+            }
+
+            session = &conn;
+
+            // media is the last digit of the endpoint /vm/0/0. A future
+            // enhancement can include supporting different endpoint values.
+            const char* media = "0";
+            handler = std::make_shared<Handler>(media, conn.get_io_context());
+            handler->connect();
+        })
+        .onclose(
+            [](crow::websocket::Connection& conn, const std::string& reason) {
+                session = nullptr;
+                handler->doClose();
+#if BOOST_VERSION >= 107000
+                handler->inputBuffer->clear();
+                handler->outputBuffer->clear();
+#else
+                handler->inputBuffer->reset();
+                handler->outputBuffer->reset();
+#endif
+                handler.reset();
+            })
+        .onmessage([](crow::websocket::Connection& conn,
+                      const std::string& data, bool is_binary) {
+            if (data.length() > handler->inputBuffer->capacity())
+            {
+                BMCWEB_LOG_ERROR << "Buffer overrun when writing "
+                                 << data.length() << " bytes";
+                conn.close("Buffer overrun");
+                return;
+            }
+
+            boost::asio::buffer_copy(handler->inputBuffer->prepare(data.size()),
+                                     boost::asio::buffer(data));
+            handler->inputBuffer->commit(data.size());
+            handler->doWrite();
+        });
+}
+
+} // namespace obmc_vm
+} // namespace crow