server: use ringbuffer for all handlers

Currently, we use a ringbuffer within the socket handler to manage
bursts of data to slower clients.

However, we're also seeing cases where the local tty handler becomes
blocking as well. So, we want to implement a buffer within the tty
handler too.

This change moves the ringbuffer 'up a layer' - from the socket handler
to the core console code.

We remove the ->data_in callback from handlers, and work on the
assumption that handlers have registered their own consumer on the
console's ringbuffer (through a new helper function,
console_ringbuffer_consumer_register()).

Change-Id: Ie8f02d6632578c50bb5e2dfb9bee6ece86432135
Signed-off-by: Jeremy Kerr <jk@ozlabs.org>
diff --git a/log-handler.c b/log-handler.c
index 29792e9..149806c 100644
--- a/log-handler.c
+++ b/log-handler.c
@@ -30,12 +30,13 @@
 #include "console-server.h"
 
 struct log_handler {
-	struct handler	handler;
-	struct console	*console;
-	int		fd;
-	size_t		size;
-	size_t		maxsize;
-	int		pagesize;
+	struct handler			handler;
+	struct console			*console;
+	struct ringbuffer_consumer	*rbc;
+	int				fd;
+	size_t				size;
+	size_t				maxsize;
+	int				pagesize;
 };
 
 
@@ -48,37 +49,6 @@
 	return container_of(handler, struct log_handler, handler);
 }
 
-static int log_init(struct handler *handler, struct console *console,
-		struct config *config __attribute__((unused)))
-{
-	struct log_handler *lh = to_log_handler(handler);
-	const char *filename;
-	int rc;
-
-	lh->console = console;
-	lh->maxsize = logsize;
-	lh->pagesize = 4096;
-	lh->size = 0;
-
-	filename = config_get_value(config, "logfile");
-	if (!filename)
-		filename = default_filename;
-
-	lh->fd = open(filename, O_RDWR | O_CREAT, 0644);
-	if (lh->fd < 0) {
-		warn("Can't open log buffer file %s", filename);
-		return -1;
-	}
-	rc = ftruncate(lh->fd, 0);
-	if (rc) {
-		warn("Can't truncate file %s", filename);
-		close(lh->fd);
-		return -1;
-	}
-
-	return 0;
-}
-
 static int log_trim(struct log_handler *lh, size_t space)
 {
 	int rc, n_shift_pages, shift_len, shift_start;
@@ -109,9 +79,8 @@
 
 }
 
-static int log_data(struct handler *handler, uint8_t *buf, size_t len)
+static int log_data(struct log_handler *lh, uint8_t *buf, size_t len)
 {
-	struct log_handler *lh = to_log_handler(handler);
 	int rc;
 
 	if (len > lh->maxsize) {
@@ -134,9 +103,69 @@
 	return 0;
 }
 
+static enum ringbuffer_poll_ret log_ringbuffer_poll(void *arg,
+		size_t force_len __attribute__((unused)))
+{
+	struct log_handler *lh = arg;
+	uint8_t *buf;
+	size_t len;
+	int rc;
+
+	/* we log synchronously, so just dequeue everything we can, and
+	 * commit straight away. */
+	for (;;) {
+		len = ringbuffer_dequeue_peek(lh->rbc, 0, &buf);
+		if (!len)
+			break;
+
+		rc = log_data(lh, buf, len);
+		if (rc)
+			return RINGBUFFER_POLL_REMOVE;
+
+		ringbuffer_dequeue_commit(lh->rbc, len);
+	}
+
+	return RINGBUFFER_POLL_OK;
+}
+
+static int log_init(struct handler *handler, struct console *console,
+		struct config *config __attribute__((unused)))
+{
+	struct log_handler *lh = to_log_handler(handler);
+	const char *filename;
+	int rc;
+
+	lh->console = console;
+	lh->maxsize = logsize;
+	lh->pagesize = 4096;
+	lh->size = 0;
+
+	filename = config_get_value(config, "logfile");
+	if (!filename)
+		filename = default_filename;
+
+	lh->fd = open(filename, O_RDWR | O_CREAT, 0644);
+	if (lh->fd < 0) {
+		warn("Can't open log buffer file %s", filename);
+		return -1;
+	}
+	rc = ftruncate(lh->fd, 0);
+	if (rc) {
+		warn("Can't truncate file %s", filename);
+		close(lh->fd);
+		return -1;
+	}
+
+	lh->rbc = console_ringbuffer_consumer_register(console,
+			log_ringbuffer_poll, lh);
+
+	return 0;
+}
+
 static void log_fini(struct handler *handler)
 {
 	struct log_handler *lh = to_log_handler(handler);
+	ringbuffer_consumer_unregister(lh->rbc);
 	close(lh->fd);
 }
 
@@ -144,7 +173,6 @@
 	.handler = {
 		.name		= "log",
 		.init		= log_init,
-		.data_in	= log_data,
 		.fini		= log_fini,
 	},
 };