socket-handler: Use a global backlog buffer instead of per-client
Currently, we keep a backlog buffer for each connected client. This is a
waste, as it's storing the same data, just at different offsets.
This change uses a global buffer for the client backlog, with each
client tracking its current position in this buffer. We just make this
fixed-size, rather than trying to dynamically allocate.
Change-Id: I20bd0772c95d8237677108c7a62d9ec6ff8ed35d
Signed-off-by: Jeremy Kerr <jk@ozlabs.org>
diff --git a/socket-handler.c b/socket-handler.c
index 87db7f0..af43989 100644
--- a/socket-handler.c
+++ b/socket-handler.c
@@ -32,14 +32,18 @@
#include "console-server.h"
-const size_t buffer_size_max = 100 * 1024;
+#define min(a,b) ({ \
+ const typeof(a) _a = (a); \
+ const typeof(b) _b = (b); \
+ _a < _b ? _a : _b; \
+ })
+
+const size_t buffer_size = 128 * 1024;
struct client {
struct poller *poller;
int fd;
- uint8_t *buf;
- size_t buf_alloc;
- size_t buf_len;
+ size_t buf_pos;
};
struct socket_handler {
@@ -48,6 +52,9 @@
struct poller *poller;
int sd;
+ uint8_t *buf;
+ size_t buf_len;
+
struct client **clients;
int n_clients;
};
@@ -81,30 +88,16 @@
sizeof(*sh->clients) * sh->n_clients);
}
-static size_t client_buffer_space(struct client *client)
+static size_t client_buffer_len(struct socket_handler *sh,
+ struct client *client)
{
- return buffer_size_max - client->buf_len;
+ return sh->buf_len - client->buf_pos;
}
-static int client_queue_data(struct client *client, uint8_t *buf, size_t len)
+static void *client_buffer_data(struct socket_handler *sh,
+ struct client *client)
{
- /* we may need to allocate space, but we know that increasing by len
- * won't exceed buffer_size_max */
-
- if (client->buf_len + len > client->buf_alloc) {
- if (!client->buf_alloc)
- client->buf_alloc = 2048;
- client->buf_alloc *= 2;
-
- if (client->buf_alloc > buffer_size_max)
- client->buf_alloc = buffer_size_max;
-
- client->buf = realloc(client->buf, client->buf_alloc);
- }
-
- memcpy(client->buf + client->buf_len, buf, len);
- client->buf_len += len;
- return 0;
+ return sh->buf + client->buf_pos;
}
static ssize_t send_all(int fd, void *buf, size_t len, bool block)
@@ -137,31 +130,33 @@
/* Drain the queue to the socket and update the queue buffer. If force_len is
* set, send at least that many bytes from the queue, possibly while blocking
*/
-static int client_drain_queue(struct client *client, size_t force_len)
+static int client_drain_queue(struct socket_handler *sh,
+ struct client *client, size_t force_len)
{
ssize_t wlen;
size_t len;
bool block;
- if (!client->buf_len)
+ len = client_buffer_len(sh, client);
+ if (!len)
return 0;
block = false;
- len = client->buf_len;
if (force_len) {
+ assert(force_len <= len);
block = true;
len = force_len;
}
- wlen = send_all(client->fd, client->buf, len, block);
+ wlen = send_all(client->fd, client_buffer_data(sh, client), len, block);
if (wlen < 0)
return -1;
if (force_len && wlen < force_len)
return -1;
- client->buf_len -= wlen;
- memmove(client->buf, client->buf + wlen, client->buf_len);
+ client->buf_pos += wlen;
+ assert(client->buf_pos <= sh->buf_len);
return 0;
}
@@ -189,7 +184,7 @@
}
if (events & POLLOUT) {
- rc = client_drain_queue(client, 0);
+ rc = client_drain_queue(sh, client, 0);
if (rc)
goto err_close;
}
@@ -202,40 +197,6 @@
return POLLER_REMOVE;
}
-static int client_data(struct client *client, uint8_t *buf, size_t len)
-{
- ssize_t wlen;
- size_t space;
- int rc;
-
- space = client_buffer_space(client);
-
- /* blocking send to create space in the queue for this data */
- if (len > space) {
- rc = client_drain_queue(client, len - space);
- if (rc)
- return -1;
- }
-
- wlen = 0;
-
- /* if the queue is empty, try to send without queuing */
- if (!client->buf_len) {
- wlen = send_all(client->fd, buf, len, false);
- if (wlen < 0)
- return -1;
-
- if (wlen == len)
- return 0;
- }
-
- /* queue anything unsent. this queue should always succeed, as we've
- * created space above */
- client_queue_data(client, buf + wlen, len - wlen);
-
- return client_drain_queue(client, 0);
-}
-
static enum poller_ret socket_poll(struct handler *handler,
int events, void __attribute__((unused)) *data)
{
@@ -276,6 +237,13 @@
sh->console = console;
sh->clients = NULL;
sh->n_clients = 0;
+ sh->buf_len = 0;
+
+ sh->buf = malloc(buffer_size);
+ if (!sh->buf) {
+ warn("Can't allocate backlog buffer");
+ return -1;
+ }
sh->sd = socket(AF_UNIX, SOCK_STREAM, 0);
if(sh->sd < 0) {
@@ -309,20 +277,77 @@
static int socket_data(struct handler *handler, uint8_t *buf, size_t len)
{
struct socket_handler *sh = to_socket_handler(handler);
+ struct client *client;
+ size_t space, min_pos;
int i, rc;
- for (i = 0; i < sh->n_clients; i++) {
- struct client *client = sh->clients[i];
- rc = client_data(client, buf, len);
- if (!rc)
- continue;
+ space = buffer_size - sh->buf_len;
+ min_pos = sh->buf_len;
- /* if we failed to send data, close the client. This will
- * remove it from the clients array, so skip back to the item
- * that has taken its place
- */
- client_close(sh, client);
- i--;
+ /* Ensure there is at least len bytes available in the global buffer.
+ *
+ * The 'space' var tells us how many bytes are available at the tail of
+ * the buffer. However, if all clients have a non-zero buf_pos, then we
+ * can drop bytes from the beginning (in the memmove below). So, we
+ * account for each clients' buf_pos in the space check.
+ *
+ * If a client doesn't have sufficient space, perform a blocking write
+ * to create it. This will result in incrementing client->buf_pos to
+ * create space.
+ */
+ for (i = 0; i < sh->n_clients; i++) {
+ ssize_t client_space;
+
+ client = sh->clients[i];
+ client_space = space + client->buf_pos;
+
+ if (len > client_space) {
+ /* Blocking send enough to create len bytes of space in
+ * the global buffer. On success, this will increment
+ * client->buf_pos by the number of bytes written
+ */
+ rc = client_drain_queue(sh, client, len - client_space);
+ if (rc) {
+ client_close(sh, client);
+ i--;
+ continue;
+ }
+ }
+
+ min_pos = min(min_pos, client->buf_pos);
+ }
+
+ /* avoid pointless copying */
+ if (!sh->n_clients) {
+ sh->buf_len = 0;
+ return 0;
+ }
+
+ /* drop unneeded buffer data... */
+ sh->buf_len -= min_pos;
+ memmove(sh->buf, sh->buf + min_pos, sh->buf_len);
+
+ /* ... and add new data */
+ memcpy(sh->buf + sh->buf_len, buf, len);
+ sh->buf_len += len;
+
+ /* now that the queue contains the new data, perform non-blocking send
+ * to all clients */
+ for (i = 0; i < sh->n_clients; i++) {
+ client = sh->clients[i];
+
+ /* We've dropped data in the global buffer, so need to update
+ * clients' pos pointers to suit the new start of buffer
+ * data */
+ client->buf_pos -= min_pos;
+ assert(client->buf_pos >= 0);
+
+ rc = client_drain_queue(sh, client, 0);
+ if (rc) {
+ client_close(sh, client);
+ i--;
+ continue;
+ }
}
return 0;
}
@@ -339,6 +364,7 @@
console_unregister_poller(sh->console, sh->poller);
close(sh->sd);
+ free(sh->buf);
}
static struct socket_handler socket_handler = {