requestHandler: Queue the request messages to one endpoint

Section "Requirements for requesters" in DSP0240 details "A PLDM
terminus that issues PLDM requests to another PLDM terminus shall wait
until one of the following occurs before issuing a new PLDM request:
it gets the response to a particular request, it times out waiting for
the response, or it receives an indication that transmission of the
particular request failed." So the registered request messages to one
endpoint have to be queued before sending to meet the requester
requirement.
When a sensor manager is added to pldmd daemon to manage the sensors
of one terminus, the sensor manager will support the sensor polling
timers. The timers will poll the PLDM sensors with different intervals
so there can be many `getSensorReading` requests to one endpoint to
get the sensor values. Moreover, BMC also sends
`PollForPlatformEventMessage` requests to get the event data when it
receives the `pldmMessagePollEvent` event from the terminus. So those
TX request messages have to be queued.

Signed-off-by: Thu Nguyen <thu@os.amperecomputing.com>
Change-Id: I9756e1029b41a297a8f73e2ab7be47bc143ef710
diff --git a/requester/handler.hpp b/requester/handler.hpp
index 6987b16..02c5d02 100644
--- a/requester/handler.hpp
+++ b/requester/handler.hpp
@@ -15,8 +15,11 @@
 
 #include <cassert>
 #include <chrono>
+#include <deque>
 #include <functional>
 #include <memory>
+#include <mutex>
+#include <queue>
 #include <tuple>
 #include <unordered_map>
 
@@ -60,9 +63,37 @@
     }
 };
 
-using ResponseHandler = std::move_only_function<void(
+using ResponseHandler = std::function<void(
     mctp_eid_t eid, const pldm_msg* response, size_t respMsgLen)>;
 
+/** @struct RegisteredRequest
+ *
+ *  This struct is used to store the registered request to one endpoint.
+ */
+struct RegisteredRequest
+{
+    RequestKey key;                  //!< Responder MCTP endpoint ID
+    std::vector<uint8_t> reqMsg;     //!< Request messages queue
+    ResponseHandler responseHandler; //!< Waiting for response flag
+};
+
+/** @struct EndpointMessageQueue
+ *
+ *  This struct is used to save the list of request messages of one endpoint and
+ *  the existing of the request message to the endpoint with its' EID.
+ */
+struct EndpointMessageQueue
+{
+    mctp_eid_t eid; //!< Responder MCTP endpoint ID
+    std::deque<std::shared_ptr<RegisteredRequest>> requestQueue; //!< Queue
+    bool activeRequest; //!< Waiting for response flag
+
+    bool operator==(const mctp_eid_t& mctpEid) const
+    {
+        return (eid == mctpEid);
+    }
+};
+
 /** @class Handler
  *
  *  This class handles the lifecycle of the PLDM request message based on the
@@ -109,6 +140,98 @@
         numRetries(numRetries), responseTimeOut(responseTimeOut)
     {}
 
+    void instanceIdExpiryCallBack(RequestKey key)
+    {
+        auto eid = key.eid;
+        if (this->handlers.contains(key))
+        {
+            error("The eid:InstanceID {EID}:{IID} is using.", "EID",
+                  (unsigned)key.eid, "IID", (unsigned)key.instanceId);
+            auto& [request, responseHandler,
+                   timerInstance] = this->handlers[key];
+            request->stop();
+            auto rc = timerInstance->stop();
+            if (rc)
+            {
+                error("Failed to stop the instance ID expiry timer. RC = {RC}",
+                      "RC", static_cast<int>(rc));
+            }
+            // Call response handler with an empty response to indicate no
+            // response
+            responseHandler(eid, nullptr, 0);
+            this->removeRequestContainer.emplace(
+                key,
+                std::make_unique<sdeventplus::source::Defer>(
+                    event, std::bind(&Handler::removeRequestEntry, this, key)));
+            endpointMessageQueues[eid]->activeRequest = false;
+
+            /* try to send new request if the endpoint is free */
+            pollEndpointQueue(eid);
+        }
+        else
+        {
+            // This condition is not possible, if a response is received
+            // before the instance ID expiry, then the response handler
+            // is executed and the entry will be removed.
+            assert(false);
+        }
+    }
+
+    /** @brief Send the remaining PLDM request messages in endpoint queue
+     *
+     *  @param[in] eid - endpoint ID of the remote MCTP endpoint
+     */
+    int pollEndpointQueue(mctp_eid_t eid)
+    {
+        if (endpointMessageQueues[eid]->activeRequest ||
+            endpointMessageQueues[eid]->requestQueue.empty())
+        {
+            return PLDM_SUCCESS;
+        }
+
+        endpointMessageQueues[eid]->activeRequest = true;
+        auto requestMsg = endpointMessageQueues[eid]->requestQueue.front();
+        endpointMessageQueues[eid]->requestQueue.pop_front();
+
+        auto request = std::make_unique<RequestInterface>(
+            pldmTransport, requestMsg->key.eid, event,
+            std::move(requestMsg->reqMsg), numRetries, responseTimeOut,
+            verbose);
+        auto timer = std::make_unique<phosphor::Timer>(
+            event.get(), std::bind(&Handler::instanceIdExpiryCallBack, this,
+                                   requestMsg->key));
+
+        auto rc = request->start();
+        if (rc)
+        {
+            instanceIdDb.free(requestMsg->key.eid, requestMsg->key.instanceId);
+            error("Failure to send the PLDM request message");
+            endpointMessageQueues[eid]->activeRequest = false;
+            return rc;
+        }
+
+        try
+        {
+            timer->start(duration_cast<std::chrono::microseconds>(
+                instanceIdExpiryInterval));
+        }
+        catch (const std::runtime_error& e)
+        {
+            instanceIdDb.free(requestMsg->key.eid, requestMsg->key.instanceId);
+            error(
+                "Failed to start the instance ID expiry timer. RC = {ERR_EXCEP}",
+                "ERR_EXCEP", e.what());
+            endpointMessageQueues[eid]->activeRequest = false;
+            return PLDM_ERROR;
+        }
+
+        handlers.emplace(requestMsg->key,
+                         std::make_tuple(std::move(request),
+                                         std::move(requestMsg->responseHandler),
+                                         std::move(timer)));
+        return PLDM_SUCCESS;
+    }
+
     /** @brief Register a PLDM request message
      *
      *  @param[in] eid - endpoint ID of the remote MCTP endpoint
@@ -126,41 +249,6 @@
     {
         RequestKey key{eid, instanceId, type, command};
 
-        auto instanceIdExpiryCallBack = [key, this](void) {
-            if (this->handlers.contains(key))
-            {
-                error(
-                    "Response not received for the request, instance ID expired. EID = {EID} INSTANCE_ID = {INST_ID} TYPE = {REQ_KEY_TYPE} COMMAND = {REQ_KEY_CMD}",
-                    "EID", (unsigned)key.eid, "INST_ID",
-                    (unsigned)key.instanceId, "REQ_KEY_TYPE",
-                    (unsigned)key.type, "REQ_KEY_CMD", (unsigned)key.command);
-                auto& [request, responseHandler,
-                       timerInstance] = this->handlers[key];
-                request->stop();
-                auto rc = timerInstance->stop();
-                if (rc)
-                {
-                    error(
-                        "Failed to stop the instance ID expiry timer. RC = {RC}",
-                        "RC", static_cast<int>(rc));
-                }
-                // Call response handler with an empty response to indicate no
-                // response
-                responseHandler(key.eid, nullptr, 0);
-                this->removeRequestContainer.emplace(
-                    key, std::make_unique<sdeventplus::source::Defer>(
-                             event, std::bind(&Handler::removeRequestEntry,
-                                              this, key)));
-            }
-            else
-            {
-                // This condition is not possible, if a response is received
-                // before the instance ID expiry, then the response handler
-                // is executed and the entry will be removed.
-                assert(false);
-            }
-        };
-
         if (handlers.contains(key))
         {
             error("The eid:InstanceID {EID}:{IID} is using.", "EID",
@@ -168,38 +256,24 @@
             return PLDM_ERROR;
         }
 
-        auto request = std::make_unique<RequestInterface>(
-            pldmTransport, eid, event, std::move(requestMsg), numRetries,
-            responseTimeOut, verbose);
-        auto timer = std::make_unique<phosphor::Timer>(
-            event.get(), instanceIdExpiryCallBack);
-
-        auto rc = request->start();
-        if (rc)
+        auto inputRequest = std::make_shared<RegisteredRequest>(
+            key, std::move(requestMsg), std::move(responseHandler));
+        if (endpointMessageQueues.contains(eid))
         {
-            instanceIdDb.free(eid, instanceId);
-            error("Failure to send the PLDM request message");
-            return rc;
+            endpointMessageQueues[eid]->requestQueue.push_back(inputRequest);
+        }
+        else
+        {
+            std::deque<std::shared_ptr<RegisteredRequest>> reqQueue;
+            reqQueue.push_back(inputRequest);
+            endpointMessageQueues[eid] =
+                std::make_shared<EndpointMessageQueue>(eid, reqQueue, false);
         }
 
-        try
-        {
-            timer->start(duration_cast<std::chrono::microseconds>(
-                instanceIdExpiryInterval));
-        }
-        catch (const std::runtime_error& e)
-        {
-            instanceIdDb.free(eid, instanceId);
-            error(
-                "Failed to start the instance ID expiry timer. RC = {ERR_EXCEP}",
-                "ERR_EXCEP", e.what());
-            return PLDM_ERROR;
-        }
+        /* try to send new request if the endpoint is free */
+        pollEndpointQueue(eid);
 
-        handlers.emplace(key, std::make_tuple(std::move(request),
-                                              std::move(responseHandler),
-                                              std::move(timer)));
-        return rc;
+        return PLDM_SUCCESS;
     }
 
     /** @brief Handle PLDM response message
@@ -229,6 +303,10 @@
             responseHandler(eid, response, respMsgLen);
             instanceIdDb.free(key.eid, key.instanceId);
             handlers.erase(key);
+
+            endpointMessageQueues[eid]->activeRequest = false;
+            /* try to send new request if the endpoint is free */
+            pollEndpointQueue(eid);
         }
         else
         {
@@ -259,6 +337,10 @@
         std::tuple<std::unique_ptr<RequestInterface>, ResponseHandler,
                    std::unique_ptr<phosphor::Timer>>;
 
+    // Manage the requests of responders base on MCTP EID
+    std::map<mctp_eid_t, std::shared_ptr<EndpointMessageQueue>>
+        endpointMessageQueues;
+
     /** @brief Container for storing the PLDM request entries */
     std::unordered_map<RequestKey, RequestValue, RequestKeyHasher> handlers;
 
diff --git a/requester/test/handler_test.cpp b/requester/test/handler_test.cpp
index b656c1a..3b003a0 100644
--- a/requester/test/handler_test.cpp
+++ b/requester/test/handler_test.cpp
@@ -134,7 +134,7 @@
 
     pldm::Response response(sizeof(pldm_msg_hdr) + sizeof(uint8_t));
     auto responsePtr = reinterpret_cast<const pldm_msg*>(response.data());
-    reqHandler.handleResponse(eid, instanceIdNxt, 0, 0, responsePtr,
+    reqHandler.handleResponse(eid, instanceId, 0, 0, responsePtr,
                               sizeof(response));
     EXPECT_EQ(validResponse, true);
     EXPECT_EQ(callbackCount, 1);
@@ -144,7 +144,7 @@
     // simulate a delayed response for the first request
     waitEventExpiry(milliseconds(500));
 
-    reqHandler.handleResponse(eid, instanceId, 0, 0, responsePtr,
+    reqHandler.handleResponse(eid, instanceIdNxt, 0, 0, responsePtr,
                               sizeof(response));
 
     EXPECT_EQ(validResponse, true);