Add the stream mode

The stream mode behaves differently from the existing buffer mode:

1. It leverages rsyslog to persist logs;
2. It leverages logrotate to rotate and compress logs;
3. It persists logs as soon as they are collected.

Add configuration options to choose the mode at start-up time. When stream
mode is disabled, the service behaves the same as the existing one.

See README.md for details.

This change also adds mock classes for unit testing purposes.

Change-Id: Ic7d02e826c7d9372621c096c6e768e6216974150
Signed-off-by: Nan Zhou <nanzhoumails@gmail.com>
diff --git a/src/service.cpp b/src/buffer_service.cpp
similarity index 70%
rename from src/service.cpp
rename to src/buffer_service.cpp
index a5610e4..e17f7f0 100644
--- a/src/service.cpp
+++ b/src/buffer_service.cpp
@@ -1,7 +1,7 @@
 // SPDX-License-Identifier: Apache-2.0
 // Copyright (C) 2020 YADRO
 
-#include "service.hpp"
+#include "buffer_service.hpp"
 
 #include <phosphor-logging/log.hpp>
 
@@ -31,34 +31,36 @@
 };
 // clang-format on
 
-Service::Service(const Config& config) :
-    config(config), hostConsole(config.socketId),
-    logBuffer(config.bufMaxSize, config.bufMaxTime),
-    fileStorage(config.outDir, config.socketId, config.maxFiles)
+BufferService::BufferService(const Config& config, DbusLoop& dbusLoop,
+                             HostConsole& hostConsole, LogBuffer& logBuffer,
+                             FileStorage& fileStorage) :
+    config(config),
+    dbusLoop(&dbusLoop), hostConsole(&hostConsole), logBuffer(&logBuffer),
+    fileStorage(&fileStorage)
 {}
 
-void Service::run()
+void BufferService::run()
 {
     if (config.bufFlushFull)
     {
-        logBuffer.setFullHandler([this]() { this->flush(); });
+        logBuffer->setFullHandler([this]() { this->flush(); });
     }
 
-    hostConsole.connect();
+    hostConsole->connect();
 
     // Add SIGUSR1 signal handler for manual flushing
-    dbusLoop.addSignalHandler(SIGUSR1, [this]() { this->flush(); });
+    dbusLoop->addSignalHandler(SIGUSR1, [this]() { this->flush(); });
     // Add SIGTERM signal handler for service shutdown
-    dbusLoop.addSignalHandler(SIGTERM, [this]() { this->dbusLoop.stop(0); });
+    dbusLoop->addSignalHandler(SIGTERM, [this]() { this->dbusLoop->stop(0); });
 
     // Register callback for socket IO
-    dbusLoop.addIoHandler(hostConsole, [this]() { this->readConsole(); });
+    dbusLoop->addIoHandler(*hostConsole, [this]() { this->readConsole(); });
 
     // Register host state watcher
     if (*config.hostState)
     {
-        dbusLoop.addPropertyHandler(config.hostState, watchProperties,
-                                    [this]() { this->flush(); });
+        dbusLoop->addPropertyHandler(config.hostState, watchProperties,
+                                     [this]() { this->flush(); });
     }
 
     if (!*config.hostState && !config.bufFlushFull)
@@ -76,8 +78,8 @@
                       entry("MaxFiles=%lu", config.maxFiles));
 
     // Run D-Bus event loop
-    const int rc = dbusLoop.run();
-    if (!logBuffer.empty())
+    const int rc = dbusLoop->run();
+    if (!logBuffer->empty())
     {
         flush();
     }
@@ -88,17 +90,17 @@
     }
 }
 
-void Service::flush()
+void BufferService::flush()
 {
-    if (logBuffer.empty())
+    if (logBuffer->empty())
     {
         log<level::INFO>("Ignore flush: buffer is empty");
         return;
     }
     try
     {
-        const std::string fileName = fileStorage.save(logBuffer);
-        logBuffer.clear();
+        const std::string fileName = fileStorage->save(*logBuffer);
+        logBuffer->clear();
 
         std::string msg = "Host logs flushed to ";
         msg += fileName;
@@ -110,7 +112,7 @@
     }
 }
 
-void Service::readConsole()
+void BufferService::readConsole()
 {
     constexpr size_t bufSize = 128; // enough for most line-oriented output
     std::vector<char> bufData(bufSize);
@@ -118,9 +120,9 @@
 
     try
     {
-        while (const size_t rsz = hostConsole.read(buf, bufSize))
+        while (const size_t rsz = hostConsole->read(buf, bufSize))
         {
-            logBuffer.append(buf, rsz);
+            logBuffer->append(buf, rsz);
         }
     }
     catch (const std::system_error& ex)
diff --git a/src/buffer_service.hpp b/src/buffer_service.hpp
new file mode 100644
index 0000000..a03ef3b
--- /dev/null
+++ b/src/buffer_service.hpp
@@ -0,0 +1,70 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright (C) 2020 YADRO
+
+#pragma once
+
+#include "config.hpp"
+#include "dbus_loop.hpp"
+#include "file_storage.hpp"
+#include "host_console.hpp"
+#include "log_buffer.hpp"
+#include "service.hpp"
+
+#include <sys/un.h>
+
+/**
+ * @class BufferService
+ * @brief Buffer based log service: watches for events and handles them.
+ */
+class BufferService : public Service
+{
+  public:
+    /**
+     * @brief Constructor for buffer mode. Only references to the arguments are
+     * kept, so all of them must outlive this object.
+     *
+     * @param config service configuration.
+     * @param dbusLoop the DbusLoop instance.
+     * @param hostConsole the HostConsole instance.
+     * @param logBuffer the LogBuffer instance.
+     * @param fileStorage the FileStorage instance.
+     *
+     * @throw std::exception in case of errors
+     */
+    BufferService(const Config& config, DbusLoop& dbusLoop,
+                  HostConsole& hostConsole, LogBuffer& logBuffer,
+                  FileStorage& fileStorage);
+
+    ~BufferService() override = default;
+
+    /**
+     * @brief Run the service.
+     *
+     * @throw std::exception in case of errors
+     */
+    void run() override;
+
+  protected:
+    /**
+     * @brief Flush log buffer to a file.
+     */
+    virtual void flush();
+
+    /**
+     * @brief Read data from host console and append it to the log
+     * buffer.
+     */
+    virtual void readConsole();
+
+  private:
+    /** @brief Service configuration. */
+    const Config& config;
+    /** @brief D-Bus event loop. */
+    DbusLoop* dbusLoop;
+    /** @brief Host console connection. */
+    HostConsole* hostConsole;
+    /** @brief Intermediate storage: container for parsed log messages. */
+    LogBuffer* logBuffer;
+    /** @brief Persistent storage. */
+    FileStorage* fileStorage;
+};
diff --git a/src/config.cpp b/src/config.cpp
index d7f2ee1..9a6f44c 100644
--- a/src/config.cpp
+++ b/src/config.cpp
@@ -3,12 +3,20 @@
 
 #include "config.hpp"
 
+#include <sys/un.h>
+
 #include <algorithm>
 #include <climits>
 #include <cstring>
 #include <stdexcept>
 #include <string>
 
+namespace
+{
+constexpr char bufferModeStr[] = "buffer";
+constexpr char streamModeStr[] = "stream";
+} // namespace
+
 /**
  * @brief Set boolean value from environment variable.
  *
@@ -89,18 +97,46 @@
 Config::Config()
 {
     safeSet("SOCKET_ID", socketId);
-    safeSet("BUF_MAXSIZE", bufMaxSize);
-    safeSet("BUF_MAXTIME", bufMaxTime);
-    safeSet("FLUSH_FULL", bufFlushFull);
-    safeSet("HOST_STATE", hostState);
-    safeSet("OUT_DIR", outDir);
-    safeSet("MAX_FILES", maxFiles);
-
-    // Validate parameters
-    if (bufFlushFull && !bufMaxSize && !bufMaxTime)
+    const char* mode_str = bufferModeStr;
+    safeSet("MODE", mode_str);
+    if (strcmp(mode_str, bufferModeStr) == 0)
+    {
+        mode = Mode::bufferMode;
+    }
+    else if (strcmp(mode_str, streamModeStr) == 0)
+    {
+        mode = Mode::streamMode;
+    }
+    else
     {
         throw std::invalid_argument(
-            "Flush policy is set to save the buffer as it fills, but buffer's "
-            "limits are not defined");
+            "Invalid value for mode; expect either 'stream' or 'buffer'");
+    }
+
+    if (mode == Mode::bufferMode)
+    {
+        safeSet("BUF_MAXSIZE", bufMaxSize);
+        safeSet("BUF_MAXTIME", bufMaxTime);
+        safeSet("FLUSH_FULL", bufFlushFull);
+        safeSet("HOST_STATE", hostState);
+        safeSet("OUT_DIR", outDir);
+        safeSet("MAX_FILES", maxFiles);
+        // Validate parameters
+        if (bufFlushFull && !bufMaxSize && !bufMaxTime)
+        {
+            throw std::invalid_argument("Flush policy is set to save the "
+                                        "buffer as it fills, but buffer's "
+                                        "limits are not defined");
+        }
+    }
+    else
+    {
+        // mode == Mode::streamMode
+        safeSet("STREAM_DST", streamDestination);
+        // We need an extra +1 for null terminator.
+        if (strlen(streamDestination) + 1 > sizeof(sockaddr_un::sun_path))
+        {
+            throw std::invalid_argument("Invalid STREAM_DST: too long");
+        }
     }
 }
diff --git a/src/config.hpp b/src/config.hpp
index a2aa614..1a213b4 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -5,6 +5,12 @@
 
 #include <cstddef>
 
+enum class Mode
+{
+    bufferMode,
+    streamMode
+};
+
 /**
  * @struct Config
  * @brief Configuration of the service, initialized with default values.
@@ -18,8 +24,13 @@
      */
     Config();
 
+    /** The following configs are for both modes. */
     /** @brief Socket ID used for connection with host console. */
     const char* socketId = "";
+    /** @brief The mode the service is in. */
+    Mode mode = Mode::bufferMode;
+
+    /** The following configs are for buffer mode. */
     /** @brief Max number of messages stored inside intermediate buffer. */
     size_t bufMaxSize = 3000;
     /** @brief Max age of messages (in minutes) inside intermediate buffer. */
@@ -32,4 +43,8 @@
     const char* outDir = "/var/lib/obmc/hostlogs";
     /** @brief Max number of log files in the output directory. */
     size_t maxFiles = 10;
+
+    /** The following configs are for stream mode. */
+    /** @brief Path to the unix socket that receives the log stream. */
+    const char* streamDestination = "/run/rsyslog/console_input";
 };
diff --git a/src/dbus_loop.hpp b/src/dbus_loop.hpp
index eeb5327..a96d5ac 100644
--- a/src/dbus_loop.hpp
+++ b/src/dbus_loop.hpp
@@ -26,14 +26,14 @@
     using WatchProperties = std::map<std::string, Properties>;
 
     DbusLoop();
-    ~DbusLoop();
+    virtual ~DbusLoop();
 
     /**
      * @brief Run worker loop.
      *
      * @return exit code from loop
      */
-    int run() const;
+    virtual int run() const;
 
     /**
      * @brief Stop worker loop.
@@ -53,9 +53,9 @@
      *
      * @throw std::system_error in case of errors
      */
-    void addPropertyHandler(const std::string& objPath,
-                            const WatchProperties& props,
-                            std::function<void()> callback);
+    virtual void addPropertyHandler(const std::string& objPath,
+                                    const WatchProperties& props,
+                                    std::function<void()> callback);
 
     /**
      * @brief Add IO event handler.
@@ -65,7 +65,7 @@
      *
      * @throw std::system_error in case of errors
      */
-    void addIoHandler(int fd, std::function<void()> callback);
+    virtual void addIoHandler(int fd, std::function<void()> callback);
 
     /**
      * @brief Add signal handler.
@@ -75,7 +75,7 @@
      *
      * @throw std::system_error in case of errors
      */
-    void addSignalHandler(int signal, std::function<void()> callback);
+    virtual void addSignalHandler(int signal, std::function<void()> callback);
 
   private:
     /**
diff --git a/src/file_storage.hpp b/src/file_storage.hpp
index 0292440..cf5aef4 100644
--- a/src/file_storage.hpp
+++ b/src/file_storage.hpp
@@ -26,6 +26,8 @@
     FileStorage(const std::string& path, const std::string& prefix,
                 size_t maxFiles);
 
+    virtual ~FileStorage() = default;
+
     /**
      * @brief Save log buffer to a file.
      *
@@ -35,7 +37,7 @@
      *
      * @return path to saved file
      */
-    std::string save(const LogBuffer& buf) const;
+    virtual std::string save(const LogBuffer& buf) const;
 
   private:
     /**
diff --git a/src/host_console.hpp b/src/host_console.hpp
index b0369af..73a5bfd 100644
--- a/src/host_console.hpp
+++ b/src/host_console.hpp
@@ -19,7 +19,7 @@
      */
     HostConsole(const std::string& socketId);
 
-    ~HostConsole();
+    virtual ~HostConsole();
 
     /**
      * @brief Connect to the host's console via socket.
@@ -27,7 +27,7 @@
      * @throw std::invalid_argument if socket ID is invalid
      * @throw std::system_error in case of other errors
      */
-    void connect();
+    virtual void connect();
 
     /**
      * @brief Non-blocking read data from console's socket.
@@ -39,10 +39,10 @@
      *
      * @return number of actually read bytes
      */
-    size_t read(char* buf, size_t sz) const;
+    virtual size_t read(char* buf, size_t sz) const;
 
     /** @brief Get socket file descriptor, used for watching IO. */
-    operator int() const;
+    virtual operator int() const;
 
   private:
     /** @brief Socket Id. */
diff --git a/src/log_buffer.hpp b/src/log_buffer.hpp
index 24e7ad7..be1806b 100644
--- a/src/log_buffer.hpp
+++ b/src/log_buffer.hpp
@@ -37,25 +37,27 @@
      */
     LogBuffer(size_t maxSize, size_t maxTime);
 
+    virtual ~LogBuffer() = default;
+
     /**
      * @brief Add raw data from host's console output.
      *
      * @param[in] data pointer to raw data buffer
      * @param[in] sz size of the buffer in bytes
      */
-    void append(const char* data, size_t sz);
+    virtual void append(const char* data, size_t sz);
 
     /**
      * @brief Set handler called if buffer is full.
      *
      * @param[in] cb callback function
      */
-    void setFullHandler(std::function<void()> cb);
+    virtual void setFullHandler(std::function<void()> cb);
 
     /** @brief Clear (reset) container. */
-    void clear();
+    virtual void clear();
     /** @brief Check container for empty. */
-    bool empty() const;
+    virtual bool empty() const;
     /** @brief Get container's iterator. */
     container_t::const_iterator begin() const;
     /** @brief Get container's iterator. */
diff --git a/src/main.cpp b/src/main.cpp
index cb8d2bc..756440f 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -1,12 +1,16 @@
 // SPDX-License-Identifier: Apache-2.0
 // Copyright (C) 2020 YADRO
 
+#include "buffer_service.hpp"
 #include "config.hpp"
 #include "service.hpp"
+#include "stream_service.hpp"
 #include "version.hpp"
 
 #include <getopt.h>
 
+#include <phosphor-logging/log.hpp>
+
 /** @brief Print version info. */
 static void printVersion()
 {
@@ -63,9 +67,28 @@
 
     try
     {
-        Config cfg;
-        Service svc(cfg);
-        svc.run();
+        Config config;
+        DbusLoop dbus_loop;
+        HostConsole host_console(config.socketId);
+        using phosphor::logging::level;
+        using phosphor::logging::log;
+        if (config.mode == Mode::streamMode)
+        {
+            log<level::INFO>("HostLogger is in stream mode.");
+            StreamService service(config.streamDestination, dbus_loop,
+                                  host_console);
+            service.run();
+        }
+        else
+        {
+            log<level::INFO>("HostLogger is in buffer mode.");
+            LogBuffer logBuffer(config.bufMaxSize, config.bufMaxTime);
+            FileStorage fileStorage(config.outDir, config.socketId,
+                                    config.maxFiles);
+            BufferService service(config, dbus_loop, host_console, logBuffer,
+                                  fileStorage);
+            service.run();
+        }
     }
     catch (const std::exception& ex)
     {
diff --git a/src/service.hpp b/src/service.hpp
index 86c02c7..219260c 100644
--- a/src/service.hpp
+++ b/src/service.hpp
@@ -3,55 +3,17 @@
 
 #pragma once
 
-#include "config.hpp"
-#include "dbus_loop.hpp"
-#include "file_storage.hpp"
-#include "host_console.hpp"
-#include "log_buffer.hpp"
-
 /**
  * @class Service
- * @brief Log service: watches for events and handles them.
+ * @brief The log service interface
  */
 class Service
 {
   public:
-    /**
-     * @brief Constructor.
-     *
-     * @param[in] config service configuration
-     *
-     * @throw std::exception in case of errors
-     */
-    Service(const Config& config);
+    virtual ~Service() = default;
 
     /**
      * @brief Run the service.
-     *
-     * @throw std::exception in case of errors
      */
-    void run();
-
-  private:
-    /**
-     * @brief Flush log buffer to a file.
-     */
-    void flush();
-
-    /**
-     * @brief Read data from host console and put it into the log buffer.
-     */
-    void readConsole();
-
-  private:
-    /** @brief Service configuration. */
-    const Config& config;
-    /** @brief D-Bus event loop. */
-    DbusLoop dbusLoop;
-    /** @brief Host console connection. */
-    HostConsole hostConsole;
-    /** @brief Intermediate storage: container for parsed log messages. */
-    LogBuffer logBuffer;
-    /** @brief Persistent storage. */
-    FileStorage fileStorage;
+    virtual void run() = 0;
 };
diff --git a/src/stream_service.cpp b/src/stream_service.cpp
new file mode 100644
index 0000000..b8d84e9
--- /dev/null
+++ b/src/stream_service.cpp
@@ -0,0 +1,134 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright (C) 2021 Google
+
+#include "stream_service.hpp"
+
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include <phosphor-logging/log.hpp>
+
+#include <cerrno>
+#include <csignal>
+#include <cstring>
+#include <stdexcept>
+#include <string>
+#include <system_error>
+#include <vector>
+
+using namespace phosphor::logging;
+
+StreamService::StreamService(const char* streamDestination, DbusLoop& dbusLoop,
+                             HostConsole& hostConsole) :
+    destinationPath(streamDestination),
+    dbusLoop(&dbusLoop), hostConsole(&hostConsole), outputSocketFd(-1),
+    destination()
+{}
+
+StreamService::~StreamService()
+{
+    if (outputSocketFd != -1)
+    {
+        close(outputSocketFd);
+    }
+}
+
+void StreamService::run()
+{
+    setStreamSocket();
+    hostConsole->connect();
+    // Add SIGTERM signal handler for service shutdown
+    dbusLoop->addSignalHandler(SIGTERM, [this]() { this->dbusLoop->stop(0); });
+    // Register callback for socket IO
+    dbusLoop->addIoHandler(*hostConsole, [this]() { this->readConsole(); });
+
+    // Run D-Bus event loop
+    const int rc = dbusLoop->run();
+    if (rc < 0)
+    {
+        std::error_code ec(-rc, std::generic_category());
+        throw std::system_error(ec, "Error in event loop");
+    }
+}
+
+void StreamService::readConsole()
+{
+    constexpr size_t bufSize = 128; // enough for most line-oriented output
+    std::vector<char> bufData(bufSize);
+    char* buf = bufData.data();
+
+    try
+    {
+        while (const size_t rsz = hostConsole->read(buf, bufSize))
+        {
+            streamConsole(buf, rsz);
+        }
+    }
+    catch (const std::system_error& ex)
+    {
+        // A failed read or send is logged and this callback returns normally.
+        log<level::ERR>(ex.what());
+    }
+}
+
+void StreamService::streamConsole(const char* data, size_t len)
+{
+    // Length of the destination address: the sockaddr_un header plus the path.
+    // strlen() starts at the second byte and the first one is counted
+    // separately, so abstract namespace sockets (leading '\0') are measured
+    // correctly. The value is loop-invariant, hence computed once per call.
+    const socklen_t destinationLen = static_cast<socklen_t>(
+        sizeof(destination) - sizeof(destination.sun_path) +
+        strlen(destinationPath + 1) + 1);
+
+    // Send all received characters in a blocking manner.
+    size_t sent = 0;
+    while (sent < len)
+    {
+        // Datagram sockets preserve message boundaries. Furthermore,
+        // in most implementations, UNIX domain datagram sockets are
+        // always reliable and don't reorder datagrams.
+        const ssize_t currSent =
+            sendto(outputSocketFd, data + sent, len - sent, 0,
+                   reinterpret_cast<const sockaddr*>(&destination),
+                   destinationLen);
+        if (currSent == -1)
+        {
+            // Capture errno before building the message, which may alter it.
+            const int savedErrno = errno;
+            std::string error = "Unable to send to the destination ";
+            error += destinationPath;
+            std::error_code ec(savedErrno ? savedErrno : EIO,
+                               std::generic_category());
+            throw std::system_error(ec, error);
+        }
+        sent += static_cast<size_t>(currSent);
+    }
+}
+
+void StreamService::setStreamSocket()
+{
+    // strlen() starts at the second byte and the first one is counted
+    // separately, so abstract namespace sockets (leading '\0') are supported.
+    // NOTE: destinationPath must therefore be at least two bytes long; an
+    // empty string is not a valid destination.
+    const size_t len = strlen(destinationPath + 1) + 1;
+    // The path and the terminator written below must both fit in sun_path,
+    // otherwise memcpy() would write past the end of the address structure.
+    if (len >= sizeof(destination.sun_path))
+    {
+        throw std::invalid_argument("Invalid stream destination: too long");
+    }
+
+    destination.sun_family = AF_UNIX;
+    memcpy(destination.sun_path, destinationPath, len);
+    destination.sun_path[len] = '\0';
+
+    outputSocketFd = socket(AF_UNIX, SOCK_DGRAM, 0);
+    if (outputSocketFd == -1)
+    {
+        std::error_code ec(errno ? errno : EIO, std::generic_category());
+        throw std::system_error(ec, "Unable to create output socket.");
+    }
+}
diff --git a/src/stream_service.hpp b/src/stream_service.hpp
new file mode 100644
index 0000000..4aaba41
--- /dev/null
+++ b/src/stream_service.hpp
@@ -0,0 +1,80 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright (C) 2021 Google
+
+#pragma once
+
+#include "config.hpp"
+#include "dbus_loop.hpp"
+#include "file_storage.hpp"
+#include "host_console.hpp"
+#include "log_buffer.hpp"
+#include "service.hpp"
+
+#include <sys/un.h>
+
+/**
+ * @class StreamService
+ * @brief Stream log service: forwards host console data to a unix socket.
+ */
+class StreamService : public Service
+{
+  public:
+    /**
+     * @brief Constructor for stream-only mode. All arguments should outlive
+     * this class.
+     *
+     * @param streamDestination the destination socket to stream logs.
+     * @param dbusLoop the DbusLoop instance.
+     * @param hostConsole the HostConsole instance.
+     */
+    StreamService(const char* streamDestination, DbusLoop& dbusLoop,
+                  HostConsole& hostConsole);
+
+    /**
+     * @brief Destructor; close the file descriptor.
+     */
+    ~StreamService() override;
+
+    /**
+     * @brief Run the service.
+     *
+     * @throw std::exception in case of errors
+     */
+    void run() override;
+
+  protected:
+    /**
+     * @brief Read data from host console and stream it to the destination
+     * socket.
+     */
+    virtual void readConsole();
+
+    /**
+     * @brief Stream console data to a datagram unix socket.
+     *
+     * @param data the bytes to stream
+     * @param len the length of the bytes array
+     *
+     * @throw std::exception in case of errors
+     */
+    virtual void streamConsole(const char* data, size_t len);
+
+    /**
+     * @brief Set up stream socket
+     *
+     * @throw std::exception in case of errors
+     */
+    virtual void setStreamSocket();
+
+  private:
+    /** @brief Path to the destination (the rsyslog unix socket) */
+    const char* destinationPath;
+    /** @brief D-Bus event loop. */
+    DbusLoop* dbusLoop;
+    /** @brief Host console connection. */
+    HostConsole* hostConsole;
+    /** @brief File descriptor of the output socket */
+    int outputSocketFd;
+    /** @brief Address of the destination (the rsyslog unix socket) */
+    sockaddr_un destination;
+};