server: Use ringbuffer for socket backlog
Currently, the socket handler uses a linear buffer for the backlog data;
this means we need to shift up to 128kB of data after each socket
write().
This change introduces a single-producer-multiple-consumer ringbuffer,
to avoid the need for memmove()ing data around; we can simply update
pointers instead of shifting data.
The ringbuffer implementation is added as a new file (ringbuffer.c), to
keep it separate from the socket handler and a little more modular. To
mitigate the risk of subtle pointer arithmetic issues, we add a set of
tests for it too.
Change-Id: Ib7c5151d3cf1f588436f5461000b6fed22d0681c
Signed-off-by: Jeremy Kerr <jk@ozlabs.org>
diff --git a/socket-handler.c b/socket-handler.c
index a0b3028..11f183c 100644
--- a/socket-handler.c
+++ b/socket-handler.c
@@ -32,31 +32,24 @@
#include "console-server.h"
-#define min(a,b) ({ \
- const typeof(a) _a = (a); \
- const typeof(b) _b = (b); \
- _a < _b ? _a : _b; \
- })
-
const size_t buffer_size = 128 * 1024;
struct client {
- struct poller *poller;
- int fd;
- size_t buf_pos;
+ struct socket_handler *sh;
+ struct poller *poller;
+ struct ringbuffer_consumer *rbc;
+ int fd;
};
struct socket_handler {
- struct handler handler;
- struct console *console;
- struct poller *poller;
- int sd;
+ struct handler handler;
+ struct console *console;
+ struct poller *poller;
+ struct ringbuffer *ringbuffer;
+ int sd;
- uint8_t *buf;
- size_t buf_len;
-
- struct client **clients;
- int n_clients;
+ struct client **clients;
+ int n_clients;
};
static struct socket_handler *to_socket_handler(struct handler *handler)
@@ -64,14 +57,18 @@
return container_of(handler, struct socket_handler, handler);
}
-static void client_close(struct socket_handler *sh, struct client *client)
+static void client_close(struct client *client)
{
+ struct socket_handler *sh = client->sh;
int idx;
close(client->fd);
if (client->poller)
console_unregister_poller(sh->console, client->poller);
+ if (client->rbc)
+ ringbuffer_consumer_unregister(client->rbc);
+
for (idx = 0; idx < sh->n_clients; idx++)
if (sh->clients[idx] == client)
break;
@@ -88,18 +85,6 @@
sizeof(*sh->clients) * sh->n_clients);
}
-static size_t client_buffer_len(struct socket_handler *sh,
- struct client *client)
-{
- return sh->buf_len - client->buf_pos;
-}
-
-static void *client_buffer_data(struct socket_handler *sh,
- struct client *client)
-{
- return sh->buf + client->buf_pos;
-}
-
static ssize_t send_all(int fd, void *buf, size_t len, bool block)
{
int rc, flags;
@@ -130,37 +115,58 @@
/* Drain the queue to the socket and update the queue buffer. If force_len is
* set, send at least that many bytes from the queue, possibly while blocking
*/
-static int client_drain_queue(struct socket_handler *sh,
- struct client *client, size_t force_len)
+static int client_drain_queue(struct client *client, size_t force_len)
{
+ uint8_t *buf;
ssize_t wlen;
- size_t len;
+ size_t len, total_len;
bool block;
- len = client_buffer_len(sh, client);
- if (!len)
- return 0;
+ total_len = 0;
+ wlen = 0;
+ block = !!force_len;
- block = false;
- if (force_len) {
- assert(force_len <= len);
- block = true;
- len = force_len;
+ for (;;) {
+ len = ringbuffer_dequeue_peek(client->rbc, total_len, &buf);
+ if (!len)
+ break;
+
+ wlen = send_all(client->fd, buf, len, block);
+ if (wlen <= 0)
+ break;
+
+ total_len += wlen;
+
+ if (force_len && total_len >= force_len)
+ break;
}
- wlen = send_all(client->fd, client_buffer_data(sh, client), len, block);
if (wlen < 0)
return -1;
- if (force_len && wlen < force_len)
+ if (force_len && total_len < force_len)
return -1;
- client->buf_pos += wlen;
- assert(client->buf_pos <= sh->buf_len);
-
+ ringbuffer_dequeue_commit(client->rbc, total_len);
return 0;
}
+static enum ringbuffer_poll_ret client_ringbuffer_poll(void *arg,
+ size_t force_len)
+{
+ struct client *client = arg;
+ int rc;
+
+ rc = client_drain_queue(client, force_len);
+ if (rc) {
+ client->rbc = NULL;
+ client_close(client);
+ return RINGBUFFER_POLL_REMOVE;
+ }
+
+ return RINGBUFFER_POLL_OK;
+}
+
static enum poller_ret client_poll(struct handler *handler,
int events, void *data)
{
@@ -184,7 +190,7 @@
}
if (events & POLLOUT) {
- rc = client_drain_queue(sh, client, 0);
+ rc = client_drain_queue(client, 0);
if (rc)
goto err_close;
}
@@ -193,7 +199,7 @@
err_close:
client->poller = NULL;
- client_close(sh, client);
+ client_close(client);
return POLLER_REMOVE;
}
@@ -214,9 +220,12 @@
client = malloc(sizeof(*client));
memset(client, 0, sizeof(*client));
+ client->sh = sh;
client->fd = fd;
client->poller = console_register_poller(sh->console, handler,
client_poll, client->fd, POLLIN, client);
+ client->rbc = ringbuffer_consumer_register(sh->ringbuffer,
+ client_ringbuffer_poll, client);
n = sh->n_clients++;
sh->clients = realloc(sh->clients,
@@ -237,11 +246,10 @@
sh->console = console;
sh->clients = NULL;
sh->n_clients = 0;
- sh->buf_len = 0;
- sh->buf = malloc(buffer_size);
- if (!sh->buf) {
- warn("Can't allocate backlog buffer");
+ sh->ringbuffer = ringbuffer_init(buffer_size);
+ if (!sh->ringbuffer) {
+ warn("Can't allocate backlog ring buffer");
return -1;
}
@@ -277,78 +285,7 @@
static int socket_data(struct handler *handler, uint8_t *buf, size_t len)
{
struct socket_handler *sh = to_socket_handler(handler);
- struct client *client;
- size_t space, min_pos;
- int i, rc;
-
- space = buffer_size - sh->buf_len;
- min_pos = sh->buf_len;
-
- /* Ensure there is at least len bytes available in the global buffer.
- *
- * The 'space' var tells us how many bytes are available at the tail of
- * the buffer. However, if all clients have a non-zero buf_pos, then we
- * can drop bytes from the beginning (in the memmove below). So, we
- * account for each clients' buf_pos in the space check.
- *
- * If a client doesn't have sufficient space, perform a blocking write
- * to create it. This will result in incrementing client->buf_pos to
- * create space.
- */
- for (i = 0; i < sh->n_clients; i++) {
- ssize_t client_space;
-
- client = sh->clients[i];
- client_space = space + client->buf_pos;
-
- if (len > client_space) {
- /* Blocking send enough to create len bytes of space in
- * the global buffer. On success, this will increment
- * client->buf_pos by the number of bytes written
- */
- rc = client_drain_queue(sh, client, len - client_space);
- if (rc) {
- client_close(sh, client);
- i--;
- continue;
- }
- }
-
- min_pos = min(min_pos, client->buf_pos);
- }
-
- /* avoid pointless copying */
- if (!sh->n_clients) {
- sh->buf_len = 0;
- return 0;
- }
-
- /* drop unneeded buffer data... */
- sh->buf_len -= min_pos;
- memmove(sh->buf, sh->buf + min_pos, sh->buf_len);
-
- /* ... and add new data */
- memcpy(sh->buf + sh->buf_len, buf, len);
- sh->buf_len += len;
-
- /* now that the queue contains the new data, perform non-blocking send
- * to all clients */
- for (i = 0; i < sh->n_clients; i++) {
- client = sh->clients[i];
-
- /* We've dropped data in the global buffer, so need to update
- * clients' pos pointers to suit the new start of buffer
- * data */
- client->buf_pos -= min_pos;
- assert(client->buf_pos >= 0);
-
- rc = client_drain_queue(sh, client, 0);
- if (rc) {
- client_close(sh, client);
- i--;
- continue;
- }
- }
+ ringbuffer_queue(sh->ringbuffer, buf, len);
return 0;
}
@@ -357,13 +294,15 @@
struct socket_handler *sh = to_socket_handler(handler);
while (sh->n_clients)
- client_close(sh, sh->clients[0]);
+ client_close(sh->clients[0]);
if (sh->poller)
console_unregister_poller(sh->console, sh->poller);
+ if (sh->ringbuffer)
+ ringbuffer_fini(sh->ringbuffer);
+
close(sh->sd);
- free(sh->buf);
}
static struct socket_handler socket_handler = {