Implement nbd-proxy as a part of bmcweb

Nbd-proxy is responsible for exposing a WebSocket endpoint in bmcweb.
It matches WebSocket endpoints with UNIX socket paths using the
configuration exposed on D-Bus by Virtual-Media.

Virtual-Media is then notified about UNIX socket availability through
the Mount/Unmount D-Bus methods.

Currently, this feature is disabled by default.

Tested: Integrated with initial version of Virtual-Media.

Change-Id: I9c572e9841b16785727e5676fea1bb63b0311c63
Signed-off-by: Iwona Klimaszewska <iwona.klimaszewska@intel.com>
Signed-off-by: Przemyslaw Czarnowski <przemyslaw.hawrylewicz.czarnowski@intel.com>
diff --git a/include/async_resp.hpp b/include/async_resp.hpp
index af4edeb..78994cf 100644
--- a/include/async_resp.hpp
+++ b/include/async_resp.hpp
@@ -1,5 +1,7 @@
 #pragma once
 
+#include <functional>
+
 namespace bmcweb
 {
 
@@ -7,6 +9,7 @@
  * AsyncResp
  * Gathers data needed for response processing after async calls are done
  */
+
 class AsyncResp
 {
   public:
@@ -14,12 +17,23 @@
     {
     }
 
+    AsyncResp(crow::Response& response, std::function<void()>&& function) :
+        res(response), func(std::move(function))
+    {
+    }
+
     ~AsyncResp()
     {
+        if (func && res.result() == boost::beast::http::status::ok)
+        {
+            func();
+        }
+
         res.end();
     }
 
     crow::Response& res;
+    std::function<void()> func = nullptr;
 };
 
-} // namespace bmcweb
\ No newline at end of file
+} // namespace bmcweb
diff --git a/include/dbus_monitor.hpp b/include/dbus_monitor.hpp
index 0543c7b..1747810 100644
--- a/include/dbus_monitor.hpp
+++ b/include/dbus_monitor.hpp
@@ -2,6 +2,7 @@
 #include <app.h>
 #include <websocket.h>
 
+#include <async_resp.hpp>
 #include <boost/container/flat_map.hpp>
 #include <boost/container/flat_set.hpp>
 #include <dbus_singleton.hpp>
@@ -116,7 +117,8 @@
     BMCWEB_ROUTE(app, "/subscribe")
         .requires({"Login"})
         .websocket()
-        .onopen([&](crow::websocket::Connection& conn) {
+        .onopen([&](crow::websocket::Connection& conn,
+                    std::shared_ptr<bmcweb::AsyncResp> asyncResp) {
             BMCWEB_LOG_DEBUG << "Connection " << &conn << " opened";
             sessions[&conn] = DbusWebsocketSession();
         })
diff --git a/include/kvm_websocket.hpp b/include/kvm_websocket.hpp
index d97b03e..306c684 100644
--- a/include/kvm_websocket.hpp
+++ b/include/kvm_websocket.hpp
@@ -3,6 +3,7 @@
 #include <sys/socket.h>
 #include <websocket.h>
 
+#include <async_resp.hpp>
 #include <boost/container/flat_map.hpp>
 #include <webserver_common.hpp>
 
@@ -161,7 +162,8 @@
     BMCWEB_ROUTE(app, "/kvm/0")
         .requires({"ConfigureComponents", "ConfigureManager"})
         .websocket()
-        .onopen([](crow::websocket::Connection& conn) {
+        .onopen([](crow::websocket::Connection& conn,
+                   std::shared_ptr<bmcweb::AsyncResp> asyncResp) {
             BMCWEB_LOG_DEBUG << "Connection " << &conn << " opened";
 
             if (sessions.size() == maxSessions)
diff --git a/include/nbd_proxy.hpp b/include/nbd_proxy.hpp
new file mode 100644
index 0000000..64578f2
--- /dev/null
+++ b/include/nbd_proxy.hpp
@@ -0,0 +1,381 @@
+/*
+// Copyright (c) 2019 Intel Corporation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+*/
+#pragma once
+#include <app.h>
+#include <websocket.h>
+
+#include <boost/asio.hpp>
+#include <boost/beast/core/buffers_to_string.hpp>
+#include <boost/beast/core/multi_buffer.hpp>
+#include <boost/container/flat_map.hpp>
+#include <dbus_utility.hpp>
+#include <experimental/filesystem>
+#include <variant>
+#include <webserver_common.hpp>
+
+namespace crow
+{
+
+namespace nbd_proxy
+{
+
+using boost::asio::local::stream_protocol;
+
+static constexpr auto nbdBufferSize = 131088;
+
+struct NbdProxyServer : std::enable_shared_from_this<NbdProxyServer>
+{
+    NbdProxyServer(crow::websocket::Connection& connIn,
+                   const std::string& socketIdIn,
+                   const std::string& endpointIdIn, const std::string& pathIn) :
+        socketId(socketIdIn),
+        endpointId(endpointIdIn), path(pathIn),
+        acceptor(connIn.get_io_context(), stream_protocol::endpoint(socketId)),
+        connection(connIn)
+    {
+    }
+
+    ~NbdProxyServer()
+    {
+        BMCWEB_LOG_DEBUG << "NbdProxyServer destructor";
+        close();
+        connection.close();
+
+        if (peerSocket)
+        {
+            BMCWEB_LOG_DEBUG << "peerSocket->close()";
+            peerSocket->close();
+            peerSocket.reset();
+            BMCWEB_LOG_DEBUG << "std::remove(" << socketId << ")";
+            std::remove(socketId.c_str());
+        }
+    }
+
+    const std::string& getEndpointId() const
+    {
+        return endpointId;
+    }
+
+    void run()
+    {
+        acceptor.async_accept(
+            [this, self(shared_from_this())](boost::system::error_code ec,
+                                             stream_protocol::socket socket) {
+                if (ec)
+                {
+                    BMCWEB_LOG_ERROR << "Cannot accept new connection: " << ec;
+                    return;
+                }
+                if (peerSocket)
+                {
+                    // Something is wrong - socket shouldn't be acquired at this
+                    // point
+                    BMCWEB_LOG_ERROR
+                        << "Failed to open connection - socket already used";
+                    return;
+                }
+
+                BMCWEB_LOG_DEBUG << "Connection opened";
+                peerSocket = std::move(socket);
+                doRead();
+
+                // Trigger Write if any data was sent from server
+                // Initially this is negotiation chunk
+                doWrite();
+            });
+
+        auto mountHandler = [](const boost::system::error_code ec,
+                               const bool status) {
+            if (ec)
+            {
+                BMCWEB_LOG_ERROR << "DBus error: " << ec
+                                 << ", cannot call mount method";
+                return;
+            }
+        };
+
+        crow::connections::systemBus->async_method_call(
+            std::move(mountHandler), "xyz.openbmc_project.VirtualMedia", path,
+            "xyz.openbmc_project.VirtualMedia.Proxy", "Mount");
+    }
+
+    void send(const std::string_view data)
+    {
+        boost::asio::buffer_copy(ws2uxBuf.prepare(data.size()),
+                                 boost::asio::buffer(data));
+        ws2uxBuf.commit(data.size());
+        doWrite();
+    }
+
+    void close()
+    {
+        // The reference to the session should exist until unmount is
+        // called
+        auto unmountHandler = [](const boost::system::error_code ec) {
+            if (ec)
+            {
+                BMCWEB_LOG_ERROR << "DBus error: " << ec
+                                 << ", cannot call unmount method";
+                return;
+            }
+        };
+
+        crow::connections::systemBus->async_method_call(
+            std::move(unmountHandler), "xyz.openbmc_project.VirtualMedia", path,
+            "xyz.openbmc_project.VirtualMedia.Proxy", "Unmount");
+    }
+
+  private:
+    void doRead()
+    {
+        if (!peerSocket)
+        {
+            BMCWEB_LOG_DEBUG << "UNIX socket isn't created yet";
+            // Skip if UNIX socket is not created yet.
+            return;
+        }
+
+        // Trigger async read
+        peerSocket->async_read_some(
+            ux2wsBuf.prepare(nbdBufferSize),
+            [this, self(shared_from_this())](boost::system::error_code ec,
+                                             std::size_t bytesRead) {
+                if (ec)
+                {
+                    BMCWEB_LOG_ERROR << "UNIX socket: async_read_some error = "
+                                     << ec;
+                    // UNIX socket has been closed by peer, best we can do is to
+                    // break all connections
+                    close();
+                    return;
+                }
+
+                // Fetch data from UNIX socket
+
+                ux2wsBuf.commit(bytesRead);
+
+                // Paste it to WebSocket as binary
+                connection.sendBinary(
+                    boost::beast::buffers_to_string(ux2wsBuf.data()));
+                ux2wsBuf.consume(bytesRead);
+
+                // Allow further reads
+                doRead();
+            });
+    }
+
+    void doWrite()
+    {
+        if (!peerSocket)
+        {
+            BMCWEB_LOG_DEBUG << "UNIX socket isn't created yet";
+            // Skip if UNIX socket is not created yet. Collect data, and wait
+            // for nbd-client connection
+            return;
+        }
+
+        if (uxWriteInProgress)
+        {
+            BMCWEB_LOG_ERROR << "Write in progress";
+            return;
+        }
+
+        if (ws2uxBuf.size() == 0)
+        {
+            BMCWEB_LOG_ERROR << "No data to write to UNIX socket";
+            return;
+        }
+
+        uxWriteInProgress = true;
+        boost::asio::async_write(
+            *peerSocket, ws2uxBuf.data(),
+            [this, self(shared_from_this())](boost::system::error_code ec,
+                                             std::size_t bytesWritten) {
+                ws2uxBuf.consume(bytesWritten);
+                uxWriteInProgress = false;
+                if (ec)
+                {
+                    BMCWEB_LOG_ERROR << "UNIX: async_write error = " << ec;
+                    return;
+                }
+                // Retrigger doWrite if there is something in buffer
+                doWrite();
+            });
+    }
+
+    // Keeps UNIX socket endpoint file path
+    const std::string socketId;
+    const std::string endpointId;
+    const std::string path;
+
+    bool uxWriteInProgress = false;
+
+    // UNIX => WebSocket buffer
+    boost::beast::multi_buffer ux2wsBuf;
+
+    // WebSocket => UNIX buffer
+    boost::beast::multi_buffer ws2uxBuf;
+
+    // Default acceptor for UNIX socket
+    stream_protocol::acceptor acceptor;
+
+    // The socket used to communicate with the client.
+    std::optional<stream_protocol::socket> peerSocket;
+
+    crow::websocket::Connection& connection;
+};
+
+static boost::container::flat_map<crow::websocket::Connection*,
+                                  std::shared_ptr<NbdProxyServer>>
+    sessions;
+
+void requestRoutes(CrowApp& app)
+{
+    BMCWEB_ROUTE(app, "/nbd/<str>")
+        .websocket()
+        .onopen([&app](crow::websocket::Connection& conn,
+                       std::shared_ptr<bmcweb::AsyncResp> asyncResp) {
+            BMCWEB_LOG_DEBUG << "nbd-proxy.onopen(" << &conn << ")";
+
+            for (const auto& session : sessions)
+            {
+                if (session.second->getEndpointId() == conn.req.target())
+                {
+                    BMCWEB_LOG_ERROR
+                        << "Cannot open new connection - socket is in use";
+                    return;
+                }
+            }
+
+            auto openHandler = [asyncResp, &conn](
+                                   const boost::system::error_code ec,
+                                   dbus::utility::ManagedObjectType& objects) {
+                const std::string* socketValue = nullptr;
+                const std::string* endpointValue = nullptr;
+                const std::string* endpointObjectPath = nullptr;
+
+                if (ec)
+                {
+                    BMCWEB_LOG_ERROR << "DBus error: " << ec;
+                    return;
+                }
+
+                for (const auto& objectPath : objects)
+                {
+                    const auto interfaceMap = objectPath.second.find(
+                        "xyz.openbmc_project.VirtualMedia.MountPoint");
+
+                    if (interfaceMap == objectPath.second.end())
+                    {
+                        BMCWEB_LOG_DEBUG << "Cannot find MountPoint object";
+                        continue;
+                    }
+
+                    const auto endpoint =
+                        interfaceMap->second.find("EndpointId");
+                    if (endpoint == interfaceMap->second.end())
+                    {
+                        BMCWEB_LOG_DEBUG << "Cannot find EndpointId property";
+                        continue;
+                    }
+
+                    endpointValue = std::get_if<std::string>(&endpoint->second);
+
+                    if (endpointValue == nullptr)
+                    {
+                        BMCWEB_LOG_ERROR << "EndpointId property value is null";
+                        continue;
+                    }
+
+                    if (*endpointValue == conn.req.target())
+                    {
+                        const auto socket = interfaceMap->second.find("Socket");
+                        if (socket == interfaceMap->second.end())
+                        {
+                            BMCWEB_LOG_DEBUG << "Cannot find Socket property";
+                            continue;
+                        }
+
+                        socketValue = std::get_if<std::string>(&socket->second);
+                        if (socketValue == nullptr)
+                        {
+                            BMCWEB_LOG_ERROR << "Socket property value is null";
+                            continue;
+                        }
+
+                        endpointObjectPath = &objectPath.first.str;
+                        break;
+                    }
+                }
+
+                if (endpointObjectPath == nullptr)
+                {
+                    BMCWEB_LOG_ERROR << "Cannot find requested EndpointId";
+                    asyncResp->res.result(
+                        boost::beast::http::status::not_found);
+                    return;
+                }
+
+                // If the socket file exists (i.e. after bmcweb crash), we
+                // cannot reuse it.
+                std::remove(socketValue->c_str());
+
+                auto server = std::make_shared<NbdProxyServer>(
+                    conn, *socketValue, *endpointValue, *endpointObjectPath);
+                sessions[&conn] = server;
+
+                server->run();
+
+                asyncResp->res.result(boost::beast::http::status::ok);
+            };
+            crow::connections::systemBus->async_method_call(
+                std::move(openHandler), "xyz.openbmc_project.VirtualMedia",
+                "/xyz/openbmc_project/VirtualMedia",
+                "org.freedesktop.DBus.ObjectManager", "GetManagedObjects");
+        })
+        .onclose(
+            [](crow::websocket::Connection& conn, const std::string& reason) {
+                BMCWEB_LOG_DEBUG << "nbd-proxy.onclose(reason = '" << reason
+                                 << "')";
+                auto session = sessions.find(&conn);
+                if (session == sessions.end())
+                {
+                    BMCWEB_LOG_DEBUG << "No session to close";
+                    return;
+                }
+                // Remove reference to session in global map
+                session->second->close();
+                sessions.erase(session);
+            })
+        .onmessage([](crow::websocket::Connection& conn,
+                      const std::string& data, bool isBinary) {
+            BMCWEB_LOG_DEBUG << "nbd-proxy.onmessage(len = " << data.length()
+                             << ")";
+            // Acquire proxy from sessions
+            auto session = sessions.find(&conn);
+            if (session != sessions.end())
+            {
+                if (session->second)
+                {
+                    session->second->send(data);
+                    return;
+                }
+            }
+            conn.close();
+        });
+}
+} // namespace nbd_proxy
+} // namespace crow
diff --git a/include/obmc_console.hpp b/include/obmc_console.hpp
index b545f96..9e5e058 100644
--- a/include/obmc_console.hpp
+++ b/include/obmc_console.hpp
@@ -3,6 +3,7 @@
 #include <sys/socket.h>
 #include <websocket.h>
 
+#include <async_resp.hpp>
 #include <boost/container/flat_map.hpp>
 #include <boost/container/flat_set.hpp>
 #include <webserver_common.hpp>
@@ -106,7 +107,8 @@
     BMCWEB_ROUTE(app, "/console0")
         .requires({"ConfigureComponents", "ConfigureManager"})
         .websocket()
-        .onopen([](crow::websocket::Connection& conn) {
+        .onopen([](crow::websocket::Connection& conn,
+                   std::shared_ptr<bmcweb::AsyncResp> asyncResp) {
             BMCWEB_LOG_DEBUG << "Connection " << &conn << " opened";
 
             sessions.insert(&conn);
diff --git a/include/vm_websocket.hpp b/include/vm_websocket.hpp
index 92ccc0d..a8380a7 100644
--- a/include/vm_websocket.hpp
+++ b/include/vm_websocket.hpp
@@ -160,7 +160,8 @@
     BMCWEB_ROUTE(app, "/vm/0/0")
         .requires({"ConfigureComponents", "ConfigureManager"})
         .websocket()
-        .onopen([](crow::websocket::Connection& conn) {
+        .onopen([](crow::websocket::Connection& conn,
+                   std::shared_ptr<bmcweb::AsyncResp> asyncResp) {
             BMCWEB_LOG_DEBUG << "Connection " << &conn << " opened";
 
             if (session != nullptr)