server: use ringbuffer for all handlers
Currently, we use the a ringbuffer within the socket handler to manage
bursts of data to slower clients.
However, we're also seeing cases where the local tty handler becomes
blocking as well. So, we want to implement a buffer within the tty
handler too.
This change moves the ringbuffer 'up a layer' - from the socket handler
to the core console code.
We remove the ->data_in callback from handlers, and work on the
assumption that handlers have registered their own consumer on the
console's ringbuffer (through a new helper function,
console_ringbuffer_consumer_register()).
Change-Id: Ie8f02d6632578c50bb5e2dfb9bee6ece86432135
Signed-off-by: Jeremy Kerr <jk@ozlabs.org>
diff --git a/console-server.c b/console-server.c
index 0ed358d..e8f0352 100644
--- a/console-server.c
+++ b/console-server.c
@@ -46,6 +46,8 @@
int tty_lpc_addr;
int tty_fd;
+ struct ringbuffer *rb;
+
struct handler **handlers;
int n_handlers;
@@ -65,6 +67,9 @@
/* we have one extra entry in the pollfds array for the VUART tty */
static const int n_internal_pollfds = 1;
+/* size of the shared backlog ringbuffer */
+const size_t buffer_size = 128 * 1024;
+
/* state shared with the signal handler */
static bool sigint;
@@ -311,29 +316,11 @@
}
}
-static int handlers_data_in(struct console *console, uint8_t *buf, size_t len)
+struct ringbuffer_consumer *console_ringbuffer_consumer_register(
+ struct console *console,
+ ringbuffer_poll_fn_t poll_fn, void *data)
{
- struct handler *handler;
- int i, rc, tmp;
-
- rc = 0;
-
- for (i = 0; i < console->n_handlers; i++) {
- handler = console->handlers[i];
-
- if (!handler->active)
- continue;
-
- if (!handler->data_in)
- continue;
-
- tmp = handler->data_in(handler, buf, len);
- if (tmp == HANDLER_EXIT)
- rc = 1;
- }
-
- return rc;
-
+ return ringbuffer_consumer_register(console->rb, poll_fn, data);
}
struct poller *console_poller_register(struct console *console,
@@ -501,7 +488,7 @@
rc = -1;
break;
}
- rc = handlers_data_in(console, buf, rc);
+ rc = ringbuffer_queue(console->rb, buf, rc);
if (rc)
break;
}
@@ -561,6 +548,7 @@
memset(console, 0, sizeof(*console));
console->pollfds = calloc(n_internal_pollfds,
sizeof(*console->pollfds));
+ console->rb = ringbuffer_init(buffer_size);
config = config_init(config_filename);
if (!config) {
diff --git a/console-server.h b/console-server.h
index 67cc99e..0831c57 100644
--- a/console-server.h
+++ b/console-server.h
@@ -21,13 +21,8 @@
struct console;
struct config;
-/* handler API */
-enum {
- HANDLER_OK = 0,
- HANDLER_EXIT,
-};
-
-/*
+/* Handler API.
+ *
* Console data handlers: these implement the functions that process
* data coming out of the main tty device.
*
@@ -35,8 +30,9 @@
* macro. We call each handler's ->init() function at startup, and ->fini() at
* exit.
*
- * Incoming data from the tty will be passed to the handler through the
- * ->data_in() function. To send data to the tty, use console_data_out().
+ * Handlers will almost always want to register a ringbuffer consumer, which
+ * provides data coming from the tty. Use cosole_register_ringbuffer_consumer()
+ * for this. To send data to the tty, use console_data_out().
*
* If a handler needs to monitor a separate file descriptor for events, use the
* poller API, through console_poller_register().
@@ -46,8 +42,6 @@
int (*init)(struct handler *handler,
struct console *console,
struct config *config);
- int (*data_in)(struct handler *handler,
- uint8_t *buf, size_t len);
void (*fini)(struct handler *handler);
bool active;
};
@@ -108,6 +102,11 @@
int ringbuffer_dequeue_commit(struct ringbuffer_consumer *rbc, size_t len);
+/* console wrapper around ringbuffer consumer registration */
+struct ringbuffer_consumer *console_ringbuffer_consumer_register(
+ struct console *console,
+ ringbuffer_poll_fn_t poll_fn, void *data);
+
/* config API */
struct config;
const char *config_get_value(struct config *config, const char *name);
diff --git a/log-handler.c b/log-handler.c
index 29792e9..149806c 100644
--- a/log-handler.c
+++ b/log-handler.c
@@ -30,12 +30,13 @@
#include "console-server.h"
struct log_handler {
- struct handler handler;
- struct console *console;
- int fd;
- size_t size;
- size_t maxsize;
- int pagesize;
+ struct handler handler;
+ struct console *console;
+ struct ringbuffer_consumer *rbc;
+ int fd;
+ size_t size;
+ size_t maxsize;
+ int pagesize;
};
@@ -48,37 +49,6 @@
return container_of(handler, struct log_handler, handler);
}
-static int log_init(struct handler *handler, struct console *console,
- struct config *config __attribute__((unused)))
-{
- struct log_handler *lh = to_log_handler(handler);
- const char *filename;
- int rc;
-
- lh->console = console;
- lh->maxsize = logsize;
- lh->pagesize = 4096;
- lh->size = 0;
-
- filename = config_get_value(config, "logfile");
- if (!filename)
- filename = default_filename;
-
- lh->fd = open(filename, O_RDWR | O_CREAT, 0644);
- if (lh->fd < 0) {
- warn("Can't open log buffer file %s", filename);
- return -1;
- }
- rc = ftruncate(lh->fd, 0);
- if (rc) {
- warn("Can't truncate file %s", filename);
- close(lh->fd);
- return -1;
- }
-
- return 0;
-}
-
static int log_trim(struct log_handler *lh, size_t space)
{
int rc, n_shift_pages, shift_len, shift_start;
@@ -109,9 +79,8 @@
}
-static int log_data(struct handler *handler, uint8_t *buf, size_t len)
+static int log_data(struct log_handler *lh, uint8_t *buf, size_t len)
{
- struct log_handler *lh = to_log_handler(handler);
int rc;
if (len > lh->maxsize) {
@@ -134,9 +103,69 @@
return 0;
}
+static enum ringbuffer_poll_ret log_ringbuffer_poll(void *arg,
+ size_t force_len __attribute__((unused)))
+{
+ struct log_handler *lh = arg;
+ uint8_t *buf;
+ size_t len;
+ int rc;
+
+ /* we log synchronously, so just dequeue everything we can, and
+ * commit straight away. */
+ for (;;) {
+ len = ringbuffer_dequeue_peek(lh->rbc, 0, &buf);
+ if (!len)
+ break;
+
+ rc = log_data(lh, buf, len);
+ if (rc)
+ return RINGBUFFER_POLL_REMOVE;
+
+ ringbuffer_dequeue_commit(lh->rbc, len);
+ }
+
+ return RINGBUFFER_POLL_OK;
+}
+
+static int log_init(struct handler *handler, struct console *console,
+ struct config *config __attribute__((unused)))
+{
+ struct log_handler *lh = to_log_handler(handler);
+ const char *filename;
+ int rc;
+
+ lh->console = console;
+ lh->maxsize = logsize;
+ lh->pagesize = 4096;
+ lh->size = 0;
+
+ filename = config_get_value(config, "logfile");
+ if (!filename)
+ filename = default_filename;
+
+ lh->fd = open(filename, O_RDWR | O_CREAT, 0644);
+ if (lh->fd < 0) {
+ warn("Can't open log buffer file %s", filename);
+ return -1;
+ }
+ rc = ftruncate(lh->fd, 0);
+ if (rc) {
+ warn("Can't truncate file %s", filename);
+ close(lh->fd);
+ return -1;
+ }
+
+ lh->rbc = console_ringbuffer_consumer_register(console,
+ log_ringbuffer_poll, lh);
+
+ return 0;
+}
+
static void log_fini(struct handler *handler)
{
struct log_handler *lh = to_log_handler(handler);
+ ringbuffer_consumer_unregister(lh->rbc);
close(lh->fd);
}
@@ -144,7 +173,6 @@
.handler = {
.name = "log",
.init = log_init,
- .data_in = log_data,
.fini = log_fini,
},
};
diff --git a/socket-handler.c b/socket-handler.c
index 24389c9..cf5a2de 100644
--- a/socket-handler.c
+++ b/socket-handler.c
@@ -32,8 +32,6 @@
#include "console-server.h"
-const size_t buffer_size = 128 * 1024;
-
struct client {
struct socket_handler *sh;
struct poller *poller;
@@ -45,7 +43,6 @@
struct handler handler;
struct console *console;
struct poller *poller;
- struct ringbuffer *ringbuffer;
int sd;
struct client **clients;
@@ -224,7 +221,7 @@
client->fd = fd;
client->poller = console_poller_register(sh->console, handler,
client_poll, client->fd, POLLIN, client);
- client->rbc = ringbuffer_consumer_register(sh->ringbuffer,
+ client->rbc = console_ringbuffer_consumer_register(sh->console,
client_ringbuffer_poll, client);
n = sh->n_clients++;
@@ -247,12 +244,6 @@
sh->clients = NULL;
sh->n_clients = 0;
- sh->ringbuffer = ringbuffer_init(buffer_size);
- if (!sh->ringbuffer) {
- warn("Can't allocate backlog ring buffer");
- return -1;
- }
-
sh->sd = socket(AF_UNIX, SOCK_STREAM, 0);
if(sh->sd < 0) {
warn("Can't create socket");
@@ -282,13 +273,6 @@
return 0;
}
-static int socket_data(struct handler *handler, uint8_t *buf, size_t len)
-{
- struct socket_handler *sh = to_socket_handler(handler);
- ringbuffer_queue(sh->ringbuffer, buf, len);
- return 0;
-}
-
static void socket_fini(struct handler *handler)
{
struct socket_handler *sh = to_socket_handler(handler);
@@ -299,9 +283,6 @@
if (sh->poller)
console_poller_unregister(sh->console, sh->poller);
- if (sh->ringbuffer)
- ringbuffer_fini(sh->ringbuffer);
-
close(sh->sd);
}
@@ -309,7 +290,6 @@
.handler = {
.name = "socket",
.init = socket_init,
- .data_in = socket_data,
.fini = socket_fini,
},
};
diff --git a/tty-handler.c b/tty-handler.c
index 523743b..22f9a75 100644
--- a/tty-handler.c
+++ b/tty-handler.c
@@ -18,6 +18,7 @@
#include <assert.h>
#include <err.h>
+#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
@@ -28,10 +29,11 @@
#include "console-server.h"
struct tty_handler {
- struct handler handler;
- struct console *console;
- struct poller *poller;
- int fd;
+ struct handler handler;
+ struct console *console;
+ struct ringbuffer_consumer *rbc;
+ struct poller *poller;
+ int fd;
};
struct terminal_speed_name {
@@ -44,24 +46,98 @@
return container_of(handler, struct tty_handler, handler);
}
+static int tty_drain_queue(struct tty_handler *th, size_t force_len)
+{
+ size_t len, total_len;
+ ssize_t wlen;
+ uint8_t *buf;
+ int flags;
+
+ /* if we're forcing data, we need to clear non-blocking mode */
+ if (force_len) {
+ flags = fcntl(th->fd, F_GETFL, 0);
+ if (flags & O_NONBLOCK) {
+ flags &= ~O_NONBLOCK;
+ fcntl(th->fd, F_SETFL, flags);
+ }
+ }
+
+ total_len = 0;
+
+ for (;;) {
+ len = ringbuffer_dequeue_peek(th->rbc, total_len, &buf);
+ if (!len)
+ break;
+
+ /* write as little as possible while blocking */
+ if (force_len && force_len < total_len + len)
+ len = force_len - total_len;
+
+ wlen = write(th->fd, buf, len);
+ if (wlen < 0) {
+ if (errno == EINTR)
+ continue;
+ if ((errno == EAGAIN || errno == EWOULDBLOCK)
+ && !force_len)
+ break;
+ warn("failed writing to local tty; disabling");
+ return -1;
+ }
+
+ total_len += wlen;
+
+ if (force_len && total_len >= force_len)
+ break;
+ }
+
+ ringbuffer_dequeue_commit(th->rbc, total_len);
+
+ if (force_len)
+ fcntl(th->fd, F_SETFL, flags | O_NONBLOCK);
+
+ return 0;
+}
+
+static enum ringbuffer_poll_ret tty_ringbuffer_poll(void *arg, size_t force_len)
+{
+ struct tty_handler *th = arg;
+ int rc;
+
+ rc = tty_drain_queue(th, force_len);
+ if (rc) {
+ console_poller_unregister(th->console, th->poller);
+ return RINGBUFFER_POLL_REMOVE;
+ }
+
+ return RINGBUFFER_POLL_OK;
+}
+
static enum poller_ret tty_poll(struct handler *handler,
int events, void __attribute__((unused)) *data)
{
struct tty_handler *th = to_tty_handler(handler);
uint8_t buf[4096];
ssize_t len;
+ int rc;
- if (!(events & POLLIN))
- return POLLER_OK;
+ if (events & POLLIN) {
+ len = read(th->fd, buf, sizeof(buf));
+ if (len <= 0) {
+ th->poller = NULL;
+ close(th->fd);
+ return POLLER_REMOVE;
+ }
- len = read(th->fd, buf, sizeof(buf));
- if (len <= 0) {
- th->poller = NULL;
- close(th->fd);
- return POLLER_REMOVE;
+ console_data_out(th->console, buf, len);
}
- console_data_out(th->console, buf, len);
+ if (events & POLLOUT) {
+ rc = tty_drain_queue(th, 0);
+ if (rc) {
+ ringbuffer_consumer_unregister(th->rbc);
+ return POLLER_REMOVE;
+ }
+ }
return POLLER_OK;
}
@@ -202,16 +278,12 @@
th->poller = console_poller_register(console, handler, tty_poll,
th->fd, POLLIN, NULL);
th->console = console;
+ th->rbc = console_ringbuffer_consumer_register(console,
+ tty_ringbuffer_poll, th);
return 0;
}
-static int tty_data(struct handler *handler, uint8_t *buf, size_t len)
-{
- struct tty_handler *th = to_tty_handler(handler);
- return write_buf_to_fd(th->fd, buf, len);
-}
-
static void tty_fini(struct handler *handler)
{
struct tty_handler *th = to_tty_handler(handler);
@@ -224,7 +296,6 @@
.handler = {
.name = "tty",
.init = tty_init,
- .data_in = tty_data,
.fini = tty_fini,
},
};