socket-handler: Use a global backlog buffer instead of per-client

Currently, we keep a backlog buffer for each connected client. This is a
waste, as it's storing the same data, just at different offsets.

This change uses a global buffer for the client backlog, with each
client tracking its current position in this buffer. We just make this
fixed-size, rather than trying to dynamically allocate.

Change-Id: I20bd0772c95d8237677108c7a62d9ec6ff8ed35d
Signed-off-by: Jeremy Kerr <jk@ozlabs.org>
diff --git a/socket-handler.c b/socket-handler.c
index 87db7f0..af43989 100644
--- a/socket-handler.c
+++ b/socket-handler.c
@@ -32,14 +32,18 @@
 
 #include "console-server.h"
 
-const size_t buffer_size_max = 100 * 1024;
+#define min(a,b) ({				\
+		const typeof(a) _a = (a);	\
+		const typeof(b) _b = (b);	\
+		_a < _b ? _a : _b;		\
+	})
+
+const size_t buffer_size = 128 * 1024;
 
 struct client {
 	struct poller	*poller;
 	int		fd;
-	uint8_t		*buf;
-	size_t		buf_alloc;
-	size_t		buf_len;
+	size_t		buf_pos;
 };
 
 struct socket_handler {
@@ -48,6 +52,9 @@
 	struct poller	*poller;
 	int		sd;
 
+	uint8_t		*buf;
+	size_t		buf_len;
+
 	struct client	**clients;
 	int		n_clients;
 };
@@ -81,30 +88,16 @@
 			sizeof(*sh->clients) * sh->n_clients);
 }
 
-static size_t client_buffer_space(struct client *client)
+static size_t client_buffer_len(struct socket_handler *sh,
+		struct client *client)
 {
-	return buffer_size_max - client->buf_len;
+	return sh->buf_len - client->buf_pos;
 }
 
-static int client_queue_data(struct client *client, uint8_t *buf, size_t len)
+static void *client_buffer_data(struct socket_handler *sh,
+		struct client *client)
 {
-	/* we may need to allocate space, but we know that increasing by len
-	 * won't exceed buffer_size_max */
-
-	if (client->buf_len + len > client->buf_alloc) {
-		if (!client->buf_alloc)
-			client->buf_alloc = 2048;
-		client->buf_alloc *= 2;
-
-		if (client->buf_alloc > buffer_size_max)
-			client->buf_alloc = buffer_size_max;
-
-		client->buf = realloc(client->buf, client->buf_alloc);
-	}
-
-	memcpy(client->buf + client->buf_len, buf, len);
-	client->buf_len += len;
-	return 0;
+	return sh->buf + client->buf_pos;
 }
 
 static ssize_t send_all(int fd, void *buf, size_t len, bool block)
@@ -137,31 +130,33 @@
 /* Drain the queue to the socket and update the queue buffer. If force_len is
  * set, send at least that many bytes from the queue, possibly while blocking
  */
-static int client_drain_queue(struct client *client, size_t force_len)
+static int client_drain_queue(struct socket_handler *sh,
+		struct client *client, size_t force_len)
 {
 	ssize_t wlen;
 	size_t len;
 	bool block;
 
-	if (!client->buf_len)
+	len = client_buffer_len(sh, client);
+	if (!len)
 		return 0;
 
 	block = false;
-	len = client->buf_len;
 	if (force_len) {
+		assert(force_len <= len);
 		block = true;
 		len = force_len;
 	}
 
-	wlen = send_all(client->fd, client->buf, len, block);
+	wlen = send_all(client->fd, client_buffer_data(sh, client), len, block);
 	if (wlen < 0)
 		return -1;
 
 	if (force_len && wlen < force_len)
 		return -1;
 
-	client->buf_len -= wlen;
-	memmove(client->buf, client->buf + wlen, client->buf_len);
+	client->buf_pos += wlen;
+	assert(client->buf_pos <= sh->buf_len);
 
 	return 0;
 }
@@ -189,7 +184,7 @@
 	}
 
 	if (events & POLLOUT) {
-		rc = client_drain_queue(client, 0);
+		rc = client_drain_queue(sh, client, 0);
 		if (rc)
 			goto err_close;
 	}
@@ -202,40 +197,6 @@
 	return POLLER_REMOVE;
 }
 
-static int client_data(struct client *client, uint8_t *buf, size_t len)
-{
-	ssize_t wlen;
-	size_t space;
-	int rc;
-
-	space = client_buffer_space(client);
-
-	/* blocking send to create space in the queue for this data */
-	if (len > space) {
-		rc = client_drain_queue(client, len - space);
-		if (rc)
-			return -1;
-	}
-
-	wlen = 0;
-
-	/* if the queue is empty, try to send without queuing */
-	if (!client->buf_len) {
-		wlen = send_all(client->fd, buf, len, false);
-		if (wlen < 0)
-			return -1;
-
-		if (wlen == len)
-			return 0;
-	}
-
-	/* queue anything unsent. this queue should always succeed, as we've
-	 * created space above */
-	client_queue_data(client, buf + wlen, len - wlen);
-
-	return client_drain_queue(client, 0);
-}
-
 static enum poller_ret socket_poll(struct handler *handler,
 		int events, void __attribute__((unused)) *data)
 {
@@ -276,6 +237,13 @@
 	sh->console = console;
 	sh->clients = NULL;
 	sh->n_clients = 0;
+	sh->buf_len = 0;
+
+	sh->buf = malloc(buffer_size);
+	if (!sh->buf) {
+		warn("Can't allocate backlog buffer");
+		return -1;
+	}
 
 	sh->sd = socket(AF_UNIX, SOCK_STREAM, 0);
 	if(sh->sd < 0) {
@@ -309,20 +277,77 @@
 static int socket_data(struct handler *handler, uint8_t *buf, size_t len)
 {
 	struct socket_handler *sh = to_socket_handler(handler);
+	struct client *client;
+	size_t space, min_pos;
 	int i, rc;
 
-	for (i = 0; i < sh->n_clients; i++) {
-		struct client *client = sh->clients[i];
-		rc = client_data(client, buf, len);
-		if (!rc)
-			continue;
+	space = buffer_size - sh->buf_len;
+	min_pos = sh->buf_len;
 
-		/* if we failed to send data, close the client. This will
-		 * remove it from the clients array, so skip back to the item
-		 * that has taken its place
-		 */
-		client_close(sh, client);
-		i--;
+	/* Ensure there is at least len bytes available in the global buffer.
+	 *
+	 * The 'space' var tells us how many bytes are available at the tail of
+	 * the buffer. However, if all clients have a non-zero buf_pos, then we
+	 * can drop bytes from the beginning (in the memmove below). So, we
+	 * account for each clients' buf_pos in the space check.
+	 *
+	 * If a client doesn't have sufficient space, perform a blocking write
+	 * to create it. This will result in incrementing client->buf_pos to
+	 * create space.
+	 */
+	for (i = 0; i < sh->n_clients; i++) {
+		ssize_t client_space;
+
+		client = sh->clients[i];
+		client_space = space + client->buf_pos;
+
+		if (len > client_space) {
+			/* Blocking send enough to create len bytes of space in
+			 * the global buffer. On success, this will increment
+			 * client->buf_pos by the number of bytes written
+			 */
+			rc = client_drain_queue(sh, client, len - client_space);
+			if (rc) {
+				client_close(sh, client);
+				i--;
+				continue;
+			}
+		}
+
+		min_pos = min(min_pos, client->buf_pos);
+	}
+
+	/* avoid pointless copying */
+	if (!sh->n_clients) {
+		sh->buf_len = 0;
+		return 0;
+	}
+
+	/* drop unneeded buffer data... */
+	sh->buf_len -= min_pos;
+	memmove(sh->buf, sh->buf + min_pos, sh->buf_len);
+
+	/* ... and add new data */
+	memcpy(sh->buf + sh->buf_len, buf, len);
+	sh->buf_len += len;
+
+	/* now that the queue contains the new data, perform non-blocking send
+	 * to all clients */
+	for (i = 0; i < sh->n_clients; i++) {
+		client = sh->clients[i];
+
+		/* We've dropped data in the global buffer, so need to update
+		 * clients' pos pointers to suit the new start of buffer
+		 * data */
+		client->buf_pos -= min_pos;
+		assert(client->buf_pos >= 0);
+
+		rc = client_drain_queue(sh, client, 0);
+		if (rc) {
+			client_close(sh, client);
+			i--;
+			continue;
+		}
 	}
 	return 0;
 }
@@ -339,6 +364,7 @@
 		console_unregister_poller(sh->console, sh->poller);
 
 	close(sh->sd);
+	free(sh->buf);
 }
 
 static struct socket_handler socket_handler = {