Use non-blocking writes for clients

We may have console clients that are remote, and so the write() to those
clients may block.

Since we're single-threaded, that block will delay output to all other
clients (and read from the UART).

This change uses nonblocking IO for the client sockets, and a small
buffer to handle writes that would have blocked.

Signed-off-by: Jeremy Kerr <jk@ozlabs.org>
diff --git a/socket-handler.c b/socket-handler.c
index 2e4f5c2..7307ffe 100644
--- a/socket-handler.c
+++ b/socket-handler.c
@@ -14,9 +14,11 @@
  * limitations under the License.
  */
 
+#define _GNU_SOURCE
 
 #include <assert.h>
 #include <err.h>
+#include <errno.h>
 #include <stdbool.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -30,9 +32,14 @@
 
 #include "console-server.h"
 
+const size_t buffer_size_max = 100 * 1024;
+
 struct client {
 	struct poller	*poller;
 	int		fd;
+	uint8_t		*buf;
+	size_t		buf_alloc;
+	size_t		buf_len;
 };
 
 struct socket_handler {
@@ -74,37 +81,105 @@
 			sizeof(*sh->clients) * sh->n_clients);
 }
 
+/* Write data to the client, until error or block.
+ *
+ * Returns -1 on hard failure, otherwise number of bytes written. A zero
+ * return indicates that no bytes were written due to potential block,
+ * but isn't a failure
+ */
+static ssize_t client_write_data(struct client *client, uint8_t *buf,
+		size_t len)
+{
+	size_t pos;
+	ssize_t rc;
+
+	for (pos = 0; pos < len; pos += rc) {
+		rc = write(client->fd, buf + pos, len - pos);
+		if (rc < 0) {
+			if (errno == EAGAIN || errno == EWOULDBLOCK)
+				break;
+
+			if (errno == EINTR)
+				continue;
+
+			return -1;
+		}
+		if (rc == 0)
+			return -1;
+	}
+
+	return pos;
+}
+
 static enum poller_ret client_poll(struct handler *handler,
 		int events, void *data)
 {
 	struct socket_handler *sh = to_socket_handler(handler);
 	struct client *client = data;
 	uint8_t buf[4096];
+	ssize_t len;
 	int rc;
 
-	if (!(events & POLLIN))
-		return POLLER_OK;
+	if (events & POLLIN) {
+		rc = read(client->fd, buf, sizeof(buf));
+		if (rc <= 0)
+			goto err_close;
 
-	rc = read(client->fd, buf, sizeof(buf));
-	if (rc <= 0) {
-		client->poller = NULL;
-		client_close(sh, client);
-		return POLLER_REMOVE;
+		console_data_out(sh->console, buf, rc);
 	}
 
-	console_data_out(sh->console, buf, rc);
+	if (events & POLLOUT) {
+		len = client_write_data(client, client->buf, client->buf_len);
+		if (len < 0)
+			goto err_close;
+
+		/* consume from the queue */
+		client->buf_len -= len;
+		memmove(client->buf, client->buf + len,
+				client->buf_len);
+	}
 
 	return POLLER_OK;
+
+err_close:
+	client->poller = NULL;
+	client_close(sh, client);
+	return POLLER_REMOVE;
 }
 
-static void client_send_data(struct socket_handler *sh,
-		struct client *client, uint8_t *buf, size_t len)
+static int client_queue_data(struct client *client, uint8_t *buf, size_t len)
 {
-	int rc;
+	if (client->buf_len + len > client->buf_alloc) {
+		if (!client->buf_alloc)
+			client->buf_alloc = 2048;
+		client->buf_alloc *= 2;
 
-	rc = write_buf_to_fd(client->fd, buf, len);
-	if (rc)
-		client_close(sh, client);
+		if (client->buf_alloc > buffer_size_max)
+			return -1;
+
+		client->buf = realloc(client->buf, client->buf_alloc);
+	}
+
+	memcpy(client->buf + client->buf_len, buf, len);
+	client->buf_len += len;
+	return 0;
+}
+
+static int client_send_or_queue(struct client *client, uint8_t *buf, size_t len)
+{
+	ssize_t rc;
+
+	rc = client_write_data(client, buf, len);
+	if (rc < 0)
+		return -1;
+
+	if ((size_t)rc < len) {
+		rc = client_queue_data(client, buf + rc, len - rc);
+		if (rc)
+			return -1;
+	}
+
+	return 0;
 }
 
 static enum poller_ret socket_poll(struct handler *handler,
@@ -117,7 +192,7 @@
 	if (!(events & POLLIN))
 		return POLLER_OK;
 
-	fd = accept(sh->sd, NULL, NULL);
+	fd = accept4(sh->sd, NULL, NULL, SOCK_NONBLOCK);
 	if (fd < 0)
 		return POLLER_OK;
 
@@ -180,11 +255,20 @@
 static int socket_data(struct handler *handler, uint8_t *buf, size_t len)
 {
 	struct socket_handler *sh = to_socket_handler(handler);
-	int i;
+	int i, rc;
 
 	for (i = 0; i < sh->n_clients; i++) {
 		struct client *client = sh->clients[i];
-		client_send_data(sh, client, buf, len);
+		rc = client_send_or_queue(client, buf, len);
+		if (!rc)
+			continue;
+
+		/* if we failed to send data, close the client. This will
+		 * remove it from the clients array, so skip back to the item
+		 * that has taken its place
+		 */
+		client_close(sh, client);
+		i--;
 	}
 	return 0;
 }