EventService : Fix retry handling for http-client

When the event send/receive is failed, the bmcweb does not handle
the failure to tear-down the complete connection and start a fresh

The keep-alive header from the event listener is read to update
the connection states, so that the connection will be kept alive
or closed as per the subscriber's specifications

Updated the connection state machine to handle retry logic properly.
Avoided multiple simultaneous async calls which crashes the bmcweb. So
added few "InProgress" flags which protects simultaneous async calls.

Changed buffer type from flat_buffer to flat_static_buffer and
imposed an upper limit on total size avoiding heap allocations.
Also changed the requestDataQueue from std::queue to
circular_buffer_space_optimized which allocates memory as needed
and dynamically controls size.

Used boost http response parser as parser for producing the response
message. Set the parser skip option to handle the empty response message
from listening server. On reception of response, the response code in
the header is checked to determine success/failure and trigger retry
in the case of failure.

Tested by:
  - Subscribe for the events at BMC using DMTF event listener
  - Generate an event and see the same is received at the listener's console
  - Update the listner to change the keep-alive to true/false and
    observe the http-client connection states at bmcweb
  - Changed listener client to return non success HTTP status code
    and observed retry logic gets trigrred in http-client.
  - Gave wrong fqdn and observed async resolve failure and retry logc.
  - Stopped listener after connect and verified timeouts on http-client
    side.

Change-Id: Ibb45691f139916ba2954da37beda9d4f91c7cef3
Signed-off-by: Sunitha Harish <sunithaharish04@gmail.com>
Signed-off-by: AppaRao Puli <apparao.puli@linux.intel.com>
Signed-off-by: P Dheeraj Srujan Kumar <p.dheeraj.srujan.kumar@intel.com>
diff --git a/http/http_client.hpp b/http/http_client.hpp
index b135999..ab20eb0 100644
--- a/http/http_client.hpp
+++ b/http/http_client.hpp
@@ -46,11 +46,15 @@
     connected,
     sendInProgress,
     sendFailed,
+    recvInProgress,
     recvFailed,
     idle,
-    suspended,
+    closeInProgress,
     closed,
-    terminated
+    suspended,
+    terminated,
+    abortConnection,
+    retry
 };
 
 class HttpClient : public std::enable_shared_from_this<HttpClient>
@@ -61,9 +65,12 @@
     boost::asio::steady_timer timer;
     boost::beast::flat_static_buffer<httpReadBodyLimit> buffer;
     boost::beast::http::request<boost::beast::http::string_body> req;
-    boost::beast::http::response<boost::beast::http::string_body> res;
+    std::optional<
+        boost::beast::http::response_parser<boost::beast::http::string_body>>
+        parser;
     std::vector<std::pair<std::string, std::string>> headers;
     boost::circular_buffer_space_optimized<std::string> requestDataQueue{};
+
     ConnState state;
     std::string subId;
     std::string host;
@@ -77,12 +84,7 @@
 
     void doResolve()
     {
-        if (state == ConnState::resolveInProgress)
-        {
-            return;
-        }
         state = ConnState::resolveInProgress;
-
         BMCWEB_LOG_DEBUG << "Trying to resolve: " << host << ":" << port;
 
         auto respHandler =
@@ -90,11 +92,11 @@
                 const boost::beast::error_code ec,
                 const std::vector<boost::asio::ip::tcp::endpoint>&
                     endpointList) {
-                if (ec)
+                if (ec || (endpointList.size() == 0))
                 {
                     BMCWEB_LOG_ERROR << "Resolve failed: " << ec.message();
                     self->state = ConnState::resolveFailed;
-                    self->checkQueue();
+                    self->handleConnState();
                     return;
                 }
                 BMCWEB_LOG_DEBUG << "Resolved";
@@ -106,10 +108,6 @@
     void doConnect(
         const std::vector<boost::asio::ip::tcp::endpoint>& endpointList)
     {
-        if (state == ConnState::connectInProgress)
-        {
-            return;
-        }
         state = ConnState::connectInProgress;
 
         BMCWEB_LOG_DEBUG << "Trying to connect to: " << host << ":" << port;
@@ -124,22 +122,17 @@
                     BMCWEB_LOG_ERROR << "Connect " << endpoint
                                      << " failed: " << ec.message();
                     self->state = ConnState::connectFailed;
-                    self->checkQueue();
+                    self->handleConnState();
                     return;
                 }
-                self->state = ConnState::connected;
                 BMCWEB_LOG_DEBUG << "Connected to: " << endpoint;
-
-                self->checkQueue();
+                self->state = ConnState::connected;
+                self->handleConnState();
             });
     }
 
     void sendMessage(const std::string& data)
     {
-        if (state == ConnState::sendInProgress)
-        {
-            return;
-        }
         state = ConnState::sendInProgress;
 
         BMCWEB_LOG_DEBUG << __FUNCTION__ << "(): " << host << ":" << port;
@@ -172,7 +165,7 @@
                     BMCWEB_LOG_ERROR << "sendMessage() failed: "
                                      << ec.message();
                     self->state = ConnState::sendFailed;
-                    self->checkQueue();
+                    self->handleConnState();
                     return;
                 }
                 BMCWEB_LOG_DEBUG << "sendMessage() bytes transferred: "
@@ -185,9 +178,17 @@
 
     void recvMessage()
     {
+        state = ConnState::recvInProgress;
+
+        parser.emplace(std::piecewise_construct, std::make_tuple());
+        parser->body_limit(httpReadBodyLimit);
+
+        // Check only for the response header
+        parser->skip(true);
+
         // Receive the HTTP response
         boost::beast::http::async_read(
-            conn, buffer, res,
+            conn, buffer, *parser,
             [self(shared_from_this())](const boost::beast::error_code& ec,
                                        const std::size_t& bytesTransferred) {
                 if (ec)
@@ -195,15 +196,28 @@
                     BMCWEB_LOG_ERROR << "recvMessage() failed: "
                                      << ec.message();
                     self->state = ConnState::recvFailed;
-                    self->checkQueue();
+                    self->handleConnState();
                     return;
                 }
                 BMCWEB_LOG_DEBUG << "recvMessage() bytes transferred: "
                                  << bytesTransferred;
-                boost::ignore_unused(bytesTransferred);
+                BMCWEB_LOG_DEBUG << "recvMessage() data: "
+                                 << self->parser->get();
 
-                // Discard received data. We are not interested.
-                BMCWEB_LOG_DEBUG << "recvMessage() data: " << self->res;
+                unsigned int respCode = self->parser->get().result_int();
+                BMCWEB_LOG_DEBUG << "recvMessage() Header Response Code: "
+                                 << respCode;
+
+                // 2XX response is considered to be successful
+                if ((respCode < 200) || (respCode >= 300))
+                {
+                    // The listener failed to receive the Sent-Event
+                    BMCWEB_LOG_ERROR << "recvMessage() Listener Failed to "
+                                        "receive Sent-Event";
+                    self->state = ConnState::recvFailed;
+                    self->handleConnState();
+                    return;
+                }
 
                 // Send is successful, Lets remove data from queue
                 // check for next request data in queue.
@@ -212,16 +226,29 @@
                     self->requestDataQueue.pop_front();
                 }
                 self->state = ConnState::idle;
-                self->checkQueue();
+
+                // Keep the connection alive if server supports it
+                // Else close the connection
+                BMCWEB_LOG_DEBUG << "recvMessage() keepalive : "
+                                 << self->parser->keep_alive();
+                if (!self->parser->keep_alive())
+                {
+                    // Abort the connection since server is not keep-alive
+                    // enabled
+                    self->state = ConnState::abortConnection;
+                }
+
+                self->handleConnState();
             });
     }
 
     void doClose()
     {
+        state = ConnState::closeInProgress;
         boost::beast::error_code ec;
         conn.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
+        conn.close();
 
-        state = ConnState::closed;
         // not_connected happens sometimes so don't bother reporting it.
         if (ec && ec != boost::beast::errc::not_connected)
         {
@@ -229,23 +256,18 @@
             return;
         }
         BMCWEB_LOG_DEBUG << "Connection closed gracefully";
+        if ((state != ConnState::suspended) && (state != ConnState::terminated))
+        {
+            state = ConnState::closed;
+            handleConnState();
+        }
     }
 
-    void checkQueue(const bool newRecord = false)
+    void waitAndRetry()
     {
-        if (requestDataQueue.empty())
-        {
-            // TODO: Having issue in keeping connection alive. So lets close if
-            // nothing to be transferred.
-            doClose();
-
-            BMCWEB_LOG_DEBUG << "requestDataQueue is empty\n";
-            return;
-        }
-
         if (retryCount >= maxRetryAttempts)
         {
-            BMCWEB_LOG_ERROR << "Maximum number of retries is reached.";
+            BMCWEB_LOG_ERROR << "Maximum number of retries reached.";
 
             // Clear queue.
             while (!requestDataQueue.empty())
@@ -253,88 +275,121 @@
                 requestDataQueue.pop_front();
             }
 
-            BMCWEB_LOG_DEBUG << "Retry policy is set to " << retryPolicyAction;
+            BMCWEB_LOG_DEBUG << "Retry policy: " << retryPolicyAction;
             if (retryPolicyAction == "TerminateAfterRetries")
             {
                 // TODO: delete subscription
                 state = ConnState::terminated;
-                return;
             }
             if (retryPolicyAction == "SuspendRetries")
             {
                 state = ConnState::suspended;
-                return;
             }
-            // keep retrying, reset count and continue.
+            // Reset the retrycount to zero so that client can try connecting
+            // again if needed
             retryCount = 0;
-        }
-
-        if ((state == ConnState::connectFailed) ||
-            (state == ConnState::sendFailed) ||
-            (state == ConnState::recvFailed))
-        {
-            if (newRecord)
-            {
-                // We are already running async wait and retry.
-                // Since record is added to queue, it gets the
-                // turn in FIFO.
-                return;
-            }
-
-            if (runningTimer)
-            {
-                BMCWEB_LOG_DEBUG << "Retry timer is already running.";
-                return;
-            }
-            runningTimer = true;
-
-            retryCount++;
-
-            BMCWEB_LOG_DEBUG << "Attempt retry after " << retryIntervalSecs
-                             << " seconds. RetryCount = " << retryCount;
-            timer.expires_after(std::chrono::seconds(retryIntervalSecs));
-            timer.async_wait(
-                [self = shared_from_this()](const boost::system::error_code&) {
-                    self->runningTimer = false;
-                    self->connStateCheck();
-                });
+            handleConnState();
             return;
         }
-        // reset retry count.
-        retryCount = 0;
-        connStateCheck();
 
+        if (runningTimer)
+        {
+            BMCWEB_LOG_DEBUG << "Retry timer is already running.";
+            return;
+        }
+        runningTimer = true;
+
+        retryCount++;
+
+        BMCWEB_LOG_DEBUG << "Attempt retry after " << retryIntervalSecs
+                         << " seconds. RetryCount = " << retryCount;
+        timer.expires_after(std::chrono::seconds(retryIntervalSecs));
+        timer.async_wait(
+            [self = shared_from_this()](const boost::system::error_code ec) {
+                if (ec == boost::asio::error::operation_aborted)
+                {
+                    BMCWEB_LOG_DEBUG
+                        << "async_wait failed since the operation is aborted"
+                        << ec.message();
+                }
+                else if (ec)
+                {
+                    BMCWEB_LOG_ERROR << "async_wait failed: " << ec.message();
+                    // Ignore the error and continue the retry loop to attempt
+                    // sending the event as per the retry policy
+                }
+                self->runningTimer = false;
+
+                // Lets close connection and start from resolve.
+                self->doClose();
+            });
         return;
     }
 
-    void connStateCheck()
+    void handleConnState()
     {
         switch (state)
         {
             case ConnState::resolveInProgress:
             case ConnState::connectInProgress:
             case ConnState::sendInProgress:
-            case ConnState::suspended:
-            case ConnState::terminated:
-                // do nothing
+            case ConnState::recvInProgress:
+            case ConnState::closeInProgress:
+            {
+                BMCWEB_LOG_DEBUG << "Async operation is already in progress";
                 break;
+            }
             case ConnState::initialized:
             case ConnState::closed:
+            {
+                if (requestDataQueue.empty())
+                {
+                    BMCWEB_LOG_DEBUG << "requestDataQueue is empty";
+                    return;
+                }
+                doResolve();
+                break;
+            }
+            case ConnState::suspended:
+            case ConnState::terminated:
+            {
+                doClose();
+                break;
+            }
+            case ConnState::resolveFailed:
             case ConnState::connectFailed:
             case ConnState::sendFailed:
             case ConnState::recvFailed:
-            case ConnState::resolveFailed:
+            case ConnState::retry:
             {
-                doResolve();
+                // In case of failures during connect and handshake
+                // the retry policy will be applied
+                waitAndRetry();
                 break;
             }
             case ConnState::connected:
             case ConnState::idle:
             {
+                // State idle means, previous attempt is successful
+                // State connected means, client connection is established
+                // successfully
+                if (requestDataQueue.empty())
+                {
+                    BMCWEB_LOG_DEBUG << "requestDataQueue is empty";
+                    return;
+                }
                 std::string data = requestDataQueue.front();
                 sendMessage(data);
                 break;
             }
+            case ConnState::abortConnection:
+            {
+                // Server did not want to keep alive the session
+                doClose();
+                break;
+            }
+            default:
+                break;
         }
     }
 
@@ -352,7 +407,7 @@
 
     void sendData(const std::string& data)
     {
-        if (state == ConnState::suspended)
+        if ((state == ConnState::suspended) || (state == ConnState::terminated))
         {
             return;
         }
@@ -360,7 +415,7 @@
         if (requestDataQueue.size() <= maxRequestQueueSize)
         {
             requestDataQueue.push_back(data);
-            checkQueue(true);
+            handleConnState();
         }
         else
         {