EventService: Queuing the request data and retry
Add support to queue the request event data.
Maintaining the proper state of connection and
retry for appropriate action depending on state.
If max retries are reached then suspend the retries.
Need to add async timeout between retries and take
action as configured 'DeliveryRetryPolicy' by user.
Tested:
- Using Telemetry reports, streamed data to validate
the queue by stopping listener.
- Verified the retry logic.
- Verfiied the appropriate action depending failed
state of connection.
Change-Id: Idd562cd512897625bf7b0d9a376207db5cb8642f
Signed-off-by: AppaRao Puli <apparao.puli@linux.intel.com>
diff --git a/http/http_client.hpp b/http/http_client.hpp
index 64f70cb..caaaccb 100644
--- a/http/http_client.hpp
+++ b/http/http_client.hpp
@@ -22,15 +22,25 @@
#include <functional>
#include <iostream>
#include <memory>
+#include <queue>
#include <string>
namespace crow
{
+static constexpr uint8_t maxRequestQueueSize = 50;
+
enum class ConnState
{
- initializing,
+ initialized,
+ connectInProgress,
+ connectFailed,
connected,
+ sendInProgress,
+ sendFailed,
+ recvFailed,
+ idle,
+ suspended,
closed
};
@@ -43,17 +53,68 @@
boost::beast::http::response<boost::beast::http::string_body> res;
boost::asio::ip::tcp::resolver::results_type endpoint;
std::vector<std::pair<std::string, std::string>> headers;
+ std::queue<std::string> requestDataQueue;
ConnState state;
std::string host;
std::string port;
+ std::string uri;
+ int retryCount;
+ int maxRetryAttempts;
- void sendMessage()
+ void doConnect()
{
- if (state != ConnState::connected)
+ if (state == ConnState::connectInProgress)
{
- BMCWEB_LOG_DEBUG << "Not connected to: " << host;
return;
}
+ state = ConnState::connectInProgress;
+
+ BMCWEB_LOG_DEBUG << "Trying to connect to: " << host << ":" << port;
+ // Set a timeout on the operation
+ conn.expires_after(std::chrono::seconds(30));
+ conn.async_connect(endpoint, [self(shared_from_this())](
+ const boost::beast::error_code& ec,
+ const boost::asio::ip::tcp::resolver::
+ results_type::endpoint_type& ep) {
+ if (ec)
+ {
+ BMCWEB_LOG_ERROR << "Connect " << ep
+ << " failed: " << ec.message();
+ self->state = ConnState::connectFailed;
+ self->checkQueue();
+ return;
+ }
+ self->state = ConnState::connected;
+ BMCWEB_LOG_DEBUG << "Connected to: " << ep;
+
+ self->checkQueue();
+ });
+ }
+
+ void sendMessage(const std::string& data)
+ {
+ if (state == ConnState::sendInProgress)
+ {
+ return;
+ }
+ state = ConnState::sendInProgress;
+
+ BMCWEB_LOG_DEBUG << __FUNCTION__ << "(): " << host << ":" << port;
+
+ req.version(static_cast<int>(11)); // HTTP 1.1
+ req.target(uri);
+ req.method(boost::beast::http::verb::post);
+
+ // Set headers
+ for (const auto& [key, value] : headers)
+ {
+ req.set(key, value);
+ }
+ req.set(boost::beast::http::field::host, host);
+ req.keep_alive(true);
+
+ req.body() = data;
+ req.prepare_payload();
// Set a timeout on the operation
conn.expires_after(std::chrono::seconds(30));
@@ -61,43 +122,37 @@
// Send the HTTP request to the remote host
boost::beast::http::async_write(
conn, req,
- [this,
- self(shared_from_this())](const boost::beast::error_code& ec,
+ [self(shared_from_this())](const boost::beast::error_code& ec,
const std::size_t& bytesTransferred) {
if (ec)
{
BMCWEB_LOG_ERROR << "sendMessage() failed: "
<< ec.message();
- this->doClose();
+ self->state = ConnState::sendFailed;
+ self->checkQueue();
return;
}
BMCWEB_LOG_DEBUG << "sendMessage() bytes transferred: "
<< bytesTransferred;
boost::ignore_unused(bytesTransferred);
- this->recvMessage();
+ self->recvMessage();
});
}
void recvMessage()
{
- if (state != ConnState::connected)
- {
- BMCWEB_LOG_DEBUG << "Not connected to: " << host;
- return;
- }
-
// Receive the HTTP response
boost::beast::http::async_read(
conn, buffer, res,
- [this,
- self(shared_from_this())](const boost::beast::error_code& ec,
+ [self(shared_from_this())](const boost::beast::error_code& ec,
const std::size_t& bytesTransferred) {
if (ec)
{
BMCWEB_LOG_ERROR << "recvMessage() failed: "
<< ec.message();
- this->doClose();
+ self->state = ConnState::recvFailed;
+ self->checkQueue();
return;
}
BMCWEB_LOG_DEBUG << "recvMessage() bytes transferred: "
@@ -105,9 +160,13 @@
boost::ignore_unused(bytesTransferred);
// Discard received data. We are not interested.
- BMCWEB_LOG_DEBUG << "recvMessage() data: " << res;
+ BMCWEB_LOG_DEBUG << "recvMessage() data: " << self->res;
- this->doClose();
+ // Send is successful, Lets remove data from queue
+ // check for next request data in queue.
+ self->requestDataQueue.pop();
+ self->state = ConnState::idle;
+ self->checkQueue();
});
}
@@ -126,58 +185,119 @@
BMCWEB_LOG_DEBUG << "Connection closed gracefully";
}
- ConnState getState()
+ void checkQueue(const bool newRecord = false)
{
- return state;
+ if (requestDataQueue.empty())
+ {
+ // TODO: Having issue in keeping connection alive. So lets close if
+ // nothing to be trasferred.
+ doClose();
+
+ BMCWEB_LOG_DEBUG << "requestDataQueue is empty\n";
+ return;
+ }
+
+ if (retryCount >= maxRetryAttempts)
+ {
+ BMCWEB_LOG_ERROR << "Maximum number of retries is reached.";
+
+ // Clear queue.
+ while (!requestDataQueue.empty())
+ {
+ requestDataQueue.pop();
+ }
+
+ // TODO: Take 'DeliveryRetryPolicy' action.
+ // For now, doing 'SuspendRetries' action.
+ state = ConnState::suspended;
+ return;
+ }
+
+ if ((state == ConnState::connectFailed) ||
+ (state == ConnState::sendFailed) ||
+ (state == ConnState::recvFailed))
+ {
+ if (newRecord)
+ {
+ // We are already running async wait and retry.
+ // Since record is added to queue, it gets the
+ // turn in FIFO.
+ return;
+ }
+
+ retryCount++;
+ // TODO: Perform async wait for retryTimeoutInterval before proceed.
+ }
+ else
+ {
+ // reset retry count.
+ retryCount = 0;
+ }
+
+ switch (state)
+ {
+ case ConnState::connectInProgress:
+ case ConnState::sendInProgress:
+ case ConnState::suspended:
+ // do nothing
+ break;
+ case ConnState::initialized:
+ case ConnState::closed:
+ case ConnState::connectFailed:
+ case ConnState::sendFailed:
+ case ConnState::recvFailed:
+ {
+ // After establishing the connection, checkQueue() will
+ // get called and it will attempt to send data.
+ doConnect();
+ break;
+ }
+ case ConnState::connected:
+ case ConnState::idle:
+ {
+ std::string data = requestDataQueue.front();
+ sendMessage(data);
+ break;
+ }
+ default:
+ break;
+ }
+
+ return;
}
public:
explicit HttpClient(boost::asio::io_context& ioc, const std::string& destIP,
- const std::string& destPort) :
+ const std::string& destPort,
+ const std::string& destUri) :
conn(ioc),
- host(destIP), port(destPort)
+ host(destIP), port(destPort), uri(destUri)
{
boost::asio::ip::tcp::resolver resolver(ioc);
endpoint = resolver.resolve(host, port);
- state = ConnState::initializing;
+ state = ConnState::initialized;
+ retryCount = 0;
+ maxRetryAttempts = 5;
}
- void doConnectAndSend(const std::string& path, const std::string& data)
+ void sendData(const std::string& data)
{
- BMCWEB_LOG_DEBUG << "doConnectAndSend " << host << ":" << port;
-
- req.version(static_cast<int>(11)); // HTTP 1.1
- req.target(path);
- req.method(boost::beast::http::verb::post);
-
- // Set headers
- for (const auto& [key, value] : headers)
+ if (state == ConnState::suspended)
{
- req.set(key, value);
+ return;
}
- req.set(boost::beast::http::field::host, host);
- req.keep_alive(true);
- req.body() = data;
- req.prepare_payload();
+ if (requestDataQueue.size() <= maxRequestQueueSize)
+ {
+ requestDataQueue.push(data);
+ checkQueue(true);
+ }
+ else
+ {
+ BMCWEB_LOG_ERROR << "Request queue is full. So ignoring data.";
+ }
- // Set a timeout on the operation
- conn.expires_after(std::chrono::seconds(30));
- conn.async_connect(endpoint, [this, self(shared_from_this())](
- const boost::beast::error_code& ec,
- const boost::asio::ip::tcp::resolver::
- results_type::endpoint_type& ep) {
- if (ec)
- {
- BMCWEB_LOG_ERROR << "Connect " << ep
- << " failed: " << ec.message();
- return;
- }
- state = ConnState::connected;
- BMCWEB_LOG_DEBUG << "Connected to: " << ep;
-
- sendMessage();
- });
+ return;
}
void setHeaders(
diff --git a/redfish-core/include/event_service_manager.hpp b/redfish-core/include/event_service_manager.hpp
index 9b907ba..0634328 100644
--- a/redfish-core/include/event_service_manager.hpp
+++ b/redfish-core/include/event_service_manager.hpp
@@ -52,7 +52,7 @@
port(inPort), path(inPath), uriProto(inUriProto)
{
conn = std::make_shared<crow::HttpClient>(
- crow::connections::systemBus->get_io_context(), host, port);
+ crow::connections::systemBus->get_io_context(), host, port, path);
}
~Subscription()
{
@@ -71,7 +71,7 @@
}
}
conn->setHeaders(reqHeaders);
- conn->doConnectAndSend(path, msg);
+ conn->sendData(msg);
}
private: