EventService : Fix retry handling for http-client

When the event send/receive is failed, the bmcweb does not handle
the failure to tear-down the complete connection and start a fresh

The keep-alive header from the event listener is read to update
the connection states, so that the connection will be kept alive
or closed as per the subscriber's specifications

Updated the connection state machine to handle retry logic properly.
Avoided multiple simultaneous async calls which crashes the bmcweb. So
added connBusy flag which protects simultaneous async calls.

Used boost http response parser as parser for producing the response
message. Set the parser skip option to handle the empty response message
from listening server.

Tested by:
  - Subscribe for the events at BMC using DMTF event listener
  - Generate an event and see the same is received at the listener's console
  - Update the listner to change the keep-alive to true/false and
    observe the http-client connection states at bmcweb
  - Changed listener client to return non success HTTP status code
    and observed retry logic gets trigrred in http-client.
  - Gave wrong fqdn and observed async resolve failure and retry logc.
  - Stopped listener after connect and verified timeouts on http-client
    side.

Change-Id: Ibb45691f139916ba2954da37beda9d4f91c7cef3
Signed-off-by: Sunitha Harish <sunithaharish04@gmail.com>
Signed-off-by: AppaRao Puli <apparao.puli@linux.intel.com>
diff --git a/http/http_client.hpp b/http/http_client.hpp
index 992ac2b..feabbba 100644
--- a/http/http_client.hpp
+++ b/http/http_client.hpp
@@ -34,22 +34,28 @@
 {
 
 static constexpr uint8_t maxRequestQueueSize = 50;
+static constexpr unsigned int httpReadBodyLimit = 8192;
 
 enum class ConnState
 {
     initialized,
     resolveInProgress,
     resolveFailed,
+    resolved,
     connectInProgress,
     connectFailed,
     connected,
     sendInProgress,
     sendFailed,
+    recvInProgress,
     recvFailed,
     idle,
-    suspended,
+    closeInProgress,
     closed,
-    terminated
+    suspended,
+    terminated,
+    abortConnection,
+    retry
 };
 
 class HttpClient : public std::enable_shared_from_this<HttpClient>
@@ -58,11 +64,13 @@
     crow::async_resolve::Resolver resolver;
     boost::beast::tcp_stream conn;
     boost::asio::steady_timer timer;
-    boost::beast::flat_buffer buffer;
+    boost::beast::flat_static_buffer<httpReadBodyLimit> buffer;
     boost::beast::http::request<boost::beast::http::string_body> req;
-    boost::beast::http::response<boost::beast::http::string_body> res;
-    std::vector<std::pair<std::string, std::string>> headers;
-    std::queue<std::string> requestDataQueue;
+    std::optional<
+        boost::beast::http::response_parser<boost::beast::http::string_body>>
+        parser;
+    boost::circular_buffer_space_optimized<std::string> requestDataQueue{};
+    std::vector<boost::asio::ip::tcp::endpoint> endPoints;
     ConnState state;
     std::string subId;
     std::string host;
@@ -76,12 +84,7 @@
 
     void doResolve()
     {
-        if (state == ConnState::resolveInProgress)
-        {
-            return;
-        }
         state = ConnState::resolveInProgress;
-
         BMCWEB_LOG_DEBUG << "Trying to resolve: " << host << ":" << port;
 
         auto respHandler =
@@ -89,78 +92,56 @@
                 const boost::beast::error_code ec,
                 const std::vector<boost::asio::ip::tcp::endpoint>&
                     endpointList) {
-                if (ec)
+                if (ec || (endpointList.size() == 0))
                 {
                     BMCWEB_LOG_ERROR << "Resolve failed: " << ec.message();
                     self->state = ConnState::resolveFailed;
-                    self->checkQueue();
+                    self->handleConnState();
                     return;
                 }
                 BMCWEB_LOG_DEBUG << "Resolved";
-                self->doConnect(endpointList);
+                self->endPoints.assign(endpointList.begin(),
+                                       endpointList.end());
+                self->state = ConnState::resolved;
+                self->handleConnState();
             };
         resolver.asyncResolve(host, port, std::move(respHandler));
     }
 
-    void doConnect(
-        const std::vector<boost::asio::ip::tcp::endpoint>& endpointList)
+    void doConnect()
     {
-        if (state == ConnState::connectInProgress)
-        {
-            return;
-        }
         state = ConnState::connectInProgress;
 
         BMCWEB_LOG_DEBUG << "Trying to connect to: " << host << ":" << port;
 
         conn.expires_after(std::chrono::seconds(30));
         conn.async_connect(
-            endpointList, [self(shared_from_this())](
-                              const boost::beast::error_code ec,
-                              const boost::asio::ip::tcp::endpoint& endpoint) {
+            endPoints, [self(shared_from_this())](
+                           const boost::beast::error_code ec,
+                           const boost::asio::ip::tcp::endpoint& endpoint) {
                 if (ec)
                 {
                     BMCWEB_LOG_ERROR << "Connect " << endpoint
                                      << " failed: " << ec.message();
                     self->state = ConnState::connectFailed;
-                    self->checkQueue();
+                    self->handleConnState();
                     return;
                 }
-                self->state = ConnState::connected;
                 BMCWEB_LOG_DEBUG << "Connected to: " << endpoint;
-
-                self->checkQueue();
+                self->state = ConnState::connected;
+                self->handleConnState();
             });
     }
 
     void sendMessage(const std::string& data)
     {
-        if (state == ConnState::sendInProgress)
-        {
-            return;
-        }
         state = ConnState::sendInProgress;
 
         BMCWEB_LOG_DEBUG << __FUNCTION__ << "(): " << host << ":" << port;
 
-        req.version(static_cast<int>(11)); // HTTP 1.1
-        req.target(uri);
-        req.method(boost::beast::http::verb::post);
-
-        // Set headers
-        for (const auto& [key, value] : headers)
-        {
-            req.set(key, value);
-        }
-        req.set(boost::beast::http::field::host, host);
-        req.keep_alive(true);
-
         req.body() = data;
         req.prepare_payload();
 
-        // Set a timeout on the operation
-        conn.expires_after(std::chrono::seconds(30));
-
         // Send the HTTP request to the remote host
         boost::beast::http::async_write(
             conn, req,
@@ -171,7 +152,7 @@
                     BMCWEB_LOG_ERROR << "sendMessage() failed: "
                                      << ec.message();
                     self->state = ConnState::sendFailed;
-                    self->checkQueue();
+                    self->handleConnState();
                     return;
                 }
                 BMCWEB_LOG_DEBUG << "sendMessage() bytes transferred: "
@@ -184,9 +165,17 @@
 
     void recvMessage()
     {
+        state = ConnState::recvInProgress;
+
+        parser.emplace(std::piecewise_construct, std::make_tuple());
+        parser->body_limit(httpReadBodyLimit);
+
+        // Check only for the response header
+        parser->skip(true);
+
         // Receive the HTTP response
         boost::beast::http::async_read(
-            conn, buffer, res,
+            conn, buffer, *parser,
             [self(shared_from_this())](const boost::beast::error_code& ec,
                                        const std::size_t& bytesTransferred) {
                 if (ec)
@@ -194,30 +183,47 @@
                     BMCWEB_LOG_ERROR << "recvMessage() failed: "
                                      << ec.message();
                     self->state = ConnState::recvFailed;
-                    self->checkQueue();
+                    self->handleConnState();
                     return;
                 }
                 BMCWEB_LOG_DEBUG << "recvMessage() bytes transferred: "
                                  << bytesTransferred;
-                boost::ignore_unused(bytesTransferred);
-
-                // Discard received data. We are not interested.
-                BMCWEB_LOG_DEBUG << "recvMessage() data: " << self->res;
+                BMCWEB_LOG_DEBUG << "recvMessage() data: "
+                                 << self->parser->get();
 
                 // Send is successful, Lets remove data from queue
                 // check for next request data in queue.
-                self->requestDataQueue.pop();
+                if (!self->requestDataQueue.empty())
+                {
+                    self->requestDataQueue.pop_front();
+                }
                 self->state = ConnState::idle;
-                self->checkQueue();
+
+                // Keep the connection alive if server supports it
+                // Else close the connection
+                BMCWEB_LOG_DEBUG << "recvMessage() keepalive : "
+                                 << self->parser->keep_alive();
+                if (!self->parser->keep_alive())
+                {
+                    // Abort the connection since server is not keep-alive
+                    // enabled
+                    self->state = ConnState::abortConnection;
+                }
+
+                // Returns ownership of the parsed message
+                self->parser->release();
+
+                self->handleConnState();
             });
     }
 
     void doClose()
     {
+        state = ConnState::closeInProgress;
         boost::beast::error_code ec;
         conn.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
+        conn.close();
 
-        state = ConnState::closed;
         // not_connected happens sometimes so don't bother reporting it.
         if (ec && ec != boost::beast::errc::not_connected)
         {
@@ -225,112 +231,139 @@
             return;
         }
         BMCWEB_LOG_DEBUG << "Connection closed gracefully";
+        if ((state != ConnState::suspended) && (state != ConnState::terminated))
+        {
+            state = ConnState::closed;
+            handleConnState();
+        }
     }
 
-    void checkQueue(const bool newRecord = false)
+    void waitAndRetry()
     {
-        if (requestDataQueue.empty())
-        {
-            // TODO: Having issue in keeping connection alive. So lets close if
-            // nothing to be transferred.
-            doClose();
-
-            BMCWEB_LOG_DEBUG << "requestDataQueue is empty\n";
-            return;
-        }
-
         if (retryCount >= maxRetryAttempts)
         {
-            BMCWEB_LOG_ERROR << "Maximum number of retries is reached.";
+            BMCWEB_LOG_ERROR << "Maximum number of retries reached.";
 
             // Clear queue.
             while (!requestDataQueue.empty())
             {
-                requestDataQueue.pop();
+                requestDataQueue.pop_front();
             }
 
-            BMCWEB_LOG_DEBUG << "Retry policy is set to " << retryPolicyAction;
+            BMCWEB_LOG_DEBUG << "Retry policy: " << retryPolicyAction;
             if (retryPolicyAction == "TerminateAfterRetries")
             {
                 // TODO: delete subscription
                 state = ConnState::terminated;
-                return;
             }
             if (retryPolicyAction == "SuspendRetries")
             {
                 state = ConnState::suspended;
-                return;
             }
-            // keep retrying, reset count and continue.
+            // Reset the retrycount to zero so that client can try connecting
+            // again if needed
             retryCount = 0;
-        }
-
-        if ((state == ConnState::connectFailed) ||
-            (state == ConnState::sendFailed) ||
-            (state == ConnState::recvFailed))
-        {
-            if (newRecord)
-            {
-                // We are already running async wait and retry.
-                // Since record is added to queue, it gets the
-                // turn in FIFO.
-                return;
-            }
-
-            if (runningTimer)
-            {
-                BMCWEB_LOG_DEBUG << "Retry timer is already running.";
-                return;
-            }
-            runningTimer = true;
-
-            retryCount++;
-
-            BMCWEB_LOG_DEBUG << "Attempt retry after " << retryIntervalSecs
-                             << " seconds. RetryCount = " << retryCount;
-            timer.expires_after(std::chrono::seconds(retryIntervalSecs));
-            timer.async_wait(
-                [self = shared_from_this()](const boost::system::error_code&) {
-                    self->runningTimer = false;
-                    self->connStateCheck();
-                });
+            handleConnState();
             return;
         }
-        // reset retry count.
-        retryCount = 0;
-        connStateCheck();
 
+        if (runningTimer)
+        {
+            BMCWEB_LOG_DEBUG << "Retry timer is already running.";
+            return;
+        }
+        runningTimer = true;
+
+        retryCount++;
+
+        BMCWEB_LOG_DEBUG << "Attempt retry after " << retryIntervalSecs
+                         << " seconds. RetryCount = " << retryCount;
+        timer.expires_after(std::chrono::seconds(retryIntervalSecs));
+        timer.async_wait(
+            [self = shared_from_this()](const boost::system::error_code ec) {
+                if (ec)
+                {
+                    BMCWEB_LOG_ERROR << "async_wait failed: " << ec.message();
+                    // Ignore the error and continue the retry loop to attempt
+                    // sending the event as per the retry policy
+                }
+                self->runningTimer = false;
+
+                // Lets close connection and start from resolve.
+                self->doClose();
+            });
         return;
     }
 
-    void connStateCheck()
+    void handleConnState()
     {
         switch (state)
         {
             case ConnState::resolveInProgress:
             case ConnState::connectInProgress:
             case ConnState::sendInProgress:
-            case ConnState::suspended:
-            case ConnState::terminated:
-                // do nothing
+            case ConnState::recvInProgress:
+            case ConnState::closeInProgress:
+            {
+                BMCWEB_LOG_DEBUG << "Async operation is already in progress";
                 break;
+            }
             case ConnState::initialized:
             case ConnState::closed:
+            {
+                if (requestDataQueue.empty())
+                {
+                    BMCWEB_LOG_DEBUG << "requestDataQueue is empty";
+                    return;
+                }
+                doResolve();
+                break;
+            }
+            case ConnState::resolved:
+            {
+                doConnect();
+                break;
+            }
+            case ConnState::suspended:
+            case ConnState::terminated:
+            {
+                doClose();
+                break;
+            }
+            case ConnState::resolveFailed:
             case ConnState::connectFailed:
             case ConnState::sendFailed:
             case ConnState::recvFailed:
-            case ConnState::resolveFailed:
+            case ConnState::retry:
             {
-                doResolve();
+                // In case of failures during connect and handshake
+                // the retry policy will be applied
+                waitAndRetry();
                 break;
             }
             case ConnState::connected:
             case ConnState::idle:
             {
+                // State idle means, previous attempt is successful
+                // State connected means, client connection is established
+                // successfully
+                if (requestDataQueue.empty())
+                {
+                    BMCWEB_LOG_DEBUG << "requestDataQueue is empty";
+                    return;
+                }
                 std::string data = requestDataQueue.front();
                 sendMessage(data);
                 break;
             }
+            case ConnState::abortConnection:
+            {
+                // Server did not want to keep alive the session
+                doClose();
+                break;
+            }
+            default:
+                break;
         }
     }
 
@@ -339,37 +372,38 @@
                         const std::string& destIP, const std::string& destPort,
                         const std::string& destUri) :
         conn(ioc),
-        timer(ioc), subId(id), host(destIP), port(destPort), uri(destUri),
-        retryCount(0), maxRetryAttempts(5), retryIntervalSecs(0),
+        timer(ioc), req(boost::beast::http::verb::post, destUri, 11),
+        state(ConnState::initialized), subId(id), host(destIP), port(destPort),
+        uri(destUri), retryCount(0), maxRetryAttempts(5), retryIntervalSecs(0),
         retryPolicyAction("TerminateAfterRetries"), runningTimer(false)
     {
-        state = ConnState::initialized;
+        // Set the request header
+        req.set(boost::beast::http::field::host, host);
+        req.set(boost::beast::http::field::content_type, "application/json");
+        req.keep_alive(true);
+
+        requestDataQueue.set_capacity(maxRequestQueueSize);
     }
 
     void sendData(const std::string& data)
     {
-        if (state == ConnState::suspended)
+        if ((state == ConnState::suspended) || (state == ConnState::terminated))
         {
             return;
         }
-
-        if (requestDataQueue.size() <= maxRequestQueueSize)
-        {
-            requestDataQueue.push(data);
-            checkQueue(true);
-        }
-        else
-        {
-            BMCWEB_LOG_ERROR << "Request queue is full. So ignoring data.";
-        }
-
+        requestDataQueue.push_back(data);
+        handleConnState();
         return;
     }
 
-    void setHeaders(
+    void addHeaders(
         const std::vector<std::pair<std::string, std::string>>& httpHeaders)
     {
-        headers = httpHeaders;
+        // Set custom headers
+        for (const auto& [key, value] : httpHeaders)
+        {
+            req.set(key, value);
+        }
     }
 
     void setRetryConfig(const uint32_t retryAttempts,
diff --git a/redfish-core/include/event_service_manager.hpp b/redfish-core/include/event_service_manager.hpp
index fa4e41a..c999121 100644
--- a/redfish-core/include/event_service_manager.hpp
+++ b/redfish-core/include/event_service_manager.hpp
@@ -423,7 +423,7 @@
                     reqHeaders.emplace_back(std::pair(key, val));
                 }
             }
-            conn->setHeaders(reqHeaders);
+            conn->addHeaders(reqHeaders);
             conn->sendData(msg);
             this->eventSeqNum++;
         }