Handle timeouts in asio connections
It was reported in #98 that timeouts weren't working in some simple
cases. This appears to be because we've broken some of the assumptions
used by sd-bus around manually handling polling loops, and ignoring
sd_bus_get_timeout and sd_bus_get_events, as well as not calling
sd_bus_get_fd before every invocation. [1]
This commit:
Adds a steady_timer class to the asio object, which indirectly allows
expiring the underlying epoll loop early, based on the result of
sd_bus_get_timeout.
Changes the flow such that sd_bus_get_events is called and obeyed when
adding poll events to the reactor. While this wasn't seen in practice,
in theory we have the potential to miss events during heavy write loads
where the socket might block. This should resolve it.
Re-calls sd_bus_get_fd on each invocation of the poll loop, as directed
by the sd_bus documentation. Further testing is needed to see if this
resolves some of the match_t issues we've seen.
Tested: asio-example functions and prints expected content. More
in-system testing needed.
[1] https://www.freedesktop.org/software/systemd/man/249/sd_bus_get_fd.html#
Change-Id: I9abe99c95eb5395753dde33f92c831bd828238f3
Signed-off-by: Ed Tanous <etanous@nvidia.com>
diff --git a/include/sdbusplus/asio/connection.hpp b/include/sdbusplus/asio/connection.hpp
index c8f3bf0..9e6ecbf 100644
--- a/include/sdbusplus/asio/connection.hpp
+++ b/include/sdbusplus/asio/connection.hpp
@@ -25,6 +25,8 @@
#include <boost/asio/io_context.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/asio/post.hpp>
+#include <boost/asio/steady_timer.hpp>
+
#ifndef SDBUSPLUS_DISABLE_BOOST_COROUTINES
#include <boost/asio/spawn.hpp>
#endif
@@ -54,12 +56,13 @@
// default to system bus
connection(boost::asio::io_context& io) :
sdbusplus::bus_t(sdbusplus::bus::new_default()), io_(io),
- socket(io_.get_executor(), get_fd())
+ socket(io_.get_executor(), get_fd()), timer(io_.get_executor())
{
read_immediate();
}
connection(boost::asio::io_context& io, sd_bus* bus) :
- sdbusplus::bus_t(bus), io_(io), socket(io_.get_executor(), get_fd())
+ sdbusplus::bus_t(bus), io_(io), socket(io_.get_executor(), get_fd()),
+ timer(io_.get_executor())
{
read_immediate();
}
@@ -311,38 +314,110 @@
private:
boost::asio::io_context& io_;
boost::asio::posix::stream_descriptor socket;
+ boost::asio::steady_timer timer;
+
+ void process()
+ {
+ if (process_discard())
+ {
+ read_immediate();
+ }
+ else
+ {
+ read_wait();
+ }
+ }
+
+ void on_fd_event(const boost::system::error_code& ec)
+ {
+ // This is expected if the timer expired before an fd event was
+ // available
+ if (ec == boost::asio::error::operation_aborted)
+ {
+ return;
+ }
+ timer.cancel();
+ if (ec)
+ {
+ return;
+ }
+ process();
+ }
+
+ void on_timer_event(const boost::system::error_code& ec)
+ {
+ if (ec == boost::asio::error::operation_aborted)
+ {
+ // This is expected if the fd was available before the timer expired
+ return;
+ }
+ if (ec)
+ {
+ return;
+ }
+ // Abort existing operations on the socket
+ socket.cancel();
+ process();
+ }
void read_wait()
{
- socket.async_read_some(
- boost::asio::null_buffers(),
- [&](const boost::system::error_code& ec, std::size_t) {
- if (ec)
- {
- return;
- }
- if (process_discard())
- {
- read_immediate();
- }
- else
- {
- read_wait();
- }
- });
+ int fd = get_fd();
+ if (fd < 0)
+ {
+ return;
+ }
+ if (fd != socket.native_handle())
+ {
+ socket.release();
+ socket.assign(fd);
+ }
+ int events = get_events();
+ if (events < 0)
+ {
+ return;
+ }
+ if (events & POLLIN)
+ {
+ socket.async_wait(boost::asio::posix::stream_descriptor::wait_read,
+ std::bind_front(&connection::on_fd_event, this));
+ }
+ if (events & POLLOUT)
+ {
+ socket.async_wait(boost::asio::posix::stream_descriptor::wait_write,
+ std::bind_front(&connection::on_fd_event, this));
+ }
+ if (events & POLLERR)
+ {
+ socket.async_wait(boost::asio::posix::stream_descriptor::wait_error,
+ std::bind_front(&connection::on_fd_event, this));
+ }
+
+ uint64_t timeout = 0;
+ int timeret = get_timeout(&timeout);
+ if (timeret < 0)
+ {
+ return;
+ }
+ using clock = std::chrono::steady_clock;
+
+ using SdDuration = std::chrono::duration<uint64_t, std::micro>;
+ SdDuration sdTimeout(timeout);
+ // sd-bus always returns a 64 bit timeout regardless of architecture,
+ // and per the documentation routinely returns UINT64_MAX
+ if (sdTimeout > clock::duration::max())
+ {
+ // No need to start the timer if the expiration is longer than
+ // underlying timer can run.
+ return;
+ }
+ auto nativeTimeout = std::chrono::floor<clock::duration>(sdTimeout);
+ timer.expires_at(clock::time_point(nativeTimeout));
+ timer.async_wait(std::bind_front(&connection::on_timer_event, this));
}
void read_immediate()
{
- boost::asio::post(io_, [&] {
- if (process_discard())
- {
- read_immediate();
- }
- else
- {
- read_wait();
- }
- });
+ boost::asio::post(io_, std::bind_front(&connection::process, this));
}
};