server: Use ringbuffer for socket backlog
Currently, the socket handler uses a linear buffer for the backlog data;
this means we need to shift up to 128kB of data after each socket
write().
This change introduces a single-producer-multiple-consumer ringbuffer,
to avoid the need for memmove()ing data around; we can simply update
pointers instead of shifting data.
We add this as a new file (ringbuffer.c), to make it a little more
modular. To mitigate the risk of subtle pointer arithmetic issues, we
add a set of tests too.
Change-Id: Ib7c5151d3cf1f588436f5461000b6fed22d0681c
Signed-off-by: Jeremy Kerr <jk@ozlabs.org>
diff --git a/Makefile.am b/Makefile.am
index e07d4ad..96fa9fb 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -11,6 +11,7 @@
console-server.c \
console-server.h \
util.c \
+ ringbuffer.c \
config.c \
log-handler.c \
socket-handler.c \
@@ -23,4 +24,4 @@
console-socket.c \
util.c
-#SUBDIRS = test
+SUBDIRS = test
diff --git a/configure.ac b/configure.ac
index 04bf36a..a306b31 100644
--- a/configure.ac
+++ b/configure.ac
@@ -44,5 +44,5 @@
)
# Create configured output
-AC_CONFIG_FILES([Makefile])
+AC_CONFIG_FILES([Makefile test/Makefile])
AC_OUTPUT
diff --git a/console-server.h b/console-server.h
index 2658e89..4d0d5b4 100644
--- a/console-server.h
+++ b/console-server.h
@@ -67,6 +67,33 @@
void console_unregister_poller(struct console *console, struct poller *poller);
+/* ringbuffer API */
+enum ringbuffer_poll_ret {
+ RINGBUFFER_POLL_OK = 0,
+ RINGBUFFER_POLL_REMOVE,
+};
+
+typedef enum ringbuffer_poll_ret (*ringbuffer_poll_fn_t)(void *data,
+ size_t force_len);
+
+struct ringbuffer;
+struct ringbuffer_consumer;
+
+struct ringbuffer *ringbuffer_init(size_t size);
+void ringbuffer_fini(struct ringbuffer *rb);
+
+struct ringbuffer_consumer *ringbuffer_consumer_register(struct ringbuffer *rb,
+ ringbuffer_poll_fn_t poll_fn, void *data);
+
+void ringbuffer_consumer_unregister(struct ringbuffer_consumer *rbc);
+
+int ringbuffer_queue(struct ringbuffer *rb, uint8_t *data, size_t len);
+
+size_t ringbuffer_dequeue_peek(struct ringbuffer_consumer *rbc, size_t offset,
+ uint8_t **data);
+
+int ringbuffer_dequeue_commit(struct ringbuffer_consumer *rbc, size_t len);
+
/* config API */
struct config;
const char *config_get_value(struct config *config, const char *name);
diff --git a/ringbuffer.c b/ringbuffer.c
new file mode 100644
index 0000000..3edab0d
--- /dev/null
+++ b/ringbuffer.c
@@ -0,0 +1,220 @@
+/**
+ * Copyright © 2017 IBM Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define _GNU_SOURCE
+
+#include <assert.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "console-server.h"
+
+#define min(a,b) ({ \
+ const typeof(a) _a = (a); \
+ const typeof(b) _b = (b); \
+ _a < _b ? _a : _b; \
+ })
+
+struct ringbuffer {
+ uint8_t *buf;
+ size_t size;
+ size_t tail;
+ struct ringbuffer_consumer **consumers;
+ int n_consumers;
+};
+
+struct ringbuffer_consumer {
+ struct ringbuffer *rb;
+ ringbuffer_poll_fn_t poll_fn;
+ void *poll_data;
+ size_t pos;
+};
+
+struct ringbuffer *ringbuffer_init(size_t size)
+{
+ struct ringbuffer *rb;
+
+ rb = malloc(sizeof(*rb) + size);
+ if (!rb)
+ return NULL;
+
+ memset(rb, 0, sizeof(*rb));
+ rb->size = size;
+ rb->buf = (void *)(rb + 1);
+
+ return rb;
+}
+
+void ringbuffer_fini(struct ringbuffer *rb)
+{
+ while (rb->n_consumers)
+ ringbuffer_consumer_unregister(rb->consumers[0]);
+ free(rb);
+}
+
+struct ringbuffer_consumer *ringbuffer_consumer_register(struct ringbuffer *rb,
+ ringbuffer_poll_fn_t fn, void *data)
+{
+ struct ringbuffer_consumer *rbc;
+ int n;
+
+ rbc = malloc(sizeof(*rbc));
+ rbc->rb = rb;
+ rbc->poll_fn = fn;
+ rbc->poll_data = data;
+ rbc->pos = rb->tail;
+
+ n = rb->n_consumers++;
+ rb->consumers = realloc(rb->consumers,
+ sizeof(*rb->consumers) * rb->n_consumers);
+ rb->consumers[n] = rbc;
+
+ return rbc;
+}
+
+void ringbuffer_consumer_unregister(struct ringbuffer_consumer *rbc)
+{
+ struct ringbuffer *rb = rbc->rb;
+ int i;
+
+ for (i = 0; i < rb->n_consumers; i++)
+ if (rb->consumers[i] == rbc)
+ break;
+
+ assert(i < rb->n_consumers);
+
+ rb->n_consumers--;
+
+ memmove(&rb->consumers[i], &rb->consumers[i+1],
+ sizeof(*rb->consumers) * (rb->n_consumers - i));
+
+ rb->consumers = realloc(rb->consumers,
+ sizeof(*rb->consumers) * rb->n_consumers);
+
+ free(rbc);
+}
+
+static size_t ringbuffer_len(struct ringbuffer_consumer *rbc)
+{
+ if (rbc->pos <= rbc->rb->tail)
+ return rbc->rb->tail - rbc->pos;
+ else
+ return rbc->rb->tail + rbc->rb->size - rbc->pos;
+}
+
+static size_t ringbuffer_space(struct ringbuffer_consumer *rbc)
+{
+ return rbc->rb->size - ringbuffer_len(rbc) - 1;
+}
+
+static int ringbuffer_consumer_ensure_space(
+ struct ringbuffer_consumer *rbc, size_t len)
+{
+ enum ringbuffer_poll_ret prc;
+ int force_len;
+
+ if (ringbuffer_space(rbc) >= len)
+ return 0;
+
+ force_len = len - ringbuffer_space(rbc);
+
+ prc = rbc->poll_fn(rbc->poll_data, force_len);
+ if (prc != RINGBUFFER_POLL_OK)
+ return -1;
+
+ return 0;
+}
+
+int ringbuffer_queue(struct ringbuffer *rb, uint8_t *data, size_t len)
+{
+ struct ringbuffer_consumer *rbc;
+ size_t wlen;
+ int i, rc;
+
+ if (len >= rb->size)
+ return -1;
+
+ /* Ensure there is at least len bytes of space available.
+ *
+ * If a client doesn't have sufficient space, perform a blocking write
+ * (by calling ->poll_fn with force_len) to create it.
+ */
+ for (i = 0; i < rb->n_consumers; i++) {
+ rbc = rb->consumers[i];
+
+ rc = ringbuffer_consumer_ensure_space(rbc, len);
+ if (rc) {
+ ringbuffer_consumer_unregister(rbc);
+ i--;
+ continue;
+ }
+
+ assert(ringbuffer_space(rbc) >= len);
+ }
+
+ /* Now that we know we have enough space, add new data to tail */
+ wlen = min(len, rb->size - rb->tail);
+ memcpy(rb->buf + rb->tail, data, wlen);
+ rb->tail = (rb->tail + wlen) % rb->size;
+ len -= wlen;
+ data += wlen;
+
+ memcpy(rb->buf, data, len);
+ rb->tail += len;
+
+ /* Inform consumers of new data in non-blocking mode, by calling
+ * ->poll_fn with 0 force_len */
+ for (i = 0; i < rb->n_consumers; i++) {
+ enum ringbuffer_poll_ret prc;
+
+ rbc = rb->consumers[i];
+ prc = rbc->poll_fn(rbc->poll_data, 0);
+ if (prc == RINGBUFFER_POLL_REMOVE) {
+ ringbuffer_consumer_unregister(rbc);
+ i--;
+ }
+ }
+
+ return 0;
+}
+
+size_t ringbuffer_dequeue_peek(struct ringbuffer_consumer *rbc, size_t offset,
+ uint8_t **data)
+{
+ struct ringbuffer *rb = rbc->rb;
+ size_t pos;
+ size_t len;
+
+ if (offset >= ringbuffer_len(rbc))
+ return 0;
+
+ pos = (rbc->pos + offset) % rb->size;
+ if (pos <= rb->tail)
+ len = rb->tail - pos;
+ else
+ len = rb->size - pos;
+
+ *data = rb->buf + pos;
+ return len;
+}
+
+int ringbuffer_dequeue_commit(struct ringbuffer_consumer *rbc, size_t len)
+{
+ assert(len <= ringbuffer_len(rbc));
+ rbc->pos = (rbc->pos + len) % rbc->rb->size;
+ return 0;
+}
diff --git a/socket-handler.c b/socket-handler.c
index a0b3028..11f183c 100644
--- a/socket-handler.c
+++ b/socket-handler.c
@@ -32,31 +32,24 @@
#include "console-server.h"
-#define min(a,b) ({ \
- const typeof(a) _a = (a); \
- const typeof(b) _b = (b); \
- _a < _b ? _a : _b; \
- })
-
const size_t buffer_size = 128 * 1024;
struct client {
- struct poller *poller;
- int fd;
- size_t buf_pos;
+ struct socket_handler *sh;
+ struct poller *poller;
+ struct ringbuffer_consumer *rbc;
+ int fd;
};
struct socket_handler {
- struct handler handler;
- struct console *console;
- struct poller *poller;
- int sd;
+ struct handler handler;
+ struct console *console;
+ struct poller *poller;
+ struct ringbuffer *ringbuffer;
+ int sd;
- uint8_t *buf;
- size_t buf_len;
-
- struct client **clients;
- int n_clients;
+ struct client **clients;
+ int n_clients;
};
static struct socket_handler *to_socket_handler(struct handler *handler)
@@ -64,14 +57,18 @@
return container_of(handler, struct socket_handler, handler);
}
-static void client_close(struct socket_handler *sh, struct client *client)
+static void client_close(struct client *client)
{
+ struct socket_handler *sh = client->sh;
int idx;
close(client->fd);
if (client->poller)
console_unregister_poller(sh->console, client->poller);
+ if (client->rbc)
+ ringbuffer_consumer_unregister(client->rbc);
+
for (idx = 0; idx < sh->n_clients; idx++)
if (sh->clients[idx] == client)
break;
@@ -88,18 +85,6 @@
sizeof(*sh->clients) * sh->n_clients);
}
-static size_t client_buffer_len(struct socket_handler *sh,
- struct client *client)
-{
- return sh->buf_len - client->buf_pos;
-}
-
-static void *client_buffer_data(struct socket_handler *sh,
- struct client *client)
-{
- return sh->buf + client->buf_pos;
-}
-
static ssize_t send_all(int fd, void *buf, size_t len, bool block)
{
int rc, flags;
@@ -130,37 +115,58 @@
/* Drain the queue to the socket and update the queue buffer. If force_len is
* set, send at least that many bytes from the queue, possibly while blocking
*/
-static int client_drain_queue(struct socket_handler *sh,
- struct client *client, size_t force_len)
+static int client_drain_queue(struct client *client, size_t force_len)
{
+ uint8_t *buf;
ssize_t wlen;
- size_t len;
+ size_t len, total_len;
bool block;
- len = client_buffer_len(sh, client);
- if (!len)
- return 0;
+ total_len = 0;
+ wlen = 0;
+ block = !!force_len;
- block = false;
- if (force_len) {
- assert(force_len <= len);
- block = true;
- len = force_len;
+ for (;;) {
+ len = ringbuffer_dequeue_peek(client->rbc, total_len, &buf);
+ if (!len)
+ break;
+
+ wlen = send_all(client->fd, buf, len, block);
+ if (wlen <= 0)
+ break;
+
+ total_len += wlen;
+
+ if (force_len && total_len >= force_len)
+ break;
}
- wlen = send_all(client->fd, client_buffer_data(sh, client), len, block);
if (wlen < 0)
return -1;
- if (force_len && wlen < force_len)
+ if (force_len && total_len < force_len)
return -1;
- client->buf_pos += wlen;
- assert(client->buf_pos <= sh->buf_len);
-
+ ringbuffer_dequeue_commit(client->rbc, total_len);
return 0;
}
+static enum ringbuffer_poll_ret client_ringbuffer_poll(void *arg,
+ size_t force_len)
+{
+ struct client *client = arg;
+ int rc;
+
+ rc = client_drain_queue(client, force_len);
+ if (rc) {
+ client->rbc = NULL;
+ client_close(client);
+ return RINGBUFFER_POLL_REMOVE;
+ }
+
+ return RINGBUFFER_POLL_OK;
+}
+
static enum poller_ret client_poll(struct handler *handler,
int events, void *data)
{
@@ -184,7 +190,7 @@
}
if (events & POLLOUT) {
- rc = client_drain_queue(sh, client, 0);
+ rc = client_drain_queue(client, 0);
if (rc)
goto err_close;
}
@@ -193,7 +199,7 @@
err_close:
client->poller = NULL;
- client_close(sh, client);
+ client_close(client);
return POLLER_REMOVE;
}
@@ -214,9 +220,12 @@
client = malloc(sizeof(*client));
memset(client, 0, sizeof(*client));
+ client->sh = sh;
client->fd = fd;
client->poller = console_register_poller(sh->console, handler,
client_poll, client->fd, POLLIN, client);
+ client->rbc = ringbuffer_consumer_register(sh->ringbuffer,
+ client_ringbuffer_poll, client);
n = sh->n_clients++;
sh->clients = realloc(sh->clients,
@@ -237,11 +246,10 @@
sh->console = console;
sh->clients = NULL;
sh->n_clients = 0;
- sh->buf_len = 0;
- sh->buf = malloc(buffer_size);
- if (!sh->buf) {
- warn("Can't allocate backlog buffer");
+ sh->ringbuffer = ringbuffer_init(buffer_size);
+ if (!sh->ringbuffer) {
+ warn("Can't allocate backlog ring buffer");
return -1;
}
@@ -277,78 +285,7 @@
static int socket_data(struct handler *handler, uint8_t *buf, size_t len)
{
struct socket_handler *sh = to_socket_handler(handler);
- struct client *client;
- size_t space, min_pos;
- int i, rc;
-
- space = buffer_size - sh->buf_len;
- min_pos = sh->buf_len;
-
- /* Ensure there is at least len bytes available in the global buffer.
- *
- * The 'space' var tells us how many bytes are available at the tail of
- * the buffer. However, if all clients have a non-zero buf_pos, then we
- * can drop bytes from the beginning (in the memmove below). So, we
- * account for each clients' buf_pos in the space check.
- *
- * If a client doesn't have sufficient space, perform a blocking write
- * to create it. This will result in incrementing client->buf_pos to
- * create space.
- */
- for (i = 0; i < sh->n_clients; i++) {
- ssize_t client_space;
-
- client = sh->clients[i];
- client_space = space + client->buf_pos;
-
- if (len > client_space) {
- /* Blocking send enough to create len bytes of space in
- * the global buffer. On success, this will increment
- * client->buf_pos by the number of bytes written
- */
- rc = client_drain_queue(sh, client, len - client_space);
- if (rc) {
- client_close(sh, client);
- i--;
- continue;
- }
- }
-
- min_pos = min(min_pos, client->buf_pos);
- }
-
- /* avoid pointless copying */
- if (!sh->n_clients) {
- sh->buf_len = 0;
- return 0;
- }
-
- /* drop unneeded buffer data... */
- sh->buf_len -= min_pos;
- memmove(sh->buf, sh->buf + min_pos, sh->buf_len);
-
- /* ... and add new data */
- memcpy(sh->buf + sh->buf_len, buf, len);
- sh->buf_len += len;
-
- /* now that the queue contains the new data, perform non-blocking send
- * to all clients */
- for (i = 0; i < sh->n_clients; i++) {
- client = sh->clients[i];
-
- /* We've dropped data in the global buffer, so need to update
- * clients' pos pointers to suit the new start of buffer
- * data */
- client->buf_pos -= min_pos;
- assert(client->buf_pos >= 0);
-
- rc = client_drain_queue(sh, client, 0);
- if (rc) {
- client_close(sh, client);
- i--;
- continue;
- }
- }
+ ringbuffer_queue(sh->ringbuffer, buf, len);
return 0;
}
@@ -357,13 +294,15 @@
struct socket_handler *sh = to_socket_handler(handler);
while (sh->n_clients)
- client_close(sh, sh->clients[0]);
+ client_close(sh->clients[0]);
if (sh->poller)
console_unregister_poller(sh->console, sh->poller);
+ if (sh->ringbuffer)
+ ringbuffer_fini(sh->ringbuffer);
+
close(sh->sd);
- free(sh->buf);
}
static struct socket_handler socket_handler = {
diff --git a/test/Makefile.am b/test/Makefile.am
index 45be362..e8c25d0 100644
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -3,10 +3,15 @@
# Run all 'check' test programs
TESTS = $(check_PROGRAMS)
-#Build/add config-test to test suite
-check_PROGRAMS += config-test
-config_test_CPPFLAGS = -Igtest $(GTEST_CPPFLAGS) \
- -DCONFIG_TEST -DSYSCONFDIR=\"\"
-config_test_LDFLAGS = -lgtest_main -lgtest $(PTHREAD_LIBS) $(OESDK_TESTCASE_FLAGS)
-config_test_SOURCES = config.c
-config_test_LDADD = $(top_builddir)/config.o
+check_PROGRAMS += \
+ test-ringbuffer-contained-read \
+ test-ringbuffer-contained-offset-read \
+ test-ringbuffer-read-commit \
+ test-ringbuffer-boundary-read \
+ test-ringbuffer-simple-poll \
+ test-ringbuffer-boundary-poll \
+ test-ringbuffer-poll-force
+
+EXTRA_DIST = ringbuffer-test-utils.c
+
+AM_CPPFLAGS = -I$(top_srcdir)
diff --git a/test/ringbuffer-test-utils.c b/test/ringbuffer-test-utils.c
new file mode 100644
index 0000000..c489f9d
--- /dev/null
+++ b/test/ringbuffer-test-utils.c
@@ -0,0 +1,97 @@
+
+struct rb_test_ctx {
+ struct ringbuffer_consumer *rbc;
+ bool ignore_poll;
+ bool force_only;
+ int count;
+ uint8_t *data;
+ int len;
+};
+
+void ringbuffer_test_context_init(struct rb_test_ctx *ctx)
+{
+ ctx->count = 0;
+ ctx->data = NULL;
+ ctx->len = 0;
+ ctx->ignore_poll = false;
+ ctx->force_only = false;
+}
+
+void ringbuffer_test_context_fini(struct rb_test_ctx *ctx)
+{
+ free(ctx->data);
+}
+
+enum ringbuffer_poll_ret ringbuffer_poll_nop(
+ void *data __attribute__((unused)),
+ size_t force_len __attribute__((unused)))
+{
+ return RINGBUFFER_POLL_OK;
+}
+
+enum ringbuffer_poll_ret ringbuffer_poll_append_all(void *data,
+ size_t force_len)
+{
+ struct rb_test_ctx *ctx = data;
+ size_t len, total_len;
+ uint8_t *buf;
+
+ if (ctx->ignore_poll)
+ return RINGBUFFER_POLL_OK;
+
+ if (ctx->force_only && !force_len)
+ return RINGBUFFER_POLL_OK;
+
+ ctx->count++;
+
+ total_len = 0;
+ for (;;) {
+ len = ringbuffer_dequeue_peek(ctx->rbc, total_len, &buf);
+ if (!len)
+ break;
+
+ if (ctx->force_only && total_len + len > force_len)
+ len = force_len - total_len;
+
+ ctx->data = realloc(ctx->data, ctx->len + len);
+ memcpy(ctx->data + ctx->len, buf, len);
+ ctx->len += len;
+ total_len += len;
+
+ if (ctx->force_only && total_len >= force_len)
+ break;
+ }
+ ringbuffer_dequeue_commit(ctx->rbc, total_len);
+
+ return RINGBUFFER_POLL_OK;
+}
+
+void ringbuffer_dump(struct ringbuffer *rb)
+{
+ struct ringbuffer_consumer *rbc;
+ int i, j;
+
+ printf("---- ringbuffer (%d consumer%s)\n", rb->n_consumers,
+ rb->n_consumers == 1 ? "" : "s");
+
+ for (i = 0; i < rb->size; i++) {
+ bool has_consumer = false;
+ const char *prefix = "";
+
+ if (rb->tail == i)
+ prefix = "tail=>";
+
+ printf("%6s %02x", prefix, rb->buf[i]);
+ for (j = 0; j < rb->n_consumers; j++) {
+ rbc = rb->consumers[j];
+ if (rbc->pos != i)
+ continue;
+ if (!has_consumer)
+ printf(" <=");
+ printf("c[%d],len=%zd ", j, ringbuffer_len(rbc));
+ has_consumer = true;
+ }
+ printf("\n");
+ }
+}
+
diff --git a/test/test-ringbuffer-boundary-poll.c b/test/test-ringbuffer-boundary-poll.c
new file mode 100644
index 0000000..6f6ddc7
--- /dev/null
+++ b/test/test-ringbuffer-boundary-poll.c
@@ -0,0 +1,51 @@
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include "ringbuffer.c"
+#include "ringbuffer-test-utils.c"
+
+void test_boundary_poll(void)
+{
+ uint8_t in_buf[] = { 'a', 'b', 'c', 'd', 'e', 'f' };
+ struct rb_test_ctx _ctx, *ctx = &_ctx;
+ struct ringbuffer *rb;
+ int rc;
+
+ ringbuffer_test_context_init(ctx);
+
+ rb = ringbuffer_init(10);
+
+ ctx->rbc = ringbuffer_consumer_register(rb,
+ ringbuffer_poll_append_all, ctx);
+
+ /* don't consume initial data in the poll callback */
+ ctx->ignore_poll = true;
+
+ /* queue and dequeue, so our tail is non-zero */
+ ringbuffer_queue(rb, in_buf, sizeof(in_buf));
+ ringbuffer_dequeue_commit(ctx->rbc, sizeof(in_buf));
+
+ /* start queueing data */
+ ctx->ignore_poll = false;
+
+ /* ensure we're getting the second batch of data back */
+ in_buf[0] = 'A';
+
+ rc = ringbuffer_queue(rb, in_buf, sizeof(in_buf));
+ assert(!rc);
+
+ assert(ctx->count == 1);
+ assert(ctx->len == sizeof(in_buf));
+ assert(!memcmp(in_buf, ctx->data, ctx->len));
+
+ ringbuffer_fini(rb);
+ ringbuffer_test_context_fini(ctx);
+}
+
+int main(void)
+{
+ test_boundary_poll();
+ return EXIT_SUCCESS;
+}
diff --git a/test/test-ringbuffer-boundary-read.c b/test/test-ringbuffer-boundary-read.c
new file mode 100644
index 0000000..2b982e1
--- /dev/null
+++ b/test/test-ringbuffer-boundary-read.c
@@ -0,0 +1,51 @@
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include "ringbuffer.c"
+#include "ringbuffer-test-utils.c"
+
+void test_boundary_read(void)
+{
+ uint8_t *out_buf, in_buf[] = { 'a', 'b', 'c', 'd', 'e', 'f' };
+ struct ringbuffer_consumer *rbc;
+ struct ringbuffer *rb;
+ size_t len, pos;
+ int rc;
+
+ assert(sizeof(in_buf) * 2 > 10);
+
+ rb = ringbuffer_init(10);
+ rbc = ringbuffer_consumer_register(rb, ringbuffer_poll_nop, NULL);
+
+ /* queue and dequeue, so our tail is non-zero */
+ ringbuffer_queue(rb, in_buf, sizeof(in_buf));
+ ringbuffer_dequeue_commit(rbc, sizeof(in_buf));
+
+ /* ensure we're getting the second batch of data back */
+ in_buf[0] = 'A';
+
+ /* the next queue should cross the end of the buffer */
+ rc = ringbuffer_queue(rb, in_buf, sizeof(in_buf));
+ assert(!rc);
+
+ /* dequeue everything we can */
+ pos = 0;
+ for (;;) {
+ len = ringbuffer_dequeue_peek(rbc, pos, &out_buf);
+ if (len == 0)
+ break;
+ assert(!memcmp(in_buf+pos, out_buf, len));
+ pos += len;
+ }
+ assert(pos == sizeof(in_buf));
+
+ ringbuffer_fini(rb);
+}
+
+int main(void)
+{
+ test_boundary_read();
+ return EXIT_SUCCESS;
+}
diff --git a/test/test-ringbuffer-contained-offset-read.c b/test/test-ringbuffer-contained-offset-read.c
new file mode 100644
index 0000000..5e41c6a
--- /dev/null
+++ b/test/test-ringbuffer-contained-offset-read.c
@@ -0,0 +1,38 @@
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include "ringbuffer.c"
+#include "ringbuffer-test-utils.c"
+
+void test_contained_offset_read(void)
+{
+ uint8_t *out_buf, in_buf[] = { 'a', 'b', 'c' };
+ struct ringbuffer_consumer *rbc;
+ struct ringbuffer *rb;
+ size_t len;
+ int rc, i;
+
+ rb = ringbuffer_init(10);
+ rbc = ringbuffer_consumer_register(rb, ringbuffer_poll_nop, NULL);
+
+ rc = ringbuffer_queue(rb, in_buf, sizeof(in_buf));
+ assert(!rc);
+
+ /* test all possible offsets */
+ for (i = 0; i <= sizeof(in_buf); i++) {
+ len = ringbuffer_dequeue_peek(rbc, i, &out_buf);
+ assert(len == sizeof(in_buf) - i);
+ if (len)
+ assert(!memcmp(in_buf + i, out_buf, len));
+ }
+
+ ringbuffer_fini(rb);
+}
+
+int main(void)
+{
+ test_contained_offset_read();
+ return EXIT_SUCCESS;
+}
diff --git a/test/test-ringbuffer-contained-read.c b/test/test-ringbuffer-contained-read.c
new file mode 100644
index 0000000..37df3cf
--- /dev/null
+++ b/test/test-ringbuffer-contained-read.c
@@ -0,0 +1,34 @@
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include "ringbuffer.c"
+#include "ringbuffer-test-utils.c"
+
+void test_contained_read(void)
+{
+ uint8_t *out_buf, in_buf[] = { 'a', 'b', 'c' };
+ struct ringbuffer_consumer *rbc;
+ struct ringbuffer *rb;
+ size_t len;
+ int rc;
+
+ rb = ringbuffer_init(10);
+ rbc = ringbuffer_consumer_register(rb, ringbuffer_poll_nop, NULL);
+
+ rc = ringbuffer_queue(rb, in_buf, sizeof(in_buf));
+ assert(!rc);
+
+ len = ringbuffer_dequeue_peek(rbc, 0, &out_buf);
+ assert(len == sizeof(in_buf));
+ assert(!memcmp(in_buf, out_buf, sizeof(in_buf)));
+
+ ringbuffer_fini(rb);
+}
+
+int main(void)
+{
+ test_contained_read();
+ return EXIT_SUCCESS;
+}
diff --git a/test/test-ringbuffer-poll-force.c b/test/test-ringbuffer-poll-force.c
new file mode 100644
index 0000000..0993de5
--- /dev/null
+++ b/test/test-ringbuffer-poll-force.c
@@ -0,0 +1,48 @@
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include "ringbuffer.c"
+#include "ringbuffer-test-utils.c"
+
+void test_poll_force(void)
+{
+ uint8_t in_buf[] = { 'a', 'b', 'c', 'd', 'e', 'f', };
+ struct rb_test_ctx _ctx, *ctx = &_ctx;
+ struct ringbuffer *rb;
+ int rc;
+
+ ringbuffer_test_context_init(ctx);
+
+ rb = ringbuffer_init(5);
+
+ ctx->rbc = ringbuffer_consumer_register(rb,
+ ringbuffer_poll_append_all, ctx);
+
+ ctx->force_only = true;
+
+ /* fill the ringbuffer */
+ rc = ringbuffer_queue(rb, in_buf, 4);
+ assert(!rc);
+
+ assert(ctx->count == 0);
+
+ /* add more data */
+ rc = ringbuffer_queue(rb, in_buf + 4, 2);
+ assert(!rc);
+
+ /* we should have had a forced poll for the initial two bytes */
+ assert(ctx->count == 1);
+ assert(ctx->len == 2);
+ assert(!memcmp(in_buf, ctx->data, 2));
+
+ ringbuffer_fini(rb);
+ ringbuffer_test_context_fini(ctx);
+}
+
+int main(void)
+{
+ test_poll_force();
+ return EXIT_SUCCESS;
+}
diff --git a/test/test-ringbuffer-read-commit.c b/test/test-ringbuffer-read-commit.c
new file mode 100644
index 0000000..a4be624
--- /dev/null
+++ b/test/test-ringbuffer-read-commit.c
@@ -0,0 +1,33 @@
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include "ringbuffer.c"
+#include "ringbuffer-test-utils.c"
+
+void test_read_commit(void)
+{
+ uint8_t *out_buf, in_buf[] = { 'a', 'b', 'c', };
+ struct ringbuffer_consumer *rbc;
+ struct ringbuffer *rb;
+ size_t len;
+
+ rb = ringbuffer_init(10);
+ rbc = ringbuffer_consumer_register(rb, ringbuffer_poll_nop, NULL);
+
+ ringbuffer_queue(rb, in_buf, sizeof(in_buf));
+ len = ringbuffer_dequeue_peek(rbc, 0, &out_buf);
+
+ ringbuffer_dequeue_commit(rbc, len);
+ len = ringbuffer_dequeue_peek(rbc, 0, &out_buf);
+ assert(len == 0);
+
+ ringbuffer_fini(rb);
+}
+
+int main(void)
+{
+ test_read_commit();
+ return EXIT_SUCCESS;
+}
diff --git a/test/test-ringbuffer-simple-poll.c b/test/test-ringbuffer-simple-poll.c
new file mode 100644
index 0000000..116fe8c
--- /dev/null
+++ b/test/test-ringbuffer-simple-poll.c
@@ -0,0 +1,38 @@
+
+#include <stdlib.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#include "ringbuffer.c"
+#include "ringbuffer-test-utils.c"
+
+void test_simple_poll(void)
+{
+ uint8_t in_buf[] = { 'a', 'b', 'c' };
+ struct rb_test_ctx _ctx, *ctx;
+ struct ringbuffer *rb;
+ int rc;
+
+ ctx = &_ctx;
+ ringbuffer_test_context_init(ctx);
+
+ rb = ringbuffer_init(10);
+ ctx->rbc = ringbuffer_consumer_register(rb,
+ ringbuffer_poll_append_all, ctx);
+
+ rc = ringbuffer_queue(rb, in_buf, sizeof(in_buf));
+ assert(!rc);
+
+ assert(ctx->count == 1);
+ assert(ctx->len == sizeof(in_buf));
+ assert(!memcmp(in_buf, ctx->data, ctx->len));
+
+ ringbuffer_fini(rb);
+ ringbuffer_test_context_fini(ctx);
+}
+
+int main(void)
+{
+ test_simple_poll();
+ return EXIT_SUCCESS;
+}