Add the stream mode

The stream mode behaves differently in versus the existing buffer mode.

1. It leverages rsyslog to persist logs;
2. It leverages logrotate to rotate and compress logs;
3. It persists logs as soon as they are collected.

Add configuration options to choose modes at start up time. When stream
mode is disabled, no difference compared to the existing service.

See README.md for details.

This change also adds mock classes for unit test purpose.

Change-Id: Ic7d02e826c7d9372621c096c6e768e6216974150
Signed-off-by: Nan Zhou <nanzhoumails@gmail.com>
diff --git a/README.md b/README.md
index 0c08f13..b263e1c 100644
--- a/README.md
+++ b/README.md
@@ -4,15 +4,22 @@
 console output data, such as boot logs or Linux kernel messages printed to the
 system console.
 
-Host logs are stored in a temporary buffer and flushed to a file according to
-the policy that can be defined with service parameters. It gives the ability to
-save the last boot log and subsequent messages in separate files.
+There are two modes in Host Logger.
+
+Buffer mode: host logs are stored in a temporary buffer and flushed to a file
+according to the policy that can be defined with service parameters. It gives
+the ability to save the startup logs and shutdown logs in separate files.
+
+Stream mode: host logs are forwarded into a server socket (e.g. a socket created
+by the rsyslog imuxsock module). It gives the ability to stream logs in almost
+realtime.
 
 ## Architecture
 
 Host Logger is a standalone service (daemon) that works on top of the
 obmc-console and uses its UNIX domain socket to read the console output.
 
+### The Buffer Mode
 ```
 +-------------+                                       +----------------+
 |    Host     | State  +---------------------+ Event  |   Host Logger  |
@@ -28,13 +35,36 @@
 ```
 
 Unlike the obmc-console project, where console output is a binary byte stream,
-the Host Logger service interprets this stream: splits it into separate
+the service in buffer mode interprets this stream: splits it into separate
 messages, adds a time stamp and pushes the message into an internal buffer.
 Maximum size of the buffer and flush conditions are controlled by service
 parameters.
 
+### The Stream Mode
+
+```
++-----------+                               +-------------+      +------------+
+|           |                               |             |      |            |
+|   Host    |                               | Host Logger |      |  rsyslog   |
+|           |                               |             |      | +--------+ |  +---------+
+|           |                               |             |      | | omfile ----->Log Files|--+
+|           |   +--------------------+      |             |      | +--------+ |  +---------+  |
+|           |   |obmc-console-server |      |             |      |            |  +------------v---+
+|+---------+|   |   +-------------+  |Stream|+-----------+|DGRAM | +--------+ |  | Redfish        |
+|| console -----|-->| UNIX socket |---------->  Service  --------->|imuxsock| |  | LogService &   |
+|+---------+|   |   +-------------+  |      |+-----------+|      | +--------+ |  | EventService   |
++-----------+   +--------------------+      +-------------+      +------------+  +----------------+
+```
+
+The service in stream mode forwards the byte stream into rsyslog via the imuxsock
+module. The log is persisted via the omfile module as soon as collected. It makes
+Host Logger leverage exsisting tools (rsyslog and logrotate). It also fits in the
+Redfish LogService and EventService architecture in OpenBMC.
+
 ## Log buffer rotation policy
 
+### The Buffer Mode
+
 Maximum buffer size can be defined in the service configuration using two ways:
 - Limits by size: buffer will store the last N messages, the oldest messages are
   removed. Controlled by `BUF_MAXSIZE` option.
@@ -43,8 +73,15 @@
 
 Any of these parameters can be combined.
 
+### The Stream Mode
+
+Rotation and compression are handled by the [logrotate](https://linux.die.net/man/8/logrotate)
+tool.
+
 ## Log buffer flush policy
 
+### The Buffer Mode
+
 Messages from the buffer will be written to a file when one of the following
 events occurs:
 - Host changes its state (start, reboot or shut down). The service watches the
@@ -53,6 +90,10 @@
   `BUF_MAXTIME` parameters, this mode can be activated by `FLUSH_FULL` flag.
 - Signal `SIGUSR1` is received (manual flush).
 
+### The Stream Mode
+
+Logs are flushed as soon as they are collected.
+
 ## Configuration
 
 Configuration of the service is loaded from environment variables, so each
@@ -69,6 +110,10 @@
 - `SOCKET_ID`: Socket Id used for connection with the host console. This Id
   shall match the "socket-id" parameter of obmc-console server.
   The default value is empty (single-host mode).
+- `MODE`: The mode that the service is running in. Possible values: `buffer`
+  or `stream`. The default value is `buffer`.
+
+#### The Buffer Mode
 
 - `BUF_MAXSIZE`: Max number of stored messages in the buffer. The default value
   is `3000` (0=unlimited).
@@ -93,6 +138,11 @@
 - `MAX_FILES`: Log files rotation, max number of files in the output directory,
   oldest files are removed. The default value is `10` (0=unlimited).
 
+#### The Stream Mode
+
+- `STREAM_DST`: Absolute path to the output unix socket. The default value is
+  `/run/rsyslog/console_input`.
+
 ### Example
 
 #### Remove file limits
diff --git a/default.conf b/default.conf
index c751864..5e35f4f 100644
--- a/default.conf
+++ b/default.conf
@@ -1,4 +1,5 @@
 SOCKET_ID=
+MODE=buffer
 BUF_MAXSIZE=3000
 BUF_MAXTIME=0
 FLUSH_FULL=false
diff --git a/meson.build b/meson.build
index 75958fe..ee8ecdb 100644
--- a/meson.build
+++ b/meson.build
@@ -44,7 +44,8 @@
     'src/host_console.cpp',
     'src/log_buffer.cpp',
     'src/main.cpp',
-    'src/service.cpp',
+    'src/buffer_service.cpp',
+    'src/stream_service.cpp',
     'src/zlib_exception.cpp',
     'src/zlib_file.cpp',
   ],
diff --git a/src/service.cpp b/src/buffer_service.cpp
similarity index 70%
rename from src/service.cpp
rename to src/buffer_service.cpp
index a5610e4..e17f7f0 100644
--- a/src/service.cpp
+++ b/src/buffer_service.cpp
@@ -1,7 +1,7 @@
 // SPDX-License-Identifier: Apache-2.0
 // Copyright (C) 2020 YADRO
 
-#include "service.hpp"
+#include "buffer_service.hpp"
 
 #include <phosphor-logging/log.hpp>
 
@@ -31,34 +31,36 @@
 };
 // clang-format on
 
-Service::Service(const Config& config) :
-    config(config), hostConsole(config.socketId),
-    logBuffer(config.bufMaxSize, config.bufMaxTime),
-    fileStorage(config.outDir, config.socketId, config.maxFiles)
+BufferService::BufferService(const Config& config, DbusLoop& dbusLoop,
+                             HostConsole& hostConsole, LogBuffer& logBuffer,
+                             FileStorage& fileStorage) :
+    config(config),
+    dbusLoop(&dbusLoop), hostConsole(&hostConsole), logBuffer(&logBuffer),
+    fileStorage(&fileStorage)
 {}
 
-void Service::run()
+void BufferService::run()
 {
     if (config.bufFlushFull)
     {
-        logBuffer.setFullHandler([this]() { this->flush(); });
+        logBuffer->setFullHandler([this]() { this->flush(); });
     }
 
-    hostConsole.connect();
+    hostConsole->connect();
 
     // Add SIGUSR1 signal handler for manual flushing
-    dbusLoop.addSignalHandler(SIGUSR1, [this]() { this->flush(); });
+    dbusLoop->addSignalHandler(SIGUSR1, [this]() { this->flush(); });
     // Add SIGTERM signal handler for service shutdown
-    dbusLoop.addSignalHandler(SIGTERM, [this]() { this->dbusLoop.stop(0); });
+    dbusLoop->addSignalHandler(SIGTERM, [this]() { this->dbusLoop->stop(0); });
 
     // Register callback for socket IO
-    dbusLoop.addIoHandler(hostConsole, [this]() { this->readConsole(); });
+    dbusLoop->addIoHandler(*hostConsole, [this]() { this->readConsole(); });
 
     // Register host state watcher
     if (*config.hostState)
     {
-        dbusLoop.addPropertyHandler(config.hostState, watchProperties,
-                                    [this]() { this->flush(); });
+        dbusLoop->addPropertyHandler(config.hostState, watchProperties,
+                                     [this]() { this->flush(); });
     }
 
     if (!*config.hostState && !config.bufFlushFull)
@@ -76,8 +78,8 @@
                       entry("MaxFiles=%lu", config.maxFiles));
 
     // Run D-Bus event loop
-    const int rc = dbusLoop.run();
-    if (!logBuffer.empty())
+    const int rc = dbusLoop->run();
+    if (!logBuffer->empty())
     {
         flush();
     }
@@ -88,17 +90,17 @@
     }
 }
 
-void Service::flush()
+void BufferService::flush()
 {
-    if (logBuffer.empty())
+    if (logBuffer->empty())
     {
         log<level::INFO>("Ignore flush: buffer is empty");
         return;
     }
     try
     {
-        const std::string fileName = fileStorage.save(logBuffer);
-        logBuffer.clear();
+        const std::string fileName = fileStorage->save(*logBuffer);
+        logBuffer->clear();
 
         std::string msg = "Host logs flushed to ";
         msg += fileName;
@@ -110,7 +112,7 @@
     }
 }
 
-void Service::readConsole()
+void BufferService::readConsole()
 {
     constexpr size_t bufSize = 128; // enough for most line-oriented output
     std::vector<char> bufData(bufSize);
@@ -118,9 +120,9 @@
 
     try
     {
-        while (const size_t rsz = hostConsole.read(buf, bufSize))
+        while (const size_t rsz = hostConsole->read(buf, bufSize))
         {
-            logBuffer.append(buf, rsz);
+            logBuffer->append(buf, rsz);
         }
     }
     catch (const std::system_error& ex)
diff --git a/src/buffer_service.hpp b/src/buffer_service.hpp
new file mode 100644
index 0000000..a03ef3b
--- /dev/null
+++ b/src/buffer_service.hpp
@@ -0,0 +1,70 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright (C) 2020 YADRO
+
+#pragma once
+
+#include "config.hpp"
+#include "dbus_loop.hpp"
+#include "file_storage.hpp"
+#include "host_console.hpp"
+#include "log_buffer.hpp"
+#include "service.hpp"
+
+#include <sys/un.h>
+
+/**
+ * @class BufferService
+ * @brief Buffer based log service: watches for events and handles them.
+ */
+class BufferService : public Service
+{
+  public:
+    /**
+     * @brief Constructor for buffer-only mode or buffer + stream mode.  All
+     * arguments should outlive this class.
+     *
+     * @param config service configuration.
+     * @param dbusLoop the DbusLoop instance.
+     * @param hostConsole the HostConsole instance.
+     * @param logBuffer the logBuffer instance.
+     * @param fileStorage the fileStorage instance.
+     *
+     * @throw std::exception in case of errors
+     */
+    BufferService(const Config& config, DbusLoop& dbusLoop,
+                  HostConsole& hostConsole, LogBuffer& logBuffer,
+                  FileStorage& fileStorage);
+
+    ~BufferService() override = default;
+
+    /**
+     * @brief Run the service.
+     *
+     * @throw std::exception in case of errors
+     */
+    void run() override;
+
+  protected:
+    /**
+     * @brief Flush log buffer to a file.
+     */
+    virtual void flush();
+
+    /**
+     * @brief Read data from host console and perform actions according to
+     * modes.
+     */
+    virtual void readConsole();
+
+  private:
+    /** @brief Service configuration. */
+    const Config& config;
+    /** @brief D-Bus event loop. */
+    DbusLoop* dbusLoop;
+    /** @brief Host console connection. */
+    HostConsole* hostConsole;
+    /** @brief Intermediate storage: container for parsed log messages. */
+    LogBuffer* logBuffer;
+    /** @brief Persistent storage. */
+    FileStorage* fileStorage;
+};
diff --git a/src/config.cpp b/src/config.cpp
index d7f2ee1..9a6f44c 100644
--- a/src/config.cpp
+++ b/src/config.cpp
@@ -3,12 +3,20 @@
 
 #include "config.hpp"
 
+#include <sys/un.h>
+
 #include <algorithm>
 #include <climits>
 #include <cstring>
 #include <stdexcept>
 #include <string>
 
+namespace
+{
+constexpr char bufferModeStr[] = "buffer";
+constexpr char streamModeStr[] = "stream";
+} // namespace
+
 /**
  * @brief Set boolean value from environment variable.
  *
@@ -89,18 +97,46 @@
 Config::Config()
 {
     safeSet("SOCKET_ID", socketId);
-    safeSet("BUF_MAXSIZE", bufMaxSize);
-    safeSet("BUF_MAXTIME", bufMaxTime);
-    safeSet("FLUSH_FULL", bufFlushFull);
-    safeSet("HOST_STATE", hostState);
-    safeSet("OUT_DIR", outDir);
-    safeSet("MAX_FILES", maxFiles);
-
-    // Validate parameters
-    if (bufFlushFull && !bufMaxSize && !bufMaxTime)
+    const char* mode_str = bufferModeStr;
+    safeSet("MODE", mode_str);
+    if (strcmp(mode_str, bufferModeStr) == 0)
+    {
+        mode = Mode::bufferMode;
+    }
+    else if (strcmp(mode_str, streamModeStr) == 0)
+    {
+        mode = Mode::streamMode;
+    }
+    else
     {
         throw std::invalid_argument(
-            "Flush policy is set to save the buffer as it fills, but buffer's "
-            "limits are not defined");
+            "Invalid value for mode; expect either 'stream' or 'buffer'");
+    }
+
+    if (mode == Mode::bufferMode)
+    {
+        safeSet("BUF_MAXSIZE", bufMaxSize);
+        safeSet("BUF_MAXTIME", bufMaxTime);
+        safeSet("FLUSH_FULL", bufFlushFull);
+        safeSet("HOST_STATE", hostState);
+        safeSet("OUT_DIR", outDir);
+        safeSet("MAX_FILES", maxFiles);
+        // Validate parameters
+        if (bufFlushFull && !bufMaxSize && !bufMaxTime)
+        {
+            throw std::invalid_argument("Flush policy is set to save the "
+                                        "buffer as it fills, but buffer's "
+                                        "limits are not defined");
+        }
+    }
+    else
+    {
+        // mode == Mode::streamMode
+        safeSet("STREAM_DST", streamDestination);
+        // We need an extra +1 for null terminator.
+        if (strlen(streamDestination) + 1 > sizeof(sockaddr_un::sun_path))
+        {
+            throw std::invalid_argument("Invalid STREAM_DST: too long");
+        }
     }
 }
diff --git a/src/config.hpp b/src/config.hpp
index a2aa614..1a213b4 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -5,6 +5,12 @@
 
 #include <cstddef>
 
+enum class Mode
+{
+    bufferMode,
+    streamMode
+};
+
 /**
  * @struct Config
  * @brief Configuration of the service, initialized with default values.
@@ -18,8 +24,13 @@
      */
     Config();
 
+    /** The following configs are for both modes. */
     /** @brief Socket ID used for connection with host console. */
     const char* socketId = "";
+    /** @brief The mode the service is in. */
+    Mode mode = Mode::bufferMode;
+
+    /** The following configs are for buffer mode. */
     /** @brief Max number of messages stored inside intermediate buffer. */
     size_t bufMaxSize = 3000;
     /** @brief Max age of messages (in minutes) inside intermediate buffer. */
@@ -32,4 +43,8 @@
     const char* outDir = "/var/lib/obmc/hostlogs";
     /** @brief Max number of log files in the output directory. */
     size_t maxFiles = 10;
+
+    /** The following configs are for stream mode. */
+    /** @brief Path to the unix socket that receives the log stream. */
+    const char* streamDestination = "/run/rsyslog/console_input";
 };
diff --git a/src/dbus_loop.hpp b/src/dbus_loop.hpp
index eeb5327..a96d5ac 100644
--- a/src/dbus_loop.hpp
+++ b/src/dbus_loop.hpp
@@ -26,14 +26,14 @@
     using WatchProperties = std::map<std::string, Properties>;
 
     DbusLoop();
-    ~DbusLoop();
+    virtual ~DbusLoop();
 
     /**
      * @brief Run worker loop.
      *
      * @return exit code from loop
      */
-    int run() const;
+    virtual int run() const;
 
     /**
      * @brief Stop worker loop.
@@ -53,9 +53,9 @@
      *
      * @throw std::system_error in case of errors
      */
-    void addPropertyHandler(const std::string& objPath,
-                            const WatchProperties& props,
-                            std::function<void()> callback);
+    virtual void addPropertyHandler(const std::string& objPath,
+                                    const WatchProperties& props,
+                                    std::function<void()> callback);
 
     /**
      * @brief Add IO event handler.
@@ -65,7 +65,7 @@
      *
      * @throw std::system_error in case of errors
      */
-    void addIoHandler(int fd, std::function<void()> callback);
+    virtual void addIoHandler(int fd, std::function<void()> callback);
 
     /**
      * @brief Add signal handler.
@@ -75,7 +75,7 @@
      *
      * @throw std::system_error in case of errors
      */
-    void addSignalHandler(int signal, std::function<void()> callback);
+    virtual void addSignalHandler(int signal, std::function<void()> callback);
 
   private:
     /**
diff --git a/src/file_storage.hpp b/src/file_storage.hpp
index 0292440..cf5aef4 100644
--- a/src/file_storage.hpp
+++ b/src/file_storage.hpp
@@ -26,6 +26,8 @@
     FileStorage(const std::string& path, const std::string& prefix,
                 size_t maxFiles);
 
+    virtual ~FileStorage() = default;
+
     /**
      * @brief Save log buffer to a file.
      *
@@ -35,7 +37,7 @@
      *
      * @return path to saved file
      */
-    std::string save(const LogBuffer& buf) const;
+    virtual std::string save(const LogBuffer& buf) const;
 
   private:
     /**
diff --git a/src/host_console.hpp b/src/host_console.hpp
index b0369af..73a5bfd 100644
--- a/src/host_console.hpp
+++ b/src/host_console.hpp
@@ -19,7 +19,7 @@
      */
     HostConsole(const std::string& socketId);
 
-    ~HostConsole();
+    virtual ~HostConsole();
 
     /**
      * @brief Connect to the host's console via socket.
@@ -27,7 +27,7 @@
      * @throw std::invalid_argument if socket ID is invalid
      * @throw std::system_error in case of other errors
      */
-    void connect();
+    virtual void connect();
 
     /**
      * @brief Non-blocking read data from console's socket.
@@ -39,10 +39,10 @@
      *
      * @return number of actually read bytes
      */
-    size_t read(char* buf, size_t sz) const;
+    virtual size_t read(char* buf, size_t sz) const;
 
     /** @brief Get socket file descriptor, used for watching IO. */
-    operator int() const;
+    virtual operator int() const;
 
   private:
     /** @brief Socket Id. */
diff --git a/src/log_buffer.hpp b/src/log_buffer.hpp
index 24e7ad7..be1806b 100644
--- a/src/log_buffer.hpp
+++ b/src/log_buffer.hpp
@@ -37,25 +37,27 @@
      */
     LogBuffer(size_t maxSize, size_t maxTime);
 
+    virtual ~LogBuffer() = default;
+
     /**
      * @brief Add raw data from host's console output.
      *
      * @param[in] data pointer to raw data buffer
      * @param[in] sz size of the buffer in bytes
      */
-    void append(const char* data, size_t sz);
+    virtual void append(const char* data, size_t sz);
 
     /**
      * @brief Set handler called if buffer is full.
      *
      * @param[in] cb callback function
      */
-    void setFullHandler(std::function<void()> cb);
+    virtual void setFullHandler(std::function<void()> cb);
 
     /** @brief Clear (reset) container. */
-    void clear();
+    virtual void clear();
     /** @brief Check container for empty. */
-    bool empty() const;
+    virtual bool empty() const;
     /** @brief Get container's iterator. */
     container_t::const_iterator begin() const;
     /** @brief Get container's iterator. */
diff --git a/src/main.cpp b/src/main.cpp
index cb8d2bc..756440f 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -1,12 +1,16 @@
 // SPDX-License-Identifier: Apache-2.0
 // Copyright (C) 2020 YADRO
 
+#include "buffer_service.hpp"
 #include "config.hpp"
 #include "service.hpp"
+#include "stream_service.hpp"
 #include "version.hpp"
 
 #include <getopt.h>
 
+#include <phosphor-logging/log.hpp>
+
 /** @brief Print version info. */
 static void printVersion()
 {
@@ -63,9 +67,28 @@
 
     try
     {
-        Config cfg;
-        Service svc(cfg);
-        svc.run();
+        Config config;
+        DbusLoop dbus_loop;
+        HostConsole host_console(config.socketId);
+        using phosphor::logging::level;
+        using phosphor::logging::log;
+        if (config.mode == Mode::streamMode)
+        {
+            log<level::INFO>("HostLogger is in stream mode.");
+            StreamService service(config.streamDestination, dbus_loop,
+                                  host_console);
+            service.run();
+        }
+        else
+        {
+            log<level::INFO>("HostLogger is in buffer mode.");
+            LogBuffer logBuffer(config.bufMaxSize, config.bufMaxTime);
+            FileStorage fileStorage(config.outDir, config.socketId,
+                                    config.maxFiles);
+            BufferService service(config, dbus_loop, host_console, logBuffer,
+                                  fileStorage);
+            service.run();
+        }
     }
     catch (const std::exception& ex)
     {
diff --git a/src/service.hpp b/src/service.hpp
index 86c02c7..219260c 100644
--- a/src/service.hpp
+++ b/src/service.hpp
@@ -3,55 +3,17 @@
 
 #pragma once
 
-#include "config.hpp"
-#include "dbus_loop.hpp"
-#include "file_storage.hpp"
-#include "host_console.hpp"
-#include "log_buffer.hpp"
-
 /**
  * @class Service
- * @brief Log service: watches for events and handles them.
+ * @brief The log service interface
  */
 class Service
 {
   public:
-    /**
-     * @brief Constructor.
-     *
-     * @param[in] config service configuration
-     *
-     * @throw std::exception in case of errors
-     */
-    Service(const Config& config);
+    virtual ~Service() = default;
 
     /**
      * @brief Run the service.
-     *
-     * @throw std::exception in case of errors
      */
-    void run();
-
-  private:
-    /**
-     * @brief Flush log buffer to a file.
-     */
-    void flush();
-
-    /**
-     * @brief Read data from host console and put it into the log buffer.
-     */
-    void readConsole();
-
-  private:
-    /** @brief Service configuration. */
-    const Config& config;
-    /** @brief D-Bus event loop. */
-    DbusLoop dbusLoop;
-    /** @brief Host console connection. */
-    HostConsole hostConsole;
-    /** @brief Intermediate storage: container for parsed log messages. */
-    LogBuffer logBuffer;
-    /** @brief Persistent storage. */
-    FileStorage fileStorage;
+    virtual void run() = 0;
 };
diff --git a/src/stream_service.cpp b/src/stream_service.cpp
new file mode 100644
index 0000000..b8d84e9
--- /dev/null
+++ b/src/stream_service.cpp
@@ -0,0 +1,106 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright (C) 2021 Google
+
+#include "stream_service.hpp"
+
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include <phosphor-logging/log.hpp>
+
+#include <vector>
+
+using namespace phosphor::logging;
+
+StreamService::StreamService(const char* streamDestination, DbusLoop& dbusLoop,
+                             HostConsole& hostConsole) :
+    destinationPath(streamDestination),
+    dbusLoop(&dbusLoop), hostConsole(&hostConsole), outputSocketFd(-1),
+    destination()
+{}
+
+StreamService::~StreamService()
+{
+    if (outputSocketFd != -1)
+    {
+        close(outputSocketFd);
+    }
+}
+
+void StreamService::run()
+{
+    setStreamSocket();
+    hostConsole->connect();
+    // Add SIGTERM signal handler for service shutdown
+    dbusLoop->addSignalHandler(SIGTERM, [this]() { this->dbusLoop->stop(0); });
+    // Register callback for socket IO
+    dbusLoop->addIoHandler(*hostConsole, [this]() { this->readConsole(); });
+
+    // Run D-Bus event loop
+    const int rc = dbusLoop->run();
+    if (rc < 0)
+    {
+        std::error_code ec(-rc, std::generic_category());
+        throw std::system_error(ec, "Error in event loop");
+    }
+}
+
+void StreamService::readConsole()
+{
+    constexpr size_t bufSize = 128; // enough for most line-oriented output
+    std::vector<char> bufData(bufSize);
+    char* buf = bufData.data();
+
+    try
+    {
+        while (const size_t rsz = hostConsole->read(buf, bufSize))
+        {
+            streamConsole(buf, rsz);
+        }
+    }
+    catch (const std::system_error& ex)
+    {
+        log<level::ERR>(ex.what());
+    }
+}
+
+void StreamService::streamConsole(const char* data, size_t len)
+{
+    // Send all received characters in a blocking manner.
+    size_t sent = 0;
+    while (sent < len)
+    {
+        // Datagram sockets preserve message boundaries. Furthermore,
+        // In most implementation, UNIX domain datagram sockets are
+        // always reliable and don't reorder datagrams.
+        ssize_t curr_sent =
+            sendto(outputSocketFd, data + sent, len - sent, 0,
+                   reinterpret_cast<const sockaddr*>(&destination),
+                   sizeof(destination) - sizeof(destination.sun_path) +
+                       strlen(destinationPath + 1) + 1);
+        if (curr_sent == -1)
+        {
+            std::string error = "Unable to send to the destination ";
+            error += destinationPath;
+            std::error_code ec(errno ? errno : EIO, std::generic_category());
+            throw std::system_error(ec, error);
+        }
+        sent += curr_sent;
+    }
+}
+
+void StreamService::setStreamSocket()
+{
+    destination.sun_family = AF_UNIX;
+    // To deal with abstract namespace unix socket.
+    size_t len = strlen(destinationPath + 1) + 1;
+    memcpy(destination.sun_path, destinationPath, len);
+    destination.sun_path[len] = '\0';
+    outputSocketFd = socket(AF_UNIX, SOCK_DGRAM, 0);
+    if (outputSocketFd == -1)
+    {
+        std::error_code ec(errno ? errno : EIO, std::generic_category());
+        throw std::system_error(ec, "Unable to create output socket.");
+    }
+}
diff --git a/src/stream_service.hpp b/src/stream_service.hpp
new file mode 100644
index 0000000..4aaba41
--- /dev/null
+++ b/src/stream_service.hpp
@@ -0,0 +1,80 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright (C) 2021 Google
+
+#pragma once
+
+#include "config.hpp"
+#include "dbus_loop.hpp"
+#include "file_storage.hpp"
+#include "host_console.hpp"
+#include "log_buffer.hpp"
+#include "service.hpp"
+
+#include <sys/un.h>
+
+/**
+ * @class Service
+ * @brief Log service: watches for events and handles them.
+ */
+class StreamService : public Service
+{
+  public:
+    /**
+     * @brief Constructor for stream-only mode. All arguments should outlive
+     * this class.
+     *
+     * @param streamDestination the destination socket to stream logs.
+     * @param dbusLoop the DbusLoop instance.
+     * @param hostConsole the HostConsole instance.
+     */
+    StreamService(const char* streamDestination, DbusLoop& dbusLoop,
+                  HostConsole& hostConsole);
+
+    /**
+     * @brief Destructor; close the file descriptor.
+     */
+    ~StreamService() override;
+
+    /**
+     * @brief Run the service.
+     *
+     * @throw std::exception in case of errors
+     */
+    void run() override;
+
+  protected:
+    /**
+     * @brief Read data from host console and perform actions according to
+     * modes.
+     */
+    virtual void readConsole();
+
+    /**
+     * @brief Stream console data to a datagram unix socket.
+     *
+     * @param data the bytes to stream
+     * @param len the length of the bytes array
+     *
+     * @throw std::exception in case of errors
+     */
+    virtual void streamConsole(const char* data, size_t len);
+
+    /**
+     * @brief Set up stream socket
+     *
+     * @throw std::exception in case of errors
+     */
+    virtual void setStreamSocket();
+
+  private:
+    /** @brief Path to the destination (the rsyslog unix socket) */
+    const char* destinationPath;
+    /** @brief D-Bus event loop. */
+    DbusLoop* dbusLoop;
+    /** @brief Host console connection. */
+    HostConsole* hostConsole;
+    /** @brief File descriptor of the ouput socket */
+    int outputSocketFd;
+    /** @brief Address of the destination (the rsyslog unix socket) */
+    sockaddr_un destination;
+};
diff --git a/test/buffer_service_test.cpp b/test/buffer_service_test.cpp
new file mode 100644
index 0000000..602dfcc
--- /dev/null
+++ b/test/buffer_service_test.cpp
@@ -0,0 +1,154 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright (C) 2021 Google
+
+#include "buffer_service.hpp"
+#include "config.hpp"
+#include "dbus_loop_mock.hpp"
+#include "file_storage_mock.hpp"
+#include "host_console_mock.hpp"
+#include "log_buffer_mock.hpp"
+
+#include <memory>
+#include <string>
+#include <system_error>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+namespace
+{
+
+constexpr char firstDatagram[] = "Hello world";
+// Shouldn't read more than maximum size of a datagram.
+constexpr int consoleReadMaxSize = 1024;
+
+using ::testing::_;
+using ::testing::DoAll;
+using ::testing::Eq;
+using ::testing::InSequence;
+using ::testing::Le;
+using ::testing::Ref;
+using ::testing::Return;
+using ::testing::SetArrayArgument;
+using ::testing::StrEq;
+using ::testing::Test;
+using ::testing::Throw;
+
+// A helper class that owns config.
+struct ConfigInTest
+{
+    Config config;
+    ConfigInTest() : config()
+    {}
+};
+
+class BufferServiceTest : public Test, public ConfigInTest, public BufferService
+{
+  public:
+    // ConfigInTest::config is initialized before BufferService.
+    BufferServiceTest() :
+        BufferService(ConfigInTest::config, dbusLoopMock, hostConsoleMock,
+                      logBufferMock, fileStorageMock)
+    {}
+
+    MOCK_METHOD(void, flush, (), (override));
+    MOCK_METHOD(void, readConsole, (), (override));
+
+  protected:
+    // Set hostConsole firstly read specified data and then read nothing.
+    void setHostConsoleOnce(char const* data, size_t len)
+    {
+        EXPECT_CALL(hostConsoleMock, read(_, Le(consoleReadMaxSize)))
+            .WillOnce(DoAll(SetArrayArgument<0>(data, data + len), Return(len)))
+            .WillOnce(Return(0));
+    }
+
+    DbusLoopMock dbusLoopMock;
+    HostConsoleMock hostConsoleMock;
+    LogBufferMock logBufferMock;
+    FileStorageMock fileStorageMock;
+};
+
+TEST_F(BufferServiceTest, FlushEmptyBuffer)
+{
+    EXPECT_CALL(logBufferMock, empty()).WillOnce(Return(true));
+    EXPECT_NO_THROW(BufferService::flush());
+}
+
+TEST_F(BufferServiceTest, FlushExceptionCaught)
+{
+    InSequence sequence;
+    EXPECT_CALL(logBufferMock, empty()).WillOnce(Return(false));
+    EXPECT_CALL(fileStorageMock, save(Ref(logBufferMock)))
+        .WillOnce(Throw(std::runtime_error("Mock error")));
+    EXPECT_NO_THROW(BufferService::flush());
+}
+
+TEST_F(BufferServiceTest, FlushOk)
+{
+    InSequence sequence;
+    EXPECT_CALL(logBufferMock, empty()).WillOnce(Return(false));
+    EXPECT_CALL(fileStorageMock, save(Ref(logBufferMock)));
+    EXPECT_CALL(logBufferMock, clear());
+    EXPECT_NO_THROW(BufferService::flush());
+}
+
+TEST_F(BufferServiceTest, ReadConsoleExceptionCaught)
+{
+    InSequence sequence;
+    // Shouldn't read more than maximum size of a datagram.
+    EXPECT_CALL(hostConsoleMock, read(_, Le(1024)))
+        .WillOnce(Throw(std::system_error(std::error_code(), "Mock error")));
+    EXPECT_NO_THROW(BufferService::readConsole());
+}
+
+TEST_F(BufferServiceTest, ReadConsoleOk)
+{
+
+    setHostConsoleOnce(firstDatagram, strlen(firstDatagram));
+    EXPECT_CALL(logBufferMock,
+                append(StrEq(firstDatagram), Eq(strlen(firstDatagram))))
+        .WillOnce(Return());
+    EXPECT_NO_THROW(BufferService::readConsole());
+}
+
+TEST_F(BufferServiceTest, RunIoRegisterError)
+{
+    EXPECT_CALL(hostConsoleMock, connect()).WillOnce(Return());
+    EXPECT_CALL(dbusLoopMock, addSignalHandler(Eq(SIGUSR1), _))
+        .WillOnce(Return());
+    EXPECT_CALL(dbusLoopMock, addSignalHandler(Eq(SIGTERM), _))
+        .WillOnce(Return());
+    EXPECT_CALL(dbusLoopMock, addIoHandler(Eq(int(hostConsoleMock)), _))
+        .WillOnce(Throw(std::runtime_error("Mock error")));
+    EXPECT_THROW(run(), std::runtime_error);
+}
+
+TEST_F(BufferServiceTest, RunSignalRegisterError)
+{
+    EXPECT_CALL(hostConsoleMock, connect()).WillOnce(Return());
+    EXPECT_CALL(dbusLoopMock, addSignalHandler(Eq(SIGUSR1), _))
+        .WillOnce(Throw(std::runtime_error("Mock error")));
+    EXPECT_THROW(run(), std::runtime_error);
+}
+
+TEST_F(BufferServiceTest, RunOk)
+{
+    ConfigInTest::config.bufFlushFull = true;
+    EXPECT_CALL(hostConsoleMock, connect()).WillOnce(Return());
+    EXPECT_CALL(dbusLoopMock, addIoHandler(Eq(int(hostConsoleMock)), _))
+        .WillOnce(Return());
+    EXPECT_CALL(dbusLoopMock, addSignalHandler(Eq(SIGTERM), _))
+        .WillOnce(Return());
+    EXPECT_CALL(logBufferMock, setFullHandler(_)).WillOnce(Return());
+    EXPECT_CALL(dbusLoopMock, addSignalHandler(Eq(SIGUSR1), _))
+        .WillOnce(Return());
+    EXPECT_CALL(dbusLoopMock,
+                addPropertyHandler(StrEq(ConfigInTest::config.hostState), _, _))
+        .WillOnce(Return());
+    EXPECT_CALL(dbusLoopMock, run).WillOnce(Return(0));
+    EXPECT_CALL(logBufferMock, empty()).WillOnce(Return(false));
+    EXPECT_CALL(*this, flush()).WillOnce(Return());
+    EXPECT_NO_THROW(run());
+}
+} // namespace
diff --git a/test/config_test.cpp b/test/config_test.cpp
index 15fbc4b..65936f5 100644
--- a/test/config_test.cpp
+++ b/test/config_test.cpp
@@ -3,16 +3,20 @@
 
 #include "config.hpp"
 
+#include <sys/un.h>
+
 #include <gtest/gtest.h>
 
 // Names of environment variables
 static const char* SOCKET_ID = "SOCKET_ID";
+static const char* MODE = "MODE";
 static const char* BUF_MAXSIZE = "BUF_MAXSIZE";
 static const char* BUF_MAXTIME = "BUF_MAXTIME";
 static const char* FLUSH_FULL = "FLUSH_FULL";
 static const char* HOST_STATE = "HOST_STATE";
 static const char* OUT_DIR = "OUT_DIR";
 static const char* MAX_FILES = "MAX_FILES";
+static const char* STREAM_DST = "STREAM_DST";
 
 /**
  * @class ConfigTest
@@ -35,12 +39,14 @@
     void resetEnv() const
     {
         unsetenv(SOCKET_ID);
+        unsetenv(MODE);
         unsetenv(BUF_MAXSIZE);
         unsetenv(BUF_MAXTIME);
         unsetenv(FLUSH_FULL);
         unsetenv(HOST_STATE);
         unsetenv(OUT_DIR);
         unsetenv(MAX_FILES);
+        unsetenv(STREAM_DST);
     }
 };
 
@@ -48,17 +54,20 @@
 {
     Config cfg;
     EXPECT_STREQ(cfg.socketId, "");
+    EXPECT_EQ(cfg.mode, Mode::bufferMode);
     EXPECT_EQ(cfg.bufMaxSize, 3000);
     EXPECT_EQ(cfg.bufMaxTime, 0);
     EXPECT_EQ(cfg.bufFlushFull, false);
     EXPECT_STREQ(cfg.hostState, "/xyz/openbmc_project/state/host0");
     EXPECT_STREQ(cfg.outDir, "/var/lib/obmc/hostlogs");
     EXPECT_EQ(cfg.maxFiles, 10);
+    EXPECT_STREQ(cfg.streamDestination, "/run/rsyslog/console_input");
 }
 
-TEST_F(ConfigTest, Load)
+TEST_F(ConfigTest, LoadInBufferMode)
 {
     setenv(SOCKET_ID, "id123", 1);
+    setenv(MODE, "buffer", 1);
     setenv(BUF_MAXSIZE, "1234", 1);
     setenv(BUF_MAXTIME, "4321", 1);
     setenv(FLUSH_FULL, "true", 1);
@@ -68,30 +77,73 @@
 
     Config cfg;
     EXPECT_STREQ(cfg.socketId, "id123");
+    EXPECT_EQ(cfg.mode, Mode::bufferMode);
     EXPECT_EQ(cfg.bufMaxSize, 1234);
     EXPECT_EQ(cfg.bufMaxTime, 4321);
     EXPECT_EQ(cfg.bufFlushFull, true);
     EXPECT_STREQ(cfg.hostState, "host123");
     EXPECT_STREQ(cfg.outDir, "path123");
     EXPECT_EQ(cfg.maxFiles, 1122);
+    // This should be default.
+    EXPECT_STREQ(cfg.streamDestination, "/run/rsyslog/console_input");
+}
+
+TEST_F(ConfigTest, LoadInStreamMode)
+{
+    setenv(SOCKET_ID, "id123", 1);
+    setenv(MODE, "stream", 1);
+    setenv(STREAM_DST, "path123", 1);
+
+    Config cfg;
+    EXPECT_STREQ(cfg.socketId, "id123");
+    EXPECT_EQ(cfg.mode, Mode::streamMode);
+    EXPECT_STREQ(cfg.streamDestination, "path123");
+
+    // These should be default.
+    EXPECT_EQ(cfg.bufMaxSize, 3000);
+    EXPECT_EQ(cfg.bufMaxTime, 0);
+    EXPECT_EQ(cfg.bufFlushFull, false);
+    EXPECT_STREQ(cfg.hostState, "/xyz/openbmc_project/state/host0");
+    EXPECT_STREQ(cfg.outDir, "/var/lib/obmc/hostlogs");
+    EXPECT_EQ(cfg.maxFiles, 10);
 }
 
 TEST_F(ConfigTest, InvalidNumeric)
 {
     setenv(BUF_MAXSIZE, "-1234", 1);
-    ASSERT_THROW(Config(), std::invalid_argument);
+    EXPECT_THROW(Config(), std::invalid_argument);
 }
 
 TEST_F(ConfigTest, InvalidBoolean)
 {
     setenv(FLUSH_FULL, "invalid", 1);
-    ASSERT_THROW(Config(), std::invalid_argument);
+    EXPECT_THROW(Config(), std::invalid_argument);
+    setenv(FLUSH_FULL, "true", 1);
+    EXPECT_NO_THROW(Config());
 }
 
-TEST_F(ConfigTest, InvalidConfig)
+TEST_F(ConfigTest, Mode)
+{
+    setenv(MODE, "invalid", 1);
+    EXPECT_THROW(Config(), std::invalid_argument);
+    setenv(MODE, "stream", 1);
+    EXPECT_EQ(Config().mode, Mode::streamMode);
+    setenv(MODE, "buffer", 1);
+    EXPECT_EQ(Config().mode, Mode::bufferMode);
+}
+
+TEST_F(ConfigTest, InvalidBufferModeConfig)
 {
     setenv(BUF_MAXSIZE, "0", 1);
     setenv(BUF_MAXTIME, "0", 1);
     setenv(FLUSH_FULL, "true", 1);
-    ASSERT_THROW(Config(), std::invalid_argument);
+    EXPECT_THROW(Config(), std::invalid_argument);
+}
+
+TEST_F(ConfigTest, InvalidStreamModeConfig)
+{
+    std::string tooLong(sizeof(sockaddr_un::sun_path), '0');
+    setenv(MODE, "stream", 1);
+    setenv(STREAM_DST, tooLong.c_str(), 1);
+    EXPECT_THROW(Config(), std::invalid_argument);
 }
diff --git a/test/dbus_loop_mock.hpp b/test/dbus_loop_mock.hpp
new file mode 100644
index 0000000..c8183fa
--- /dev/null
+++ b/test/dbus_loop_mock.hpp
@@ -0,0 +1,22 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright (C) 2021 GOOGLE
+
+#pragma once
+
+#include "dbus_loop.hpp"
+
+#include <gmock/gmock.h>
+
+class DbusLoopMock : public DbusLoop
+{
+  public:
+    MOCK_METHOD(int, run, (), (const, override));
+    MOCK_METHOD(void, addIoHandler, (int fd, std::function<void()> callback),
+                (override));
+    MOCK_METHOD(void, addSignalHandler,
+                (int signal, std::function<void()> callback), (override));
+    MOCK_METHOD(void, addPropertyHandler,
+                (const std::string& objPath, const WatchProperties& props,
+                 std::function<void()> callback),
+                (override));
+};
diff --git a/test/file_storage_mock.hpp b/test/file_storage_mock.hpp
new file mode 100644
index 0000000..449ab29
--- /dev/null
+++ b/test/file_storage_mock.hpp
@@ -0,0 +1,16 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright (C) 2021 GOOGLE
+
+#pragma once
+
+#include "file_storage.hpp"
+
+#include <gmock/gmock.h>
+
+class FileStorageMock : public FileStorage
+{
+  public:
+    FileStorageMock() : FileStorage("/tmp", "fake", -1)
+    {}
+    MOCK_METHOD(std::string, save, (const LogBuffer& buf), (const override));
+};
diff --git a/test/host_console_mock.hpp b/test/host_console_mock.hpp
new file mode 100644
index 0000000..3b71c3c
--- /dev/null
+++ b/test/host_console_mock.hpp
@@ -0,0 +1,22 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright (C) 2021 GOOGLE
+
+#pragma once
+
+#include "host_console.hpp"
+
+#include <gmock/gmock.h>
+
+class HostConsoleMock : public HostConsole
+{
+  public:
+    HostConsoleMock() : HostConsole("")
+    {}
+    MOCK_METHOD(void, connect, (), (override));
+    MOCK_METHOD(size_t, read, (char* buf, size_t sz), (const, override));
+    // Returns a fixed integer for testing.
+    virtual operator int() const override
+    {
+        return 1;
+    };
+};
diff --git a/test/log_buffer_mock.hpp b/test/log_buffer_mock.hpp
new file mode 100644
index 0000000..f53fa87
--- /dev/null
+++ b/test/log_buffer_mock.hpp
@@ -0,0 +1,19 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright (C) 2021 GOOGLE
+
+#pragma once
+
+#include "log_buffer.hpp"
+
+#include <gmock/gmock.h>
+
+class LogBufferMock : public LogBuffer
+{
+  public:
+    LogBufferMock() : LogBuffer(-1, -1)
+    {}
+    MOCK_METHOD(void, append, (const char* data, size_t sz), (override));
+    MOCK_METHOD(void, setFullHandler, (std::function<void()> cb), (override));
+    MOCK_METHOD(bool, empty, (), (const, override));
+    MOCK_METHOD(void, clear, (), (override));
+};
diff --git a/test/meson.build b/test/meson.build
index 1a10b6f..9916157 100644
--- a/test/meson.build
+++ b/test/meson.build
@@ -9,18 +9,26 @@
       'file_storage_test.cpp',
       'host_console_test.cpp',
       'log_buffer_test.cpp',
+      'buffer_service_test.cpp',
+      'stream_service_test.cpp',
       'zlib_file_test.cpp',
+      '../src/buffer_service.cpp',
       '../src/config.cpp',
+      '../src/dbus_loop.cpp',
       '../src/file_storage.cpp',
       '../src/host_console.cpp',
       '../src/log_buffer.cpp',
+      '../src/stream_service.cpp',
       '../src/zlib_exception.cpp',
       '../src/zlib_file.cpp',
     ],
     dependencies: [
       dependency('gtest', main: true, disabler: true, required: build_tests),
+      dependency('gmock', disabler: true, required: build_tests),
       dependency('zlib'),
+      dependency('phosphor-logging'),
     ],
+    cpp_args : ['-DSTREAM_SERVICE', '-DBUFFER_SERVICE'],
     include_directories: '../src',
   )
 )
diff --git a/test/stream_service_test.cpp b/test/stream_service_test.cpp
new file mode 100644
index 0000000..ab62c78
--- /dev/null
+++ b/test/stream_service_test.cpp
@@ -0,0 +1,160 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright (C) 2021 Google
+
+#include "config.hpp"
+#include "dbus_loop_mock.hpp"
+#include "host_console_mock.hpp"
+#include "stream_service.hpp"
+
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <memory>
+#include <string>
+#include <system_error>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+namespace
+{
+
+constexpr char socketPath[] = "\0rsyslog";
+constexpr char firstDatagram[] = "Hello world";
+constexpr char secondDatagram[] = "World hello again";
+// Shouldn't read more than maximum size of a datagram.
+constexpr int consoleReadMaxSize = 1024;
+
+using ::testing::_;
+using ::testing::DoAll;
+using ::testing::Eq;
+using ::testing::InSequence;
+using ::testing::Le;
+using ::testing::Ref;
+using ::testing::Return;
+using ::testing::SetArrayArgument;
+using ::testing::StrEq;
+using ::testing::Test;
+using ::testing::Throw;
+
+class StreamServiceTest : public Test, public StreamService
+{
+  public:
+    StreamServiceTest() :
+        StreamService(socketPath, dbusLoopMock, hostConsoleMock),
+        serverSocket(-1)
+    {}
+    ~StreamServiceTest() override
+    {
+        // Stop server
+        if (serverSocket != -1)
+        {
+            close(serverSocket);
+        }
+    }
+
+    MOCK_METHOD(void, readConsole, (), (override));
+    MOCK_METHOD(void, streamConsole, (const char* data, size_t len),
+                (override));
+    MOCK_METHOD(void, setStreamSocket, (), (override));
+
+  protected:
+    // Start a server for reading datagrams.
+    void startServer()
+    {
+        serverSocket = socket(AF_UNIX, SOCK_DGRAM, 0);
+        ASSERT_NE(serverSocket, -1);
+        sockaddr_un sa{};
+        sa.sun_family = AF_UNIX;
+        memcpy(sa.sun_path, socketPath, sizeof(socketPath) - 1);
+        sa.sun_path[sizeof(socketPath) - 1] = '\0';
+        const socklen_t len =
+            sizeof(sa) - sizeof(sa.sun_path) + sizeof(socketPath) - 1;
+        ASSERT_NE(
+            bind(serverSocket, reinterpret_cast<const sockaddr*>(&sa), len),
+            -1);
+    }
+
+    // Set hostConsole firstly read specified data and then read nothing.
+    void setHostConsoleOnce(char const* data, size_t len)
+    {
+        EXPECT_CALL(hostConsoleMock, read(_, Le(consoleReadMaxSize)))
+            .WillOnce(DoAll(SetArrayArgument<0>(data, data + len), Return(len)))
+            .WillOnce(Return(0));
+    }
+
+    DbusLoopMock dbusLoopMock;
+    HostConsoleMock hostConsoleMock;
+    int serverSocket;
+};
+
+TEST_F(StreamServiceTest, ReadConsoleExceptionCaught)
+{
+    InSequence sequence;
+    // Shouldn't read more than maximum size of a datagram.
+    EXPECT_CALL(hostConsoleMock, read(_, Le(1024)))
+        .WillOnce(Throw(std::system_error(std::error_code(), "Mock error")));
+    EXPECT_NO_THROW(StreamService::readConsole());
+}
+
+TEST_F(StreamServiceTest, ReadConsoleOk)
+{
+    // stream mode
+    setHostConsoleOnce(firstDatagram, strlen(firstDatagram));
+    EXPECT_CALL(*this,
+                streamConsole(StrEq(firstDatagram), Eq(strlen(firstDatagram))))
+        .WillOnce(Return());
+    EXPECT_NO_THROW(StreamService::readConsole());
+}
+
+TEST_F(StreamServiceTest, StreamConsoleOk)
+{
+    startServer();
+    EXPECT_NO_THROW(StreamService::setStreamSocket());
+    EXPECT_NO_THROW(
+        StreamService::streamConsole(firstDatagram, strlen(firstDatagram)));
+    EXPECT_NO_THROW(
+        StreamService::streamConsole(secondDatagram, strlen(secondDatagram)));
+    char buffer[consoleReadMaxSize];
+    EXPECT_EQ(read(serverSocket, buffer, consoleReadMaxSize),
+              strlen(firstDatagram));
+    buffer[strlen(firstDatagram)] = '\0';
+    EXPECT_STREQ(buffer, firstDatagram);
+    EXPECT_EQ(read(serverSocket, buffer, consoleReadMaxSize),
+              strlen(secondDatagram));
+    buffer[strlen(secondDatagram)] = '\0';
+    EXPECT_STREQ(buffer, secondDatagram);
+}
+
+TEST_F(StreamServiceTest, RunIoRegisterError)
+{
+    EXPECT_CALL(*this, setStreamSocket()).WillOnce(Return());
+    EXPECT_CALL(hostConsoleMock, connect()).WillOnce(Return());
+    EXPECT_CALL(dbusLoopMock, addSignalHandler(Eq(SIGTERM), _))
+        .WillOnce(Return());
+    EXPECT_CALL(dbusLoopMock, addIoHandler(Eq(int(hostConsoleMock)), _))
+        .WillOnce(Throw(std::runtime_error("Mock error")));
+    EXPECT_THROW(run(), std::runtime_error);
+}
+
+TEST_F(StreamServiceTest, RunSignalRegisterError)
+{
+    EXPECT_CALL(*this, setStreamSocket()).WillOnce(Return());
+    EXPECT_CALL(hostConsoleMock, connect()).WillOnce(Return());
+    EXPECT_CALL(dbusLoopMock, addSignalHandler(Eq(SIGTERM), _))
+        .WillOnce(Throw(std::runtime_error("Mock error")));
+    EXPECT_THROW(run(), std::runtime_error);
+}
+
+TEST_F(StreamServiceTest, RunOk)
+{
+    EXPECT_CALL(hostConsoleMock, connect()).WillOnce(Return());
+    EXPECT_CALL(dbusLoopMock, addIoHandler(Eq(int(hostConsoleMock)), _))
+        .WillOnce(Return());
+    EXPECT_CALL(dbusLoopMock, addSignalHandler(Eq(SIGTERM), _))
+        .WillOnce(Return());
+    EXPECT_CALL(*this, setStreamSocket()).WillOnce(Return());
+    EXPECT_CALL(dbusLoopMock, run).WillOnce(Return(0));
+    EXPECT_NO_THROW(run());
+}
+} // namespace