server: Use ringbuffer for socket backlog

Currently, the socket handler uses a linear buffer for the backlog data;
this means we need to shift up to 128kB of data each time new data is
queued for the clients.

This change introduces a single-producer, multiple-consumer ringbuffer
to avoid the need to memmove() data around; we can simply update
pointers instead of shifting data.

We add the ringbuffer implementation as a new file (ringbuffer.c), to
keep it a little more modular. To mitigate the risk of subtle pointer
arithmetic issues, we add a set of tests too.

Change-Id: Ib7c5151d3cf1f588436f5461000b6fed22d0681c
Signed-off-by: Jeremy Kerr <jk@ozlabs.org>
diff --git a/socket-handler.c b/socket-handler.c
index a0b3028..11f183c 100644
--- a/socket-handler.c
+++ b/socket-handler.c
@@ -32,31 +32,24 @@
 
 #include "console-server.h"
 
-#define min(a,b) ({				\
-		const typeof(a) _a = (a);	\
-		const typeof(b) _b = (b);	\
-		_a < _b ? _a : _b;		\
-	})
-
 const size_t buffer_size = 128 * 1024;
 
 struct client {
-	struct poller	*poller;
-	int		fd;
-	size_t		buf_pos;
+	struct socket_handler		*sh;
+	struct poller			*poller;
+	struct ringbuffer_consumer	*rbc;
+	int				fd;
 };
 
 struct socket_handler {
-	struct handler	handler;
-	struct console	*console;
-	struct poller	*poller;
-	int		sd;
+	struct handler		handler;
+	struct console		*console;
+	struct poller		*poller;
+	struct ringbuffer	*ringbuffer;
+	int			sd;
 
-	uint8_t		*buf;
-	size_t		buf_len;
-
-	struct client	**clients;
-	int		n_clients;
+	struct client		**clients;
+	int			n_clients;
 };
 
 static struct socket_handler *to_socket_handler(struct handler *handler)
@@ -64,14 +57,18 @@
 	return container_of(handler, struct socket_handler, handler);
 }
 
-static void client_close(struct socket_handler *sh, struct client *client)
+static void client_close(struct client *client)
 {
+	struct socket_handler *sh = client->sh;
 	int idx;
 
 	close(client->fd);
 	if (client->poller)
 		console_unregister_poller(sh->console, client->poller);
 
+	if (client->rbc)
+		ringbuffer_consumer_unregister(client->rbc);
+
 	for (idx = 0; idx < sh->n_clients; idx++)
 		if (sh->clients[idx] == client)
 			break;
@@ -88,18 +85,6 @@
 			sizeof(*sh->clients) * sh->n_clients);
 }
 
-static size_t client_buffer_len(struct socket_handler *sh,
-		struct client *client)
-{
-	return sh->buf_len - client->buf_pos;
-}
-
-static void *client_buffer_data(struct socket_handler *sh,
-		struct client *client)
-{
-	return sh->buf + client->buf_pos;
-}
-
 static ssize_t send_all(int fd, void *buf, size_t len, bool block)
 {
 	int rc, flags;
@@ -130,37 +115,58 @@
 /* Drain the queue to the socket and update the queue buffer. If force_len is
  * set, send at least that many bytes from the queue, possibly while blocking
  */
-static int client_drain_queue(struct socket_handler *sh,
-		struct client *client, size_t force_len)
+static int client_drain_queue(struct client *client, size_t force_len)
 {
+	uint8_t *buf;
 	ssize_t wlen;
-	size_t len;
+	size_t len, total_len;
 	bool block;
 
-	len = client_buffer_len(sh, client);
-	if (!len)
-		return 0;
+	total_len = 0;
+	wlen = 0;
+	block = !!force_len;
 
-	block = false;
-	if (force_len) {
-		assert(force_len <= len);
-		block = true;
-		len = force_len;
+	for (;;) {
+		len = ringbuffer_dequeue_peek(client->rbc, total_len, &buf);
+		if (!len)
+			break;
+
+		wlen = send_all(client->fd, buf, len, block);
+		if (wlen <= 0)
+			break;
+
+		total_len += wlen;
+
+		if (force_len && total_len >= force_len)
+			break;
 	}
 
-	wlen = send_all(client->fd, client_buffer_data(sh, client), len, block);
 	if (wlen < 0)
 		return -1;
 
-	if (force_len && wlen < force_len)
+	if (force_len && total_len < force_len)
 		return -1;
 
-	client->buf_pos += wlen;
-	assert(client->buf_pos <= sh->buf_len);
-
+	ringbuffer_dequeue_commit(client->rbc, total_len);
 	return 0;
 }
 
+static enum ringbuffer_poll_ret client_ringbuffer_poll(void *arg,
+		size_t force_len)
+{
+	struct client *client = arg;
+	int rc;
+
+	rc = client_drain_queue(client, force_len);
+	if (rc) {
+		client->rbc = NULL;
+		client_close(client);
+		return RINGBUFFER_POLL_REMOVE;
+	}
+
+	return RINGBUFFER_POLL_OK;
+}
+
 static enum poller_ret client_poll(struct handler *handler,
 		int events, void *data)
 {
@@ -184,7 +190,7 @@
 	}
 
 	if (events & POLLOUT) {
-		rc = client_drain_queue(sh, client, 0);
+		rc = client_drain_queue(client, 0);
 		if (rc)
 			goto err_close;
 	}
@@ -193,7 +199,7 @@
 
 err_close:
 	client->poller = NULL;
-	client_close(sh, client);
+	client_close(client);
 	return POLLER_REMOVE;
 }
 
@@ -214,9 +220,12 @@
 	client = malloc(sizeof(*client));
 	memset(client, 0, sizeof(*client));
 
+	client->sh = sh;
 	client->fd = fd;
 	client->poller = console_register_poller(sh->console, handler,
 			client_poll, client->fd, POLLIN, client);
+	client->rbc = ringbuffer_consumer_register(sh->ringbuffer,
+			client_ringbuffer_poll, client);
 
 	n = sh->n_clients++;
 	sh->clients = realloc(sh->clients,
@@ -237,11 +246,10 @@
 	sh->console = console;
 	sh->clients = NULL;
 	sh->n_clients = 0;
-	sh->buf_len = 0;
 
-	sh->buf = malloc(buffer_size);
-	if (!sh->buf) {
-		warn("Can't allocate backlog buffer");
+	sh->ringbuffer = ringbuffer_init(buffer_size);
+	if (!sh->ringbuffer) {
+		warn("Can't allocate backlog ring buffer");
 		return -1;
 	}
 
@@ -277,78 +285,7 @@
 static int socket_data(struct handler *handler, uint8_t *buf, size_t len)
 {
 	struct socket_handler *sh = to_socket_handler(handler);
-	struct client *client;
-	size_t space, min_pos;
-	int i, rc;
-
-	space = buffer_size - sh->buf_len;
-	min_pos = sh->buf_len;
-
-	/* Ensure there is at least len bytes available in the global buffer.
-	 *
-	 * The 'space' var tells us how many bytes are available at the tail of
-	 * the buffer. However, if all clients have a non-zero buf_pos, then we
-	 * can drop bytes from the beginning (in the memmove below). So, we
-	 * account for each clients' buf_pos in the space check.
-	 *
-	 * If a client doesn't have sufficient space, perform a blocking write
-	 * to create it. This will result in incrementing client->buf_pos to
-	 * create space.
-	 */
-	for (i = 0; i < sh->n_clients; i++) {
-		ssize_t client_space;
-
-		client = sh->clients[i];
-		client_space = space + client->buf_pos;
-
-		if (len > client_space) {
-			/* Blocking send enough to create len bytes of space in
-			 * the global buffer. On success, this will increment
-			 * client->buf_pos by the number of bytes written
-			 */
-			rc = client_drain_queue(sh, client, len - client_space);
-			if (rc) {
-				client_close(sh, client);
-				i--;
-				continue;
-			}
-		}
-
-		min_pos = min(min_pos, client->buf_pos);
-	}
-
-	/* avoid pointless copying */
-	if (!sh->n_clients) {
-		sh->buf_len = 0;
-		return 0;
-	}
-
-	/* drop unneeded buffer data... */
-	sh->buf_len -= min_pos;
-	memmove(sh->buf, sh->buf + min_pos, sh->buf_len);
-
-	/* ... and add new data */
-	memcpy(sh->buf + sh->buf_len, buf, len);
-	sh->buf_len += len;
-
-	/* now that the queue contains the new data, perform non-blocking send
-	 * to all clients */
-	for (i = 0; i < sh->n_clients; i++) {
-		client = sh->clients[i];
-
-		/* We've dropped data in the global buffer, so need to update
-		 * clients' pos pointers to suit the new start of buffer
-		 * data */
-		client->buf_pos -= min_pos;
-		assert(client->buf_pos >= 0);
-
-		rc = client_drain_queue(sh, client, 0);
-		if (rc) {
-			client_close(sh, client);
-			i--;
-			continue;
-		}
-	}
+	ringbuffer_queue(sh->ringbuffer, buf, len);
 	return 0;
 }
 
@@ -357,13 +294,15 @@
 	struct socket_handler *sh = to_socket_handler(handler);
 
 	while (sh->n_clients)
-		client_close(sh, sh->clients[0]);
+		client_close(sh->clients[0]);
 
 	if (sh->poller)
 		console_unregister_poller(sh->console, sh->poller);
 
+	if (sh->ringbuffer)
+		ringbuffer_fini(sh->ringbuffer);
+
 	close(sh->sd);
-	free(sh->buf);
 }
 
 static struct socket_handler socket_handler = {