Refactor HttpClient Class
Refactors HttpClient with the following changes:
- Convert class to singleton
- Replace circular buffers with devectors
- Sending queued requests and closing connections handled
within their own callback
- Add connection pooling (max size 4)
- HttpClient supports multiple connections to multiple clients
- Retry policies can be set for specific use cases
Also modifies its use in the Subscription class to be compatible
with the refactored code.
It is assumed that a BMC will be able to handle 4 parallel
connections and thus the max pool size is set as 4. The max
number of queued messages was left unchanged at 50. Eventually
we may want to allow tuning of these limits to boost performance.
That would come in a future patch.
Tested:
Launched two Event Listener servers that created 6 and 2
subscriptions. Sending a test event created a connection pool
for each server. 4 and 2 connections were added to each pool,
respectively and were used to send the test request. For the first
pool the 2 extra requests were placed into a queue until
connections became available. After a request completed, its
associated connection was used to send the next request in
the queue. Resending the test event caused those prior connections
to be reused instead of new connections being added to the pools.
Signed-off-by: Carson Labrado <clabrado@google.com>
Change-Id: Iba72b3e342cdc05d1fb972e2e9856763a0a1b3c5
diff --git a/http/http_client.hpp b/http/http_client.hpp
index 342ed1b..1637823 100644
--- a/http/http_client.hpp
+++ b/http/http_client.hpp
@@ -21,7 +21,7 @@
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/version.hpp>
-#include <boost/circular_buffer.hpp>
+#include <boost/container/devector.hpp>
#include <include/async_resolve.hpp>
#include <cstdlib>
@@ -34,8 +34,10 @@
namespace crow
{
-static constexpr uint8_t maxRequestQueueSize = 50;
-static constexpr unsigned int httpReadBodyLimit = 8192;
+// It is assumed that the BMC should be able to handle 4 parallel connections
+constexpr uint8_t maxPoolSize = 4;
+constexpr uint8_t maxRequestQueueSize = 50;
+constexpr unsigned int httpReadBodyLimit = 8192;
enum class ConnState
{
@@ -58,35 +60,66 @@
retry
};
-class HttpClient : public std::enable_shared_from_this<HttpClient>
+// We need to allow retry information to be set before a message has been sent
+// and a connection pool has been created
+struct RetryPolicyData
+{
+ uint32_t maxRetryAttempts = 5;
+ std::chrono::seconds retryIntervalSecs = std::chrono::seconds(0);
+ std::string retryPolicyAction = "TerminateAfterRetries";
+ std::string name;
+};
+
+struct PendingRequest
+{
+ std::string requestData;
+ std::function<void(bool, uint32_t)> callback;
+ RetryPolicyData retryPolicy;
+ PendingRequest(const std::string& requestData,
+ const std::function<void(bool, uint32_t)>& callback,
+ const RetryPolicyData& retryPolicy) :
+ requestData(requestData),
+ callback(callback), retryPolicy(retryPolicy)
+ {}
+};
+
+class ConnectionInfo : public std::enable_shared_from_this<ConnectionInfo>
{
private:
- crow::async_resolve::Resolver resolver;
- boost::beast::tcp_stream conn;
- boost::asio::steady_timer timer;
- boost::beast::flat_static_buffer<httpReadBodyLimit> buffer;
+ ConnState state = ConnState::initialized;
+ uint32_t retryCount = 0;
+ bool runningTimer = false;
+ std::string subId;
+ std::string host;
+ uint16_t port;
+ uint32_t connId;
+
+ // Retry policy information
+ // This should be updated before each message is sent
+ RetryPolicyData retryPolicy;
+
+ // Data buffers
+ std::string data;
boost::beast::http::request<boost::beast::http::string_body> req;
std::optional<
boost::beast::http::response_parser<boost::beast::http::string_body>>
parser;
- boost::circular_buffer_space_optimized<std::string> requestDataQueue{
- maxRequestQueueSize};
+ boost::beast::flat_static_buffer<httpReadBodyLimit> buffer;
- ConnState state = ConnState::initialized;
+ // Ascync callables
+ std::function<void(bool, uint32_t)> callback;
+ crow::async_resolve::Resolver resolver;
+ boost::beast::tcp_stream conn;
+ boost::asio::steady_timer timer;
- std::string subId;
- std::string host;
- uint16_t port = 0;
- uint32_t retryCount = 0;
- uint32_t maxRetryAttempts = 5;
- uint32_t retryIntervalSecs = 0;
- std::string retryPolicyAction = "TerminateAfterRetries";
- bool runningTimer = false;
+ friend class ConnectionPool;
void doResolve()
{
state = ConnState::resolveInProgress;
- BMCWEB_LOG_DEBUG << "Trying to resolve: " << host << ":" << port;
+ BMCWEB_LOG_DEBUG << "Trying to resolve: " << host << ":"
+ << std::to_string(port)
+ << ", id: " << std::to_string(connId);
auto respHandler =
[self(shared_from_this())](
@@ -97,12 +130,15 @@
{
BMCWEB_LOG_ERROR << "Resolve failed: " << ec.message();
self->state = ConnState::resolveFailed;
- self->handleConnState();
+ self->waitAndRetry();
return;
}
- BMCWEB_LOG_DEBUG << "Resolved";
+ BMCWEB_LOG_DEBUG << "Resolved " << self->host << ":"
+ << std::to_string(self->port)
+ << ", id: " << std::to_string(self->connId);
self->doConnect(endpointList);
};
+
resolver.asyncResolve(host, port, std::move(respHandler));
}
@@ -111,7 +147,9 @@
{
state = ConnState::connectInProgress;
- BMCWEB_LOG_DEBUG << "Trying to connect to: " << host << ":" << port;
+ BMCWEB_LOG_DEBUG << "Trying to connect to: " << host << ":"
+ << std::to_string(port)
+ << ", id: " << std::to_string(connId);
conn.expires_after(std::chrono::seconds(30));
conn.async_connect(
@@ -121,20 +159,24 @@
if (ec)
{
BMCWEB_LOG_ERROR << "Connect "
- << endpoint.address().to_string()
+ << endpoint.address().to_string() << ":"
+ << std::to_string(endpoint.port())
+ << ", id: " << std::to_string(self->connId)
<< " failed: " << ec.message();
self->state = ConnState::connectFailed;
- self->handleConnState();
+ self->waitAndRetry();
return;
}
- BMCWEB_LOG_DEBUG << "Connected to: "
- << endpoint.address().to_string();
+ BMCWEB_LOG_DEBUG
+ << "Connected to: " << endpoint.address().to_string() << ":"
+ << std::to_string(endpoint.port())
+ << ", id: " << std::to_string(self->connId);
self->state = ConnState::connected;
- self->handleConnState();
+ self->sendMessage();
});
}
- void sendMessage(const std::string& data)
+ void sendMessage()
{
state = ConnState::sendInProgress;
@@ -154,7 +196,7 @@
BMCWEB_LOG_ERROR << "sendMessage() failed: "
<< ec.message();
self->state = ConnState::sendFailed;
- self->handleConnState();
+ self->waitAndRetry();
return;
}
BMCWEB_LOG_DEBUG << "sendMessage() bytes transferred: "
@@ -182,7 +224,7 @@
BMCWEB_LOG_ERROR << "recvMessage() failed: "
<< ec.message();
self->state = ConnState::recvFailed;
- self->handleConnState();
+ self->waitAndRetry();
return;
}
BMCWEB_LOG_DEBUG << "recvMessage() bytes transferred: "
@@ -203,80 +245,44 @@
"receive Sent-Event. Header Response Code: "
<< respCode;
self->state = ConnState::recvFailed;
- self->handleConnState();
+ self->waitAndRetry();
return;
}
- // Send is successful, Lets remove data from queue
- // check for next request data in queue.
- if (!self->requestDataQueue.empty())
- {
- self->requestDataQueue.pop_front();
- }
- self->state = ConnState::idle;
+ // Send is successful
+ // Reset the counter just in case this was after retrying
+ self->retryCount = 0;
// Keep the connection alive if server supports it
// Else close the connection
BMCWEB_LOG_DEBUG << "recvMessage() keepalive : "
<< self->parser->keep_alive();
- if (!self->parser->keep_alive())
- {
- // Abort the connection since server is not keep-alive
- // enabled
- self->state = ConnState::abortConnection;
- }
- self->handleConnState();
+ self->callback(self->parser->keep_alive(), self->connId);
});
}
- void doClose()
- {
- state = ConnState::closeInProgress;
- boost::beast::error_code ec;
- conn.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
- conn.close();
-
- // not_connected happens sometimes so don't bother reporting it.
- if (ec && ec != boost::beast::errc::not_connected)
- {
- BMCWEB_LOG_ERROR << "shutdown failed: " << ec.message();
- return;
- }
- BMCWEB_LOG_DEBUG << "Connection closed gracefully";
- if ((state != ConnState::suspended) && (state != ConnState::terminated))
- {
- state = ConnState::closed;
- handleConnState();
- }
- }
-
void waitAndRetry()
{
- if (retryCount >= maxRetryAttempts)
+ if (retryCount >= retryPolicy.maxRetryAttempts)
{
BMCWEB_LOG_ERROR << "Maximum number of retries reached.";
-
- // Clear queue.
- while (!requestDataQueue.empty())
- {
- requestDataQueue.pop_front();
- }
-
- BMCWEB_LOG_DEBUG << "Retry policy: " << retryPolicyAction;
- if (retryPolicyAction == "TerminateAfterRetries")
+ BMCWEB_LOG_DEBUG << "Retry policy: "
+ << retryPolicy.retryPolicyAction;
+ if (retryPolicy.retryPolicyAction == "TerminateAfterRetries")
{
// TODO: delete subscription
state = ConnState::terminated;
+ callback(false, connId);
}
- if (retryPolicyAction == "SuspendRetries")
+ if (retryPolicy.retryPolicyAction == "SuspendRetries")
{
state = ConnState::suspended;
+ callback(false, connId);
}
// Reset the retrycount to zero so that client can try connecting
// again if needed
retryCount = 0;
- handleConnState();
return;
}
@@ -289,11 +295,13 @@
retryCount++;
- BMCWEB_LOG_DEBUG << "Attempt retry after " << retryIntervalSecs
+ BMCWEB_LOG_DEBUG << "Attempt retry after "
+ << std::to_string(
+ retryPolicy.retryIntervalSecs.count())
<< " seconds. RetryCount = " << retryCount;
- timer.expires_after(std::chrono::seconds(retryIntervalSecs));
+ timer.expires_after(retryPolicy.retryIntervalSecs);
timer.async_wait(
- [self = shared_from_this()](const boost::system::error_code ec) {
+ [self(shared_from_this())](const boost::system::error_code ec) {
if (ec == boost::asio::error::operation_aborted)
{
BMCWEB_LOG_DEBUG
@@ -308,119 +316,383 @@
}
self->runningTimer = false;
- // Lets close connection and start from resolve.
- self->doClose();
+ // Let's close the connection and restart from resolve.
+ self->doCloseAndRetry();
});
}
- void handleConnState()
+ void doClose()
{
- switch (state)
+ state = ConnState::closeInProgress;
+ boost::beast::error_code ec;
+ conn.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
+ conn.close();
+
+ // not_connected happens sometimes so don't bother reporting it.
+ if (ec && ec != boost::beast::errc::not_connected)
{
- case ConnState::resolveInProgress:
- case ConnState::connectInProgress:
- case ConnState::sendInProgress:
- case ConnState::recvInProgress:
- case ConnState::closeInProgress:
- {
- BMCWEB_LOG_DEBUG << "Async operation is already in progress";
- break;
- }
- case ConnState::initialized:
- case ConnState::closed:
- {
- if (requestDataQueue.empty())
- {
- BMCWEB_LOG_DEBUG << "requestDataQueue is empty";
- return;
- }
- doResolve();
- break;
- }
- case ConnState::suspended:
- case ConnState::terminated:
- {
- doClose();
- break;
- }
- case ConnState::resolveFailed:
- case ConnState::connectFailed:
- case ConnState::sendFailed:
- case ConnState::recvFailed:
- case ConnState::retry:
- {
- // In case of failures during connect and handshake
- // the retry policy will be applied
- waitAndRetry();
- break;
- }
- case ConnState::connected:
- case ConnState::idle:
- {
- // State idle means, previous attempt is successful
- // State connected means, client connection is established
- // successfully
- if (requestDataQueue.empty())
- {
- BMCWEB_LOG_DEBUG << "requestDataQueue is empty";
- return;
- }
- std::string data = requestDataQueue.front();
- sendMessage(data);
- break;
- }
- case ConnState::abortConnection:
- {
- // Server did not want to keep alive the session
- doClose();
- break;
- }
+ BMCWEB_LOG_ERROR << host << ":" << std::to_string(port)
+ << ", id: " << std::to_string(connId)
+ << "shutdown failed: " << ec.message();
+ return;
+ }
+ BMCWEB_LOG_DEBUG << host << ":" << std::to_string(port)
+ << ", id: " << std::to_string(connId)
+ << " closed gracefully";
+ if ((state != ConnState::suspended) && (state != ConnState::terminated))
+ {
+ state = ConnState::closed;
+ }
+ }
+
+ void doCloseAndRetry()
+ {
+ state = ConnState::closeInProgress;
+ boost::beast::error_code ec;
+ conn.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
+ conn.close();
+
+ // not_connected happens sometimes so don't bother reporting it.
+ if (ec && ec != boost::beast::errc::not_connected)
+ {
+ BMCWEB_LOG_ERROR << host << ":" << std::to_string(port)
+ << ", id: " << std::to_string(connId)
+ << "shutdown failed: " << ec.message();
+ return;
+ }
+ BMCWEB_LOG_DEBUG << host << ":" << std::to_string(port)
+ << ", id: " << std::to_string(connId)
+ << " closed gracefully";
+ if ((state != ConnState::suspended) && (state != ConnState::terminated))
+ {
+ // Now let's try to resend the data
+ state = ConnState::retry;
+ this->doResolve();
}
}
public:
- explicit HttpClient(boost::asio::io_context& ioc, const std::string& id,
- const std::string& destIP, uint16_t destPort,
- const std::string& destUri,
- const boost::beast::http::fields& httpHeader) :
- conn(ioc),
- timer(ioc),
+ explicit ConnectionInfo(boost::asio::io_context& ioc, const std::string& id,
+ const std::string& destIP, const uint16_t destPort,
+ const std::string& destUri,
+ const boost::beast::http::fields& httpHeader,
+ const unsigned int connId) :
+ subId(id),
+ host(destIP), port(destPort), connId(connId),
req(boost::beast::http::verb::post, destUri, 11, "", httpHeader),
- subId(id), host(destIP), port(destPort)
+ conn(ioc), timer(ioc)
{
req.set(boost::beast::http::field::host, host);
req.keep_alive(true);
}
+};
- void sendData(const std::string& data)
+class ConnectionPool : public std::enable_shared_from_this<ConnectionPool>
+{
+ private:
+ boost::asio::io_context& ioc;
+ const std::string id;
+ const std::string destIP;
+ const uint16_t destPort;
+ const std::string destUri;
+ const boost::beast::http::fields httpHeader;
+ std::vector<std::shared_ptr<ConnectionInfo>> connections;
+ boost::container::devector<PendingRequest> requestQueue;
+
+ friend class HttpClient;
+
+ // Configure a connections's data, callback, and retry info in preparation
+ // to begin sending a request
+ void setConnProps(ConnectionInfo& conn)
{
- if ((state == ConnState::suspended) || (state == ConnState::terminated))
+ if (requestQueue.empty())
{
+ BMCWEB_LOG_ERROR
+ << "setConnProps() should not have been called when requestQueue is empty";
return;
}
- if (requestDataQueue.size() <= maxRequestQueueSize)
+ auto req = requestQueue.front();
+ conn.retryPolicy = std::move(req.retryPolicy);
+ conn.data = std::move(req.requestData);
+ conn.callback = std::move(req.callback);
+
+ BMCWEB_LOG_DEBUG << "Setting properties for connection " << conn.host
+ << ":" << std::to_string(conn.port)
+ << ", id: " << std::to_string(conn.connId)
+ << ", retry policy is \"" << conn.retryPolicy.name
+ << "\"";
+
+ // We can remove the request from the queue at this point
+ requestQueue.pop_front();
+ }
+
+ // Configures a connection to use the specific retry policy.
+ inline void setConnRetryPolicy(ConnectionInfo& conn,
+ const RetryPolicyData& retryPolicy)
+ {
+ BMCWEB_LOG_DEBUG << destIP << ":" << std::to_string(destPort)
+ << ", id: " << std::to_string(conn.connId)
+ << " using retry policy \"" << retryPolicy.name
+ << "\"";
+
+ conn.retryPolicy = retryPolicy;
+ }
+
+ // Gets called as part of callback after request is sent
+ // Reuses the connection if there are any requests waiting to be sent
+ // Otherwise closes the connection if it is not a keep-alive
+ void sendNext(bool keepAlive, uint32_t connId)
+ {
+ auto conn = connections[connId];
+ // Reuse the connection to send the next request in the queue
+ if (!requestQueue.empty())
{
- requestDataQueue.push_back(data);
- handleConnState();
+ BMCWEB_LOG_DEBUG << std::to_string(requestQueue.size())
+ << " requests remaining in queue for " << destIP
+ << ":" << std::to_string(destPort)
+ << ", reusing connnection "
+ << std::to_string(connId);
+
+ setConnProps(*conn);
+
+ if (keepAlive)
+ {
+ conn->sendMessage();
+ }
+ else
+ {
+ // Server is not keep-alive enabled so we need to close the
+ // connection and then start over from resolve
+ conn->doClose();
+ conn->doResolve();
+ }
+ return;
+ }
+
+ // No more messages to send so close the connection if necessary
+ if (keepAlive)
+ {
+ conn->state = ConnState::idle;
}
else
{
- BMCWEB_LOG_ERROR << "Request queue is full. So ignoring data.";
+ // Abort the connection since server is not keep-alive enabled
+ conn->state = ConnState::abortConnection;
+ conn->doClose();
}
}
- void setRetryConfig(const uint32_t retryAttempts,
- const uint32_t retryTimeoutInterval)
+ void sendData(std::string& data, const RetryPolicyData& retryPolicy)
{
- maxRetryAttempts = retryAttempts;
- retryIntervalSecs = retryTimeoutInterval;
+ std::weak_ptr<ConnectionPool> weakSelf = weak_from_this();
+
+ // Callback to be called once the request has been sent
+ auto cb = [weakSelf](bool keepAlive, uint32_t connId) {
+ // If requests remain in the queue then we want to reuse this
+ // connection to send the next request
+ std::shared_ptr<ConnectionPool> self = weakSelf.lock();
+ if (!self)
+ {
+ BMCWEB_LOG_CRITICAL << self << " Failed to capture connection";
+ return;
+ }
+
+ self->sendNext(keepAlive, connId);
+ };
+
+ // Reuse an existing connection if one is available
+ for (unsigned int i = 0; i < connections.size(); i++)
+ {
+ auto conn = connections[i];
+ if ((conn->state == ConnState::idle) ||
+ (conn->state == ConnState::initialized) ||
+ (conn->state == ConnState::closed))
+ {
+ conn->data = std::move(data);
+ conn->callback = std::move(cb);
+ conn->retryPolicy = retryPolicy;
+ setConnRetryPolicy(*conn, retryPolicy);
+ std::string commonMsg = std::to_string(i) + " from pool " +
+ destIP + ":" + std::to_string(destPort);
+
+ if (conn->state == ConnState::idle)
+ {
+ BMCWEB_LOG_DEBUG << "Grabbing idle connection "
+ << commonMsg;
+ conn->sendMessage();
+ }
+ else
+ {
+ BMCWEB_LOG_DEBUG << "Reusing existing connection "
+ << commonMsg;
+ conn->doResolve();
+ }
+ return;
+ }
+ }
+
+ // All connections in use so create a new connection or add request to
+ // the queue
+ if (connections.size() < maxPoolSize)
+ {
+ BMCWEB_LOG_DEBUG << "Adding new connection to pool " << destIP
+ << ":" << std::to_string(destPort);
+ auto conn = addConnection();
+ conn->data = std::move(data);
+ conn->callback = std::move(cb);
+ setConnRetryPolicy(*conn, retryPolicy);
+ conn->doResolve();
+ }
+ else if (requestQueue.size() < maxRequestQueueSize)
+ {
+ BMCWEB_LOG_ERROR << "Max pool size reached. Adding data to queue.";
+ requestQueue.emplace_back(std::move(data), std::move(cb),
+ retryPolicy);
+ }
+ else
+ {
+ BMCWEB_LOG_ERROR << destIP << ":" << std::to_string(destPort)
+ << " request queue full. Dropping request.";
+ }
}
- void setRetryPolicy(const std::string& retryPolicy)
+ std::shared_ptr<ConnectionInfo>& addConnection()
{
- retryPolicyAction = retryPolicy;
+ unsigned int newId = static_cast<unsigned int>(connections.size());
+
+ auto& ret = connections.emplace_back(std::make_shared<ConnectionInfo>(
+ ioc, id, destIP, destPort, destUri, httpHeader, newId));
+
+ BMCWEB_LOG_DEBUG << "Added connection "
+ << std::to_string(connections.size() - 1)
+ << " to pool " << destIP << ":"
+ << std::to_string(destPort);
+
+ return ret;
+ }
+
+ public:
+ explicit ConnectionPool(boost::asio::io_context& ioc, const std::string& id,
+ const std::string& destIP, const uint16_t destPort,
+ const std::string& destUri,
+ const boost::beast::http::fields& httpHeader) :
+ ioc(ioc),
+ id(id), destIP(destIP), destPort(destPort), destUri(destUri),
+ httpHeader(httpHeader)
+ {
+ std::string clientKey = destIP + ":" + std::to_string(destPort);
+ BMCWEB_LOG_DEBUG << "Initializing connection pool for " << destIP << ":"
+ << std::to_string(destPort);
+
+ // Initialize the pool with a single connection
+ addConnection();
}
};
+class HttpClient
+{
+ private:
+ std::unordered_map<std::string, std::shared_ptr<ConnectionPool>>
+ connectionPools;
+ boost::asio::io_context& ioc =
+ crow::connections::systemBus->get_io_context();
+ std::unordered_map<std::string, RetryPolicyData> retryInfo;
+ HttpClient() = default;
+
+ public:
+ HttpClient(const HttpClient&) = delete;
+ HttpClient& operator=(const HttpClient&) = delete;
+ HttpClient(HttpClient&&) = delete;
+ HttpClient& operator=(HttpClient&&) = delete;
+ ~HttpClient() = default;
+
+ static HttpClient& getInstance()
+ {
+ static HttpClient handler;
+ return handler;
+ }
+
+ void sendData(std::string& data, const std::string& id,
+ const std::string& destIP, const uint16_t destPort,
+ const std::string& destUri,
+ const boost::beast::http::fields& httpHeader,
+ std::string& retryPolicyName)
+ {
+ std::string clientKey = destIP + ":" + std::to_string(destPort);
+ // Use nullptr to avoid creating a ConnectionPool each time
+ auto result = connectionPools.try_emplace(clientKey, nullptr);
+ if (result.second)
+ {
+ // Now actually create the ConnectionPool shared_ptr since it does
+ // not already exist
+ result.first->second = std::make_shared<ConnectionPool>(
+ ioc, id, destIP, destPort, destUri, httpHeader);
+ BMCWEB_LOG_DEBUG << "Created connection pool for " << clientKey;
+ }
+ else
+ {
+ BMCWEB_LOG_DEBUG << "Using existing connection pool for "
+ << clientKey;
+ }
+
+ // Get the associated retry policy
+ auto policy = retryInfo.try_emplace(retryPolicyName);
+ if (policy.second)
+ {
+ BMCWEB_LOG_DEBUG << "Creating retry policy \"" << retryPolicyName
+ << "\" with default values";
+ policy.first->second.name = retryPolicyName;
+ }
+
+ // Send the data using either the existing connection pool or the newly
+ // created connection pool
+ result.first->second->sendData(data, policy.first->second);
+ }
+
+ void setRetryConfig(const uint32_t retryAttempts,
+ const uint32_t retryTimeoutInterval,
+ const std::string& retryPolicyName)
+ {
+ // We need to create the retry policy if one does not already exist for
+ // the given retryPolicyName
+ auto result = retryInfo.try_emplace(retryPolicyName);
+ if (result.second)
+ {
+ BMCWEB_LOG_DEBUG << "setRetryConfig(): Creating new retry policy \""
+ << retryPolicyName << "\"";
+ result.first->second.name = retryPolicyName;
+ }
+ else
+ {
+ BMCWEB_LOG_DEBUG << "setRetryConfig(): Updating retry info for \""
+ << retryPolicyName << "\"";
+ }
+
+ result.first->second.maxRetryAttempts = retryAttempts;
+ result.first->second.retryIntervalSecs =
+ std::chrono::seconds(retryTimeoutInterval);
+ }
+
+ void setRetryPolicy(const std::string& retryPolicy,
+ const std::string& retryPolicyName)
+ {
+ // We need to create the retry policy if one does not already exist for
+ // the given retryPolicyName
+ auto result = retryInfo.try_emplace(retryPolicyName);
+ if (result.second)
+ {
+ BMCWEB_LOG_DEBUG << "setRetryPolicy(): Creating new retry policy \""
+ << retryPolicyName << "\"";
+ result.first->second.name = retryPolicyName;
+ }
+ else
+ {
+ BMCWEB_LOG_DEBUG << "setRetryPolicy(): Updating retry policy for \""
+ << retryPolicyName << "\"";
+ }
+
+ result.first->second.retryPolicyAction = retryPolicy;
+ }
+};
} // namespace crow
diff --git a/redfish-core/include/event_service_manager.hpp b/redfish-core/include/event_service_manager.hpp
index bc5af85..58bf257 100644
--- a/redfish-core/include/event_service_manager.hpp
+++ b/redfish-core/include/event_service_manager.hpp
@@ -381,7 +381,7 @@
~Subscription() = default;
- bool sendEvent(const std::string& msg)
+ bool sendEvent(std::string& msg)
{
persistent_data::EventServiceConfig eventServiceConfig =
persistent_data::EventServiceStore::getInstance()
@@ -391,15 +391,9 @@
return false;
}
- if (conn == nullptr)
- {
- // create the HttpClient connection
- conn = std::make_shared<crow::HttpClient>(
- crow::connections::systemBus->get_io_context(), id, host, port,
- path, httpHeaders);
- }
-
- conn->sendData(msg);
+ // A connection pool will be created if one does not already exist
+ crow::HttpClient::getInstance().sendData(msg, id, host, port, path,
+ httpHeaders, retryPolicyName);
eventSeqNum++;
if (sseConn != nullptr)
@@ -430,8 +424,9 @@
{"Name", "Event Log"},
{"Events", logEntryArray}};
- return this->sendEvent(
- msg.dump(2, ' ', true, nlohmann::json::error_handler_t::replace));
+ std::string strMsg =
+ msg.dump(2, ' ', true, nlohmann::json::error_handler_t::replace);
+ return this->sendEvent(strMsg);
}
#ifndef BMCWEB_ENABLE_REDFISH_DBUS_LOG_ENTRIES
@@ -497,8 +492,9 @@
{"Name", "Event Log"},
{"Events", logEntryArray}};
- this->sendEvent(
- msg.dump(2, ' ', true, nlohmann::json::error_handler_t::replace));
+ std::string strMsg =
+ msg.dump(2, ' ', true, nlohmann::json::error_handler_t::replace);
+ this->sendEvent(strMsg);
}
#endif
@@ -529,25 +525,22 @@
return;
}
- this->sendEvent(
- msg.dump(2, ' ', true, nlohmann::json::error_handler_t::replace));
+ std::string strMsg =
+ msg.dump(2, ' ', true, nlohmann::json::error_handler_t::replace);
+ this->sendEvent(strMsg);
}
void updateRetryConfig(const uint32_t retryAttempts,
const uint32_t retryTimeoutInterval)
{
- if (conn != nullptr)
- {
- conn->setRetryConfig(retryAttempts, retryTimeoutInterval);
- }
+ crow::HttpClient::getInstance().setRetryConfig(
+ retryAttempts, retryTimeoutInterval, retryPolicyName);
}
void updateRetryPolicy()
{
- if (conn != nullptr)
- {
- conn->setRetryPolicy(retryPolicy);
- }
+ crow::HttpClient::getInstance().setRetryPolicy(retryPolicy,
+ retryPolicyName);
}
uint64_t getEventSeqNum() const
@@ -561,8 +554,8 @@
uint16_t port = 0;
std::string path;
std::string uriProto;
- std::shared_ptr<crow::HttpClient> conn = nullptr;
std::shared_ptr<crow::ServerSentEvents> sseConn = nullptr;
+ std::string retryPolicyName = "SubscriptionEvent";
};
class EventServiceManager
@@ -1040,8 +1033,10 @@
{"Name", "Event Log"},
{"Id", eventId},
{"Events", eventRecord}};
- entry->sendEvent(msgJson.dump(
- 2, ' ', true, nlohmann::json::error_handler_t::replace));
+
+ std::string strMsg = msgJson.dump(
+ 2, ' ', true, nlohmann::json::error_handler_t::replace);
+ entry->sendEvent(strMsg);
eventId++; // increament the eventId
}
else
@@ -1060,8 +1055,10 @@
{"OriginOfCondition", "/ibm/v1/HMC/BroadcastService"},
{"Name", "Broadcast Message"},
{"Message", broadcastMsg}};
- entry->sendEvent(msgJson.dump(
- 2, ' ', true, nlohmann::json::error_handler_t::replace));
+
+ std::string strMsg = msgJson.dump(
+ 2, ' ', true, nlohmann::json::error_handler_t::replace);
+ entry->sendEvent(strMsg);
}
}