server: Use ringbuffer for socket backlog

Currently, the socket handler uses a linear buffer for the backlog data;
this means we need to shift up to 128kB of data after each socket
write().

This change introduces a single-producer-multiple-consumer ringbuffer,
to avoid the need for memmove()ing data around; we can simply update
pointers instead of shifting data.

We add this as a new file (ringbuffer.c), to make it a little more
modular. To mitigate the risk of subtle pointer arithmetic issues, we
add a set of tests too.

Change-Id: Ib7c5151d3cf1f588436f5461000b6fed22d0681c
Signed-off-by: Jeremy Kerr <jk@ozlabs.org>
diff --git a/Makefile.am b/Makefile.am
index e07d4ad..96fa9fb 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -11,6 +11,7 @@
 	 console-server.c \
 	 console-server.h \
 	 util.c \
+	 ringbuffer.c \
 	 config.c \
 	 log-handler.c \
 	 socket-handler.c \
@@ -23,4 +24,4 @@
 	 console-socket.c \
 	 util.c
 
-#SUBDIRS = test
+SUBDIRS = test
diff --git a/configure.ac b/configure.ac
index 04bf36a..a306b31 100644
--- a/configure.ac
+++ b/configure.ac
@@ -44,5 +44,5 @@
 )
 
 # Create configured output
-AC_CONFIG_FILES([Makefile])
+AC_CONFIG_FILES([Makefile test/Makefile])
 AC_OUTPUT
diff --git a/console-server.h b/console-server.h
index 2658e89..4d0d5b4 100644
--- a/console-server.h
+++ b/console-server.h
@@ -67,6 +67,33 @@
 
 void console_unregister_poller(struct console *console, struct poller *poller);
 
+/* ringbuffer API */
+enum ringbuffer_poll_ret {
+	RINGBUFFER_POLL_OK = 0,
+	RINGBUFFER_POLL_REMOVE,
+};
+
+typedef enum ringbuffer_poll_ret (*ringbuffer_poll_fn_t)(void *data,
+		size_t force_len);
+
+struct ringbuffer;
+struct ringbuffer_consumer;
+
+struct ringbuffer *ringbuffer_init(size_t size);
+void ringbuffer_fini(struct ringbuffer *rb);
+
+struct ringbuffer_consumer *ringbuffer_consumer_register(struct ringbuffer *rb,
+		ringbuffer_poll_fn_t poll_fn, void *data);
+
+void ringbuffer_consumer_unregister(struct ringbuffer_consumer *rbc);
+
+int ringbuffer_queue(struct ringbuffer *rb, uint8_t *data, size_t len);
+
+size_t ringbuffer_dequeue_peek(struct ringbuffer_consumer *rbc, size_t offset,
+		uint8_t **data);
+
+int ringbuffer_dequeue_commit(struct ringbuffer_consumer *rbc, size_t len);
+
 /* config API */
 struct config;
 const char *config_get_value(struct config *config, const char *name);
diff --git a/ringbuffer.c b/ringbuffer.c
new file mode 100644
index 0000000..3edab0d
--- /dev/null
+++ b/ringbuffer.c
@@ -0,0 +1,220 @@
+/**
+ * Copyright © 2017 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <assert.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "console-server.h"
+
+#define min(a,b) ({				\
+		const typeof(a) _a = (a);	\
+		const typeof(b) _b = (b);	\
+		_a < _b ? _a : _b;		\
+	})
+
+struct ringbuffer {
+	uint8_t				*buf;
+	size_t				size;
+	size_t				tail;
+	struct ringbuffer_consumer	**consumers;
+	int				n_consumers;
+};
+
+struct ringbuffer_consumer {
+	struct ringbuffer		*rb;
+	ringbuffer_poll_fn_t		poll_fn;
+	void				*poll_data;
+	size_t				pos;
+};
+
+struct ringbuffer *ringbuffer_init(size_t size)
+{
+	struct ringbuffer *rb;
+
+	rb = malloc(sizeof(*rb) + size);
+	if (!rb)
+		return NULL;
+
+	memset(rb, 0, sizeof(*rb));
+	rb->size = size;
+	rb->buf = (void *)(rb + 1);
+
+	return rb;
+}
+
+void ringbuffer_fini(struct ringbuffer *rb)
+{
+	while (rb->n_consumers)
+		ringbuffer_consumer_unregister(rb->consumers[0]);
+	free(rb);
+}
+
+struct ringbuffer_consumer *ringbuffer_consumer_register(struct ringbuffer *rb,
+		ringbuffer_poll_fn_t fn, void *data)
+{
+	struct ringbuffer_consumer *rbc;
+	int n;
+
+	rbc = malloc(sizeof(*rbc));
+	rbc->rb = rb;
+	rbc->poll_fn = fn;
+	rbc->poll_data = data;
+	rbc->pos = rb->tail;
+
+	n = rb->n_consumers++;
+	rb->consumers = realloc(rb->consumers,
+			sizeof(*rb->consumers) * rb->n_consumers);
+	rb->consumers[n] = rbc;
+
+	return rbc;
+}
+
+void ringbuffer_consumer_unregister(struct ringbuffer_consumer *rbc)
+{
+	struct ringbuffer *rb = rbc->rb;
+	int i;
+
+	for (i = 0; i < rb->n_consumers; i++)
+		if (rb->consumers[i] == rbc)
+			break;
+
+	assert(i < rb->n_consumers);
+
+	rb->n_consumers--;
+
+	memmove(&rb->consumers[i], &rb->consumers[i+1],
+			sizeof(*rb->consumers)	* (rb->n_consumers - i));
+
+	rb->consumers = realloc(rb->consumers,
+			sizeof(*rb->consumers) * rb->n_consumers);
+
+	free(rbc);
+}
+
+static size_t ringbuffer_len(struct ringbuffer_consumer *rbc)
+{
+	if (rbc->pos <= rbc->rb->tail)
+		return rbc->rb->tail - rbc->pos;
+	else
+		return rbc->rb->tail + rbc->rb->size - rbc->pos;
+}
+
+static size_t ringbuffer_space(struct ringbuffer_consumer *rbc)
+{
+	return rbc->rb->size - ringbuffer_len(rbc) - 1;
+}
+
+static int ringbuffer_consumer_ensure_space(
+		struct ringbuffer_consumer *rbc, size_t len)
+{
+	enum ringbuffer_poll_ret prc;
+	int force_len;
+
+	if (ringbuffer_space(rbc) >= len)
+		return 0;
+
+	force_len = len - ringbuffer_space(rbc);
+
+	prc = rbc->poll_fn(rbc->poll_data, force_len);
+	if (prc != RINGBUFFER_POLL_OK)
+		return -1;
+
+	return 0;
+}
+
+int ringbuffer_queue(struct ringbuffer *rb, uint8_t *data, size_t len)
+{
+	struct ringbuffer_consumer *rbc;
+	size_t wlen;
+	int i, rc;
+
+	if (len >= rb->size)
+		return -1;
+
+	/* Ensure there is at least len bytes of space available.
+	 *
+	 * If a client doesn't have sufficient space, perform a blocking write
+	 * (by calling ->poll_fn with force_len) to create it.
+	 */
+	for (i = 0; i < rb->n_consumers; i++) {
+		rbc = rb->consumers[i];
+
+		rc = ringbuffer_consumer_ensure_space(rbc, len);
+		if (rc) {
+			ringbuffer_consumer_unregister(rbc);
+			i--;
+			continue;
+		}
+
+		assert(ringbuffer_space(rbc) >= len);
+	}
+
+	/* Now that we know we have enough space, add new data to tail */
+	wlen = min(len, rb->size - rb->tail);
+	memcpy(rb->buf + rb->tail, data, wlen);
+	rb->tail = (rb->tail + wlen) % rb->size;
+	len -= wlen;
+	data += wlen;
+
+	memcpy(rb->buf, data, len);
+	rb->tail += len;
+
+	/* Inform consumers of new data in non-blocking mode, by calling
+	 * ->poll_fn with 0 force_len */
+	for (i = 0; i < rb->n_consumers; i++) {
+		enum ringbuffer_poll_ret prc;
+
+		rbc = rb->consumers[i];
+		prc = rbc->poll_fn(rbc->poll_data, 0);
+		if (prc == RINGBUFFER_POLL_REMOVE) {
+			ringbuffer_consumer_unregister(rbc);
+			i--;
+		}
+	}
+
+	return 0;
+}
+
+size_t ringbuffer_dequeue_peek(struct ringbuffer_consumer *rbc, size_t offset,
+		uint8_t **data)
+{
+	struct ringbuffer *rb = rbc->rb;
+	size_t pos;
+	size_t len;
+
+	if (offset >= ringbuffer_len(rbc))
+		return 0;
+
+	pos = (rbc->pos + offset) % rb->size;
+	if (pos <= rb->tail)
+		len = rb->tail - pos;
+	else
+		len = rb->size - pos;
+
+	*data = rb->buf + pos;
+	return len;
+}
+
+int ringbuffer_dequeue_commit(struct ringbuffer_consumer *rbc, size_t len)
+{
+	assert(len <= ringbuffer_len(rbc));
+	rbc->pos = (rbc->pos + len) % rbc->rb->size;
+	return 0;
+}
diff --git a/socket-handler.c b/socket-handler.c
index a0b3028..11f183c 100644
--- a/socket-handler.c
+++ b/socket-handler.c
@@ -32,31 +32,24 @@
 
 #include "console-server.h"
 
-#define min(a,b) ({				\
-		const typeof(a) _a = (a);	\
-		const typeof(b) _b = (b);	\
-		_a < _b ? _a : _b;		\
-	})
-
 const size_t buffer_size = 128 * 1024;
 
 struct client {
-	struct poller	*poller;
-	int		fd;
-	size_t		buf_pos;
+	struct socket_handler		*sh;
+	struct poller			*poller;
+	struct ringbuffer_consumer	*rbc;
+	int				fd;
 };
 
 struct socket_handler {
-	struct handler	handler;
-	struct console	*console;
-	struct poller	*poller;
-	int		sd;
+	struct handler		handler;
+	struct console		*console;
+	struct poller		*poller;
+	struct ringbuffer	*ringbuffer;
+	int			sd;
 
-	uint8_t		*buf;
-	size_t		buf_len;
-
-	struct client	**clients;
-	int		n_clients;
+	struct client		**clients;
+	int			n_clients;
 };
 
 static struct socket_handler *to_socket_handler(struct handler *handler)
@@ -64,14 +57,18 @@
 	return container_of(handler, struct socket_handler, handler);
 }
 
-static void client_close(struct socket_handler *sh, struct client *client)
+static void client_close(struct client *client)
 {
+	struct socket_handler *sh = client->sh;
 	int idx;
 
 	close(client->fd);
 	if (client->poller)
 		console_unregister_poller(sh->console, client->poller);
 
+	if (client->rbc)
+		ringbuffer_consumer_unregister(client->rbc);
+
 	for (idx = 0; idx < sh->n_clients; idx++)
 		if (sh->clients[idx] == client)
 			break;
@@ -88,18 +85,6 @@
 			sizeof(*sh->clients) * sh->n_clients);
 }
 
-static size_t client_buffer_len(struct socket_handler *sh,
-		struct client *client)
-{
-	return sh->buf_len - client->buf_pos;
-}
-
-static void *client_buffer_data(struct socket_handler *sh,
-		struct client *client)
-{
-	return sh->buf + client->buf_pos;
-}
-
 static ssize_t send_all(int fd, void *buf, size_t len, bool block)
 {
 	int rc, flags;
@@ -130,37 +115,58 @@
 /* Drain the queue to the socket and update the queue buffer. If force_len is
  * set, send at least that many bytes from the queue, possibly while blocking
  */
-static int client_drain_queue(struct socket_handler *sh,
-		struct client *client, size_t force_len)
+static int client_drain_queue(struct client *client, size_t force_len)
 {
+	uint8_t *buf;
 	ssize_t wlen;
-	size_t len;
+	size_t len, total_len;
 	bool block;
 
-	len = client_buffer_len(sh, client);
-	if (!len)
-		return 0;
+	total_len = 0;
+	wlen = 0;
+	block = !!force_len;
 
-	block = false;
-	if (force_len) {
-		assert(force_len <= len);
-		block = true;
-		len = force_len;
+	for (;;) {
+		len = ringbuffer_dequeue_peek(client->rbc, total_len, &buf);
+		if (!len)
+			break;
+
+		wlen = send_all(client->fd, buf, len, block);
+		if (wlen <= 0)
+			break;
+
+		total_len += wlen;
+
+		if (force_len && total_len >= force_len)
+			break;
 	}
 
-	wlen = send_all(client->fd, client_buffer_data(sh, client), len, block);
 	if (wlen < 0)
 		return -1;
 
-	if (force_len && wlen < force_len)
+	if (force_len && total_len < force_len)
 		return -1;
 
-	client->buf_pos += wlen;
-	assert(client->buf_pos <= sh->buf_len);
-
+	ringbuffer_dequeue_commit(client->rbc, total_len);
 	return 0;
 }
 
+static enum ringbuffer_poll_ret client_ringbuffer_poll(void *arg,
+		size_t force_len)
+{
+	struct client *client = arg;
+	int rc;
+
+	rc = client_drain_queue(client, force_len);
+	if (rc) {
+		client->rbc = NULL;
+		client_close(client);
+		return RINGBUFFER_POLL_REMOVE;
+	}
+
+	return RINGBUFFER_POLL_OK;
+}
+
 static enum poller_ret client_poll(struct handler *handler,
 		int events, void *data)
 {
@@ -184,7 +190,7 @@
 	}
 
 	if (events & POLLOUT) {
-		rc = client_drain_queue(sh, client, 0);
+		rc = client_drain_queue(client, 0);
 		if (rc)
 			goto err_close;
 	}
@@ -193,7 +199,7 @@
 
 err_close:
 	client->poller = NULL;
-	client_close(sh, client);
+	client_close(client);
 	return POLLER_REMOVE;
 }
 
@@ -214,9 +220,12 @@
 	client = malloc(sizeof(*client));
 	memset(client, 0, sizeof(*client));
 
+	client->sh = sh;
 	client->fd = fd;
 	client->poller = console_register_poller(sh->console, handler,
 			client_poll, client->fd, POLLIN, client);
+	client->rbc = ringbuffer_consumer_register(sh->ringbuffer,
+			client_ringbuffer_poll, client);
 
 	n = sh->n_clients++;
 	sh->clients = realloc(sh->clients,
@@ -237,11 +246,10 @@
 	sh->console = console;
 	sh->clients = NULL;
 	sh->n_clients = 0;
-	sh->buf_len = 0;
 
-	sh->buf = malloc(buffer_size);
-	if (!sh->buf) {
-		warn("Can't allocate backlog buffer");
+	sh->ringbuffer = ringbuffer_init(buffer_size);
+	if (!sh->ringbuffer) {
+		warn("Can't allocate backlog ring buffer");
 		return -1;
 	}
 
@@ -277,78 +285,7 @@
 static int socket_data(struct handler *handler, uint8_t *buf, size_t len)
 {
 	struct socket_handler *sh = to_socket_handler(handler);
-	struct client *client;
-	size_t space, min_pos;
-	int i, rc;
-
-	space = buffer_size - sh->buf_len;
-	min_pos = sh->buf_len;
-
-	/* Ensure there is at least len bytes available in the global buffer.
-	 *
-	 * The 'space' var tells us how many bytes are available at the tail of
-	 * the buffer. However, if all clients have a non-zero buf_pos, then we
-	 * can drop bytes from the beginning (in the memmove below). So, we
-	 * account for each clients' buf_pos in the space check.
-	 *
-	 * If a client doesn't have sufficient space, perform a blocking write
-	 * to create it. This will result in incrementing client->buf_pos to
-	 * create space.
-	 */
-	for (i = 0; i < sh->n_clients; i++) {
-		ssize_t client_space;
-
-		client = sh->clients[i];
-		client_space = space + client->buf_pos;
-
-		if (len > client_space) {
-			/* Blocking send enough to create len bytes of space in
-			 * the global buffer. On success, this will increment
-			 * client->buf_pos by the number of bytes written
-			 */
-			rc = client_drain_queue(sh, client, len - client_space);
-			if (rc) {
-				client_close(sh, client);
-				i--;
-				continue;
-			}
-		}
-
-		min_pos = min(min_pos, client->buf_pos);
-	}
-
-	/* avoid pointless copying */
-	if (!sh->n_clients) {
-		sh->buf_len = 0;
-		return 0;
-	}
-
-	/* drop unneeded buffer data... */
-	sh->buf_len -= min_pos;
-	memmove(sh->buf, sh->buf + min_pos, sh->buf_len);
-
-	/* ... and add new data */
-	memcpy(sh->buf + sh->buf_len, buf, len);
-	sh->buf_len += len;
-
-	/* now that the queue contains the new data, perform non-blocking send
-	 * to all clients */
-	for (i = 0; i < sh->n_clients; i++) {
-		client = sh->clients[i];
-
-		/* We've dropped data in the global buffer, so need to update
-		 * clients' pos pointers to suit the new start of buffer
-		 * data */
-		client->buf_pos -= min_pos;
-		assert(client->buf_pos >= 0);
-
-		rc = client_drain_queue(sh, client, 0);
-		if (rc) {
-			client_close(sh, client);
-			i--;
-			continue;
-		}
-	}
+	ringbuffer_queue(sh->ringbuffer, buf, len);
 	return 0;
 }
 
@@ -357,13 +294,15 @@
 	struct socket_handler *sh = to_socket_handler(handler);
 
 	while (sh->n_clients)
-		client_close(sh, sh->clients[0]);
+		client_close(sh->clients[0]);
 
 	if (sh->poller)
 		console_unregister_poller(sh->console, sh->poller);
 
+	if (sh->ringbuffer)
+		ringbuffer_fini(sh->ringbuffer);
+
 	close(sh->sd);
-	free(sh->buf);
 }
 
 static struct socket_handler socket_handler = {
diff --git a/test/Makefile.am b/test/Makefile.am
index 45be362..e8c25d0 100644
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -3,10 +3,15 @@
 # Run all 'check' test programs
 TESTS = $(check_PROGRAMS)
 
-#Build/add config-test to test suite
-check_PROGRAMS += config-test
-config_test_CPPFLAGS = -Igtest $(GTEST_CPPFLAGS) \
-	-DCONFIG_TEST -DSYSCONFDIR=\"\"
-config_test_LDFLAGS = -lgtest_main -lgtest $(PTHREAD_LIBS) $(OESDK_TESTCASE_FLAGS)
-config_test_SOURCES = config.c
-config_test_LDADD = $(top_builddir)/config.o
+check_PROGRAMS += \
+	test-ringbuffer-contained-read \
+	test-ringbuffer-contained-offset-read \
+	test-ringbuffer-read-commit \
+	test-ringbuffer-boundary-read \
+	test-ringbuffer-simple-poll \
+	test-ringbuffer-boundary-poll \
+	test-ringbuffer-poll-force
+
+EXTRA_DIST = ringbuffer-test-utils.c
+
+AM_CPPFLAGS = -I$(top_srcdir)
diff --git a/test/ringbuffer-test-utils.c b/test/ringbuffer-test-utils.c
new file mode 100644
index 0000000..c489f9d
--- /dev/null
+++ b/test/ringbuffer-test-utils.c
@@ -0,0 +1,97 @@
+
+struct rb_test_ctx {
+	struct ringbuffer_consumer	*rbc;
+	bool	ignore_poll;
+	bool	force_only;
+	int	count;
+	uint8_t	*data;
+	int	len;
+};
+
+void ringbuffer_test_context_init(struct rb_test_ctx *ctx)
+{
+	ctx->count = 0;
+	ctx->data = NULL;
+	ctx->len = 0;
+	ctx->ignore_poll = false;
+	ctx->force_only = false;
+}
+
+void ringbuffer_test_context_fini(struct rb_test_ctx *ctx)
+{
+	free(ctx->data);
+}
+
+enum ringbuffer_poll_ret ringbuffer_poll_nop(
+		void *data __attribute__((unused)),
+		size_t force_len __attribute__((unused)))
+{
+	return RINGBUFFER_POLL_OK;
+}
+
+enum ringbuffer_poll_ret ringbuffer_poll_append_all(void *data,
+		size_t force_len)
+{
+	struct rb_test_ctx *ctx = data;
+	size_t len, total_len;
+	uint8_t *buf;
+
+	if (ctx->ignore_poll)
+		return RINGBUFFER_POLL_OK;
+
+	if (ctx->force_only && !force_len)
+		return RINGBUFFER_POLL_OK;
+
+	ctx->count++;
+
+	total_len = 0;
+	for (;;) {
+		len = ringbuffer_dequeue_peek(ctx->rbc, total_len, &buf);
+		if (!len)
+			break;
+
+		if (ctx->force_only && total_len + len > force_len)
+			len = force_len - total_len;
+
+		ctx->data = realloc(ctx->data, ctx->len + len);
+		memcpy(ctx->data + ctx->len, buf, len);
+		ctx->len += len;
+		total_len += len;
+
+		if (ctx->force_only && total_len >= force_len)
+			break;
+	}
+	ringbuffer_dequeue_commit(ctx->rbc, total_len);
+
+	return RINGBUFFER_POLL_OK;
+}
+
+void ringbuffer_dump(struct ringbuffer *rb)
+{
+	struct ringbuffer_consumer *rbc;
+	int i, j;
+
+	printf("---- ringbuffer (%d consumer%s)\n", rb->n_consumers,
+			rb->n_consumers == 1 ? "" : "s");
+
+	for (i = 0; i < rb->size; i++) {
+		bool has_consumer = false;
+		const char *prefix = "";
+
+		if (rb->tail == i)
+			prefix = "tail=>";
+
+		printf("%6s %02x", prefix, rb->buf[i]);
+		for (j = 0; j < rb->n_consumers; j++) {
+			rbc = rb->consumers[j];
+			if (rbc->pos != i)
+				continue;
+			if (!has_consumer)
+				printf(" <=");
+			printf("c[%d],len=%zd ", j, ringbuffer_len(rbc));
+			has_consumer = true;
+		}
+		printf("\n");
+	}
+}
+
diff --git a/test/test-ringbuffer-boundary-poll.c b/test/test-ringbuffer-boundary-poll.c
new file mode 100644
index 0000000..6f6ddc7
--- /dev/null
+++ b/test/test-ringbuffer-boundary-poll.c
@@ -0,0 +1,51 @@
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include "ringbuffer.c"
+#include "ringbuffer-test-utils.c"
+
+void test_boundary_poll(void)
+{
+	uint8_t in_buf[] = { 'a', 'b', 'c', 'd', 'e', 'f' };
+	struct rb_test_ctx _ctx, *ctx = &_ctx;
+	struct ringbuffer *rb;
+	int rc;
+
+	ringbuffer_test_context_init(ctx);
+
+	rb = ringbuffer_init(10);
+
+	ctx->rbc = ringbuffer_consumer_register(rb,
+			ringbuffer_poll_append_all, ctx);
+
+	/* don't consume initial data in the poll callback */
+	ctx->ignore_poll = true;
+
+	/* queue and dequeue, so our tail is non-zero */
+	ringbuffer_queue(rb, in_buf, sizeof(in_buf));
+	ringbuffer_dequeue_commit(ctx->rbc, sizeof(in_buf));
+
+	/* start queueing data */
+	ctx->ignore_poll = false;
+
+	/* ensure we're getting the second batch of data back */
+	in_buf[0] = 'A';
+
+	rc = ringbuffer_queue(rb, in_buf, sizeof(in_buf));
+	assert(!rc);
+
+	assert(ctx->count == 1);
+	assert(ctx->len == sizeof(in_buf));
+	assert(!memcmp(in_buf, ctx->data, ctx->len));
+
+	ringbuffer_fini(rb);
+	ringbuffer_test_context_fini(ctx);
+}
+
+int main(void)
+{
+	test_boundary_poll();
+	return EXIT_SUCCESS;
+}
diff --git a/test/test-ringbuffer-boundary-read.c b/test/test-ringbuffer-boundary-read.c
new file mode 100644
index 0000000..2b982e1
--- /dev/null
+++ b/test/test-ringbuffer-boundary-read.c
@@ -0,0 +1,51 @@
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include "ringbuffer.c"
+#include "ringbuffer-test-utils.c"
+
+void test_boundary_read(void)
+{
+	uint8_t *out_buf, in_buf[] = { 'a', 'b', 'c', 'd', 'e', 'f' };
+	struct ringbuffer_consumer *rbc;
+	struct ringbuffer *rb;
+	size_t len, pos;
+	int rc;
+
+	assert(sizeof(in_buf) * 2 > 10);
+
+	rb = ringbuffer_init(10);
+	rbc = ringbuffer_consumer_register(rb, ringbuffer_poll_nop, NULL);
+
+	/* queue and dequeue, so our tail is non-zero */
+	ringbuffer_queue(rb, in_buf, sizeof(in_buf));
+	ringbuffer_dequeue_commit(rbc, sizeof(in_buf));
+
+	/* ensure we're getting the second batch of data back */
+	in_buf[0] = 'A';
+
+	/* the next queue should cross the end of the buffer */
+	rc = ringbuffer_queue(rb, in_buf, sizeof(in_buf));
+	assert(!rc);
+
+	/* dequeue everything we can */
+	pos = 0;
+	for (;;) {
+		len = ringbuffer_dequeue_peek(rbc, pos, &out_buf);
+		if (len == 0)
+			break;
+		assert(!memcmp(in_buf+pos, out_buf, len));
+		pos += len;
+	}
+	assert(pos == sizeof(in_buf));
+
+	ringbuffer_fini(rb);
+}
+
+int main(void)
+{
+	test_boundary_read();
+	return EXIT_SUCCESS;
+}
diff --git a/test/test-ringbuffer-contained-offset-read.c b/test/test-ringbuffer-contained-offset-read.c
new file mode 100644
index 0000000..5e41c6a
--- /dev/null
+++ b/test/test-ringbuffer-contained-offset-read.c
@@ -0,0 +1,38 @@
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include "ringbuffer.c"
+#include "ringbuffer-test-utils.c"
+
+void test_contained_offset_read(void)
+{
+	uint8_t *out_buf, in_buf[] = { 'a', 'b', 'c' };
+	struct ringbuffer_consumer *rbc;
+	struct ringbuffer *rb;
+	size_t len;
+	int rc, i;
+
+	rb = ringbuffer_init(10);
+	rbc = ringbuffer_consumer_register(rb, ringbuffer_poll_nop, NULL);
+
+	rc = ringbuffer_queue(rb, in_buf, sizeof(in_buf));
+	assert(!rc);
+
+	/* test all possible offsets */
+	for (i = 0; i <= sizeof(in_buf); i++) {
+		len = ringbuffer_dequeue_peek(rbc, i, &out_buf);
+		assert(len == sizeof(in_buf) - i);
+		if (len)
+			assert(!memcmp(in_buf + i, out_buf, len));
+	}
+
+	ringbuffer_fini(rb);
+}
+
+int main(void)
+{
+	test_contained_offset_read();
+	return EXIT_SUCCESS;
+}
diff --git a/test/test-ringbuffer-contained-read.c b/test/test-ringbuffer-contained-read.c
new file mode 100644
index 0000000..37df3cf
--- /dev/null
+++ b/test/test-ringbuffer-contained-read.c
@@ -0,0 +1,34 @@
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include "ringbuffer.c"
+#include "ringbuffer-test-utils.c"
+
+void test_contained_read(void)
+{
+	uint8_t *out_buf, in_buf[] = { 'a', 'b', 'c' };
+	struct ringbuffer_consumer *rbc;
+	struct ringbuffer *rb;
+	size_t len;
+	int rc;
+
+	rb = ringbuffer_init(10);
+	rbc = ringbuffer_consumer_register(rb, ringbuffer_poll_nop, NULL);
+
+	rc = ringbuffer_queue(rb, in_buf, sizeof(in_buf));
+	assert(!rc);
+
+	len = ringbuffer_dequeue_peek(rbc, 0, &out_buf);
+	assert(len == sizeof(in_buf));
+	assert(!memcmp(in_buf, out_buf, sizeof(in_buf)));
+
+	ringbuffer_fini(rb);
+}
+
+int main(void)
+{
+	test_contained_read();
+	return EXIT_SUCCESS;
+}
diff --git a/test/test-ringbuffer-poll-force.c b/test/test-ringbuffer-poll-force.c
new file mode 100644
index 0000000..0993de5
--- /dev/null
+++ b/test/test-ringbuffer-poll-force.c
@@ -0,0 +1,48 @@
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include "ringbuffer.c"
+#include "ringbuffer-test-utils.c"
+
+void test_poll_force(void)
+{
+	uint8_t in_buf[] = { 'a', 'b', 'c', 'd', 'e', 'f', };
+	struct rb_test_ctx _ctx, *ctx = &_ctx;
+	struct ringbuffer *rb;
+	int rc;
+
+	ringbuffer_test_context_init(ctx);
+
+	rb = ringbuffer_init(5);
+
+	ctx->rbc = ringbuffer_consumer_register(rb,
+			ringbuffer_poll_append_all, ctx);
+
+	ctx->force_only = true;
+
+	/* fill the ringbuffer */
+	rc = ringbuffer_queue(rb, in_buf, 4);
+	assert(!rc);
+
+	assert(ctx->count == 0);
+
+	/* add more data */
+	rc = ringbuffer_queue(rb, in_buf + 4, 2);
+	assert(!rc);
+
+	/* we should have had a forced poll for the initial two bytes */
+	assert(ctx->count == 1);
+	assert(ctx->len == 2);
+	assert(!memcmp(in_buf, ctx->data, 2));
+
+	ringbuffer_fini(rb);
+	ringbuffer_test_context_fini(ctx);
+}
+
+int main(void)
+{
+	test_poll_force();
+	return EXIT_SUCCESS;
+}
diff --git a/test/test-ringbuffer-read-commit.c b/test/test-ringbuffer-read-commit.c
new file mode 100644
index 0000000..a4be624
--- /dev/null
+++ b/test/test-ringbuffer-read-commit.c
@@ -0,0 +1,33 @@
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include "ringbuffer.c"
+#include "ringbuffer-test-utils.c"
+
+void test_read_commit(void)
+{
+	uint8_t *out_buf, in_buf[] = { 'a', 'b', 'c', };
+	struct ringbuffer_consumer *rbc;
+	struct ringbuffer *rb;
+	size_t len;
+
+	rb = ringbuffer_init(10);
+	rbc = ringbuffer_consumer_register(rb, ringbuffer_poll_nop, NULL);
+
+	ringbuffer_queue(rb, in_buf, sizeof(in_buf));
+	len = ringbuffer_dequeue_peek(rbc, 0, &out_buf);
+
+	ringbuffer_dequeue_commit(rbc, len);
+	len = ringbuffer_dequeue_peek(rbc, 0, &out_buf);
+	assert(len == 0);
+
+	ringbuffer_fini(rb);
+}
+
+int main(void)
+{
+	test_read_commit();
+	return EXIT_SUCCESS;
+}
diff --git a/test/test-ringbuffer-simple-poll.c b/test/test-ringbuffer-simple-poll.c
new file mode 100644
index 0000000..116fe8c
--- /dev/null
+++ b/test/test-ringbuffer-simple-poll.c
@@ -0,0 +1,38 @@
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include "ringbuffer.c"
+#include "ringbuffer-test-utils.c"
+
+void test_simple_poll(void)
+{
+	uint8_t in_buf[] = { 'a', 'b', 'c' };
+	struct rb_test_ctx _ctx, *ctx;
+	struct ringbuffer *rb;
+	int rc;
+
+	ctx = &_ctx;
+	ringbuffer_test_context_init(ctx);
+
+	rb = ringbuffer_init(10);
+	ctx->rbc = ringbuffer_consumer_register(rb,
+			ringbuffer_poll_append_all, ctx);
+
+	rc = ringbuffer_queue(rb, in_buf, sizeof(in_buf));
+	assert(!rc);
+
+	assert(ctx->count == 1);
+	assert(ctx->len == sizeof(in_buf));
+	assert(!memcmp(in_buf, ctx->data, ctx->len));
+
+	ringbuffer_fini(rb);
+	ringbuffer_test_context_fini(ctx);
+}
+
+int main(void)
+{
+	test_simple_poll();
+	return EXIT_SUCCESS;
+}