core: implement packetisation and reassembly
Add a naive implementation of packetisation and reassembly of MCTP
messages. We use an (unbounded!) message buffer for reception, and a
queue of packets for transmission.
Signed-off-by: Jeremy Kerr <jk@ozlabs.org>
diff --git a/core.c b/core.c
index 575d14f..0a5a135 100644
--- a/core.c
+++ b/core.c
@@ -24,15 +24,30 @@
/* todo: routing */
};
+struct mctp_msg_ctx { /* reassembly state for one multi-packet message, keyed by (src, tag) */
+ uint8_t src; /* source EID taken from the packet header; 0 marks the slot as unused */
+ uint8_t tag; /* message tag extracted from the header's flags_seq_tag field */
+ uint8_t last_seq; /* sequence number of the most recently accepted packet */
+ void *buf; /* reassembly buffer, grown on demand via __mctp_realloc() */
+ size_t buf_size; /* number of payload bytes currently stored in buf */
+ size_t buf_alloc_size; /* allocated capacity of buf, in bytes */
+};
+
struct mctp {
/* todo: multiple busses */
struct mctp_bus busses[1];
- struct mctp_pktbuf txbuf;
+ struct mctp_pktbuf *tx_queue_head; /* first packet awaiting transmit; NULL when the queue is empty */
+ struct mctp_pktbuf *tx_queue_tail; /* last queued packet; new packets are linked through its ->next */
/* Message RX callback */
mctp_rx_fn message_rx;
void *message_rx_data;
+
+ /* Message reassembly.
+ * @todo: flexible context count
+ */
+ struct mctp_msg_ctx msg_ctxs[16]; /* fixed pool of reassembly contexts; an entry with src == 0 is free */
};
#ifndef BUILD_ASSERT
@@ -40,6 +55,10 @@
do { (void)sizeof(char[0-(!(x))]); } while (0)
#endif
+#ifndef ARRAY_SIZE
+/* Element count of a true array (not a pointer). The argument is
+ * parenthesised so that expressions such as *ptr_to_array expand to
+ * (*ptr_to_array)[0] rather than *ptr_to_array[0]. */
+#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))
+#endif
+
struct mctp_pktbuf *mctp_pktbuf_alloc(uint8_t len)
{
struct mctp_pktbuf *buf;
@@ -110,6 +129,83 @@
return 0;
}
+/* Message reassembly */
+/*
+ * Find the reassembly context whose src and tag both equal the given
+ * values. Returns a pointer into mctp->msg_ctxs, or NULL if no entry
+ * matches.
+ */
+static struct mctp_msg_ctx *mctp_msg_ctx_lookup(struct mctp *mctp,
+		uint8_t src, uint8_t tag)
+{
+	struct mctp_msg_ctx *cur = mctp->msg_ctxs;
+	struct mctp_msg_ctx *const end = cur + ARRAY_SIZE(mctp->msg_ctxs);
+
+	/* @todo: better lookup, if we add support for more outstanding
+	 * message contexts */
+	while (cur != end) {
+		if (cur->src == src && cur->tag == tag)
+			return cur;
+		cur++;
+	}
+
+	return NULL;
+}
+
+/*
+ * Claim an unused reassembly context for the given (src, tag) pair.
+ *
+ * A slot counts as unused while its src field is 0. The claimed slot
+ * starts with an empty message (buf_size == 0); any buffer it already
+ * owns is kept so the allocation can be reused.
+ *
+ * Returns the claimed context, or NULL when every slot is in use.
+ */
+static struct mctp_msg_ctx *mctp_msg_ctx_create(struct mctp *mctp,
+		uint8_t src, uint8_t tag)
+{
+	/* must start as NULL: the loop leaves it untouched when no slot
+	 * is free, and the check below relies on that */
+	struct mctp_msg_ctx *ctx = NULL;
+	unsigned int i;
+
+	for (i = 0; i < ARRAY_SIZE(mctp->msg_ctxs); i++) {
+		struct mctp_msg_ctx *tmp = &mctp->msg_ctxs[i];
+		if (!tmp->src) {
+			ctx = tmp;
+			break;
+		}
+	}
+
+	if (!ctx)
+		return NULL;
+
+	ctx->src = src;
+	ctx->tag = tag;
+	/* releasing a slot only clears src, so a recycled slot may still
+	 * hold the byte count of the message it carried before */
+	ctx->buf_size = 0;
+
+	return ctx;
+}
+
+static void mctp_msg_ctx_drop(struct mctp_msg_ctx *ctx)
+{
+ ctx->src = 0; /* release the slot: mctp_msg_ctx_create() reuses entries whose src is 0; buf and the size fields are left untouched */
+}
+
+static void mctp_msg_ctx_reset(struct mctp_msg_ctx *ctx)
+{
+ ctx->buf_size = 0; /* forget the payload gathered so far; the buffer and its allocated capacity are kept */
+}
+
+/*
+ * Append the payload of pkt (everything after the MCTP header) to the
+ * message being reassembled in ctx, growing ctx->buf as required.
+ *
+ * Returns 0 on success. Returns -1, leaving ctx unchanged, when the
+ * packet is shorter than an MCTP header or the buffer cannot be grown.
+ */
+static int mctp_msg_ctx_add_pkt(struct mctp_msg_ctx *ctx,
+		struct mctp_pktbuf *pkt)
+{
+	size_t pkt_size, len, needed;
+
+	pkt_size = mctp_pktbuf_size(pkt);
+	/* guard the subtraction below against wrapping around */
+	if (pkt_size < sizeof(struct mctp_hdr))
+		return -1;
+
+	len = pkt_size - sizeof(struct mctp_hdr);
+	needed = ctx->buf_size + len;
+
+	if (needed > ctx->buf_alloc_size) {
+		size_t new_alloc_size;
+		void *new_buf;
+
+		/* @todo: finer-grained allocation, size limits */
+		if (!ctx->buf_alloc_size) {
+			new_alloc_size = 4096;
+		} else {
+			new_alloc_size = ctx->buf_alloc_size * 2;
+		}
+		/* make sure one growth step is always enough */
+		if (new_alloc_size < needed)
+			new_alloc_size = needed;
+
+		/* NOTE(review): assumes __mctp_realloc() follows realloc()
+		 * semantics (NULL on failure, old block left valid) - confirm.
+		 * Going through a temporary keeps the old buffer reachable. */
+		new_buf = __mctp_realloc(ctx->buf, new_alloc_size);
+		if (!new_buf)
+			return -1;
+
+		ctx->buf = new_buf;
+		ctx->buf_alloc_size = new_alloc_size;
+	}
+
+	/* arithmetic on void * is a compiler extension; use a byte pointer */
+	memcpy((uint8_t *)ctx->buf + ctx->buf_size, mctp_pktbuf_data(pkt), len);
+	ctx->buf_size += len;
+
+	return 0;
+}
+
+/* Core API functions */
struct mctp *mctp_init(void)
{
struct mctp *mctp;
@@ -147,12 +243,75 @@
struct mctp_pktbuf *pkt)
{
struct mctp_bus *bus = &mctp->busses[bus_id];
+ struct mctp_msg_ctx *ctx;
+ struct mctp_hdr *hdr;
+ uint8_t flags, seq, tag;
size_t len;
void *p;
+ int rc;
- len = pkt->end - pkt->mctp_hdr_off - sizeof(struct mctp_hdr);
- p = pkt->data + pkt->mctp_hdr_off + sizeof(struct mctp_hdr),
- mctp->message_rx(bus->eid, mctp->message_rx_data, p, len);
+ hdr = mctp_pktbuf_hdr(pkt);
+
+ if (hdr->dest != bus->eid)
+ /* @todo: non-local packet routing */
+ return;
+
+ flags = hdr->flags_seq_tag & (MCTP_HDR_FLAG_SOM | MCTP_HDR_FLAG_EOM);
+ tag = (hdr->flags_seq_tag >> MCTP_HDR_TAG_SHIFT) & MCTP_HDR_TAG_MASK;
+ seq = (hdr->flags_seq_tag >> MCTP_HDR_SEQ_SHIFT) & MCTP_HDR_SEQ_MASK;
+
+	/* Dispatch on the SOM/EOM combination. Packets of a multi-packet
+	 * message must arrive with consecutive sequence numbers (modulo 4);
+	 * a gap discards the partial message. */
+	switch (flags) {
+	case MCTP_HDR_FLAG_SOM | MCTP_HDR_FLAG_EOM:
+		/* single-packet message - send straight up to rx function,
+		 * no need to create a message context */
+		len = pkt->end - pkt->mctp_hdr_off - sizeof(struct mctp_hdr);
+		p = pkt->data + pkt->mctp_hdr_off + sizeof(struct mctp_hdr);
+		mctp->message_rx(bus->eid, mctp->message_rx_data, p, len);
+		break;
+
+	case MCTP_HDR_FLAG_SOM:
+		/* start of a new message - start the new context for
+		 * future message reception. If an existing context is
+		 * already present, drop it. */
+		ctx = mctp_msg_ctx_lookup(mctp, hdr->src, tag);
+		if (!ctx) {
+			ctx = mctp_msg_ctx_create(mctp, hdr->src, tag);
+			/* every context is busy: nowhere to reassemble */
+			if (!ctx)
+				return;
+		}
+
+		/* Always begin from an empty buffer: an existing context
+		 * holds an abandoned message, and a recycled slot may still
+		 * carry its previous byte count. No sequence check here -
+		 * a first packet has no predecessor to compare against. */
+		mctp_msg_ctx_reset(ctx);
+
+		rc = mctp_msg_ctx_add_pkt(ctx, pkt);
+		if (rc) {
+			mctp_msg_ctx_drop(ctx);
+		} else {
+			ctx->last_seq = seq;
+		}
+
+		break;
+
+	case 0:
+		/* neither SOM nor EOM: a middle packet of a message that is
+		 * already being reassembled */
+		ctx = mctp_msg_ctx_lookup(mctp, hdr->src, tag);
+		if (!ctx)
+			return;
+
+		if (((ctx->last_seq + 1) % 4) != seq) {
+			mctp_msg_ctx_drop(ctx);
+			return;
+		}
+
+		rc = mctp_msg_ctx_add_pkt(ctx, pkt);
+		if (rc) {
+			mctp_msg_ctx_drop(ctx);
+		} else {
+			ctx->last_seq = seq;
+		}
+
+		break;
+
+	case MCTP_HDR_FLAG_EOM:
+		/* final packet: append it, deliver the whole message, then
+		 * release the context whether or not delivery happened */
+		ctx = mctp_msg_ctx_lookup(mctp, hdr->src, tag);
+		if (!ctx)
+			return;
+
+		if (((ctx->last_seq + 1) % 4) != seq) {
+			mctp_msg_ctx_drop(ctx);
+			return;
+		}
+
+		rc = mctp_msg_ctx_add_pkt(ctx, pkt);
+		if (!rc) {
+			mctp->message_rx(bus->eid, mctp->message_rx_data,
+					ctx->buf, ctx->buf_size);
+		}
+
+		mctp_msg_ctx_drop(ctx);
+		break;
+	}
}
static int mctp_packet_tx(struct mctp *mctp __attribute__((unused)),
@@ -165,33 +324,56 @@
+/*
+ * Send msg (msg_len bytes) to endpoint eid, split into packets that each
+ * carry at most MCTP_MTU bytes of payload.
+ *
+ * The first packet is flagged SOM, the last EOM (a message that fits in
+ * one packet carries both), and the packet sequence number starts at 0
+ * and increments per packet, wrapping within MCTP_HDR_SEQ_MASK.
+ *
+ * Returns 0 on success, -1 if a packet buffer cannot be allocated (in
+ * which case nothing is sent), or the first non-zero value returned by
+ * mctp_packet_tx(), after which the remaining packets are discarded.
+ * A zero-length message queues no packets and returns 0.
+ */
int mctp_message_tx(struct mctp *mctp, mctp_eid_t eid,
void *msg, size_t msg_len)
{
- struct mctp_pktbuf *pkt;
+	struct mctp_pktbuf *pkt, *tmp;
struct mctp_hdr *hdr;
struct mctp_bus *bus;
- int rc;
-
- /* todo: multiple-packet messages, sequence numbers */
- assert(msg_len <= MCTP_MTU);
+	/* byte pointer: arithmetic on void * is a compiler extension */
+	const uint8_t *payload = msg;
+	size_t pkt_len, p;
+	uint8_t seq = 0;
+	int rc = 0;
bus = find_bus_for_eid(mctp, eid);
- pkt = mctp_pktbuf_alloc(msg_len + sizeof(*hdr));
- hdr = mctp_pktbuf_hdr(pkt);
+	/* queue up packets, each of max MCTP_MTU size */
+	for (p = 0; p < msg_len; ) {
+		pkt_len = msg_len - p;
+		if (pkt_len > MCTP_MTU)
+			pkt_len = MCTP_MTU;
- /* todo: tags */
- hdr->ver = bus->binding->version & 0xf;
- hdr->dest = eid;
- hdr->src = bus->eid;
- hdr->flags_seq_tag = MCTP_HDR_FLAG_SOM |
- MCTP_HDR_FLAG_EOM |
- (0 << MCTP_HDR_SEQ_SHIFT) |
- MCTP_HDR_FLAG_TO |
- (0 << MCTP_HDR_TAG_SHIFT);
+		pkt = mctp_pktbuf_alloc(pkt_len + sizeof(*hdr));
+		/* NOTE(review): assumes mctp_pktbuf_alloc() reports failure
+		 * by returning NULL - confirm */
+		if (!pkt) {
+			rc = -1;
+			break;
+		}
+		/* the send loop below walks ->next until NULL */
+		pkt->next = NULL;
+		hdr = mctp_pktbuf_hdr(pkt);
- /* todo: zero copy? */
- memcpy(mctp_pktbuf_data(pkt), msg, msg_len);
+		/* todo: tags */
+		hdr->ver = bus->binding->version & 0xf;
+		hdr->dest = eid;
+		hdr->src = bus->eid;
+		hdr->flags_seq_tag = MCTP_HDR_FLAG_TO |
+			(seq << MCTP_HDR_SEQ_SHIFT) |
+			(0 << MCTP_HDR_TAG_SHIFT);
+		/* only the first fragment starts the message and only the
+		 * last one ends it, so the receiver can reassemble them */
+		if (p == 0)
+			hdr->flags_seq_tag |= MCTP_HDR_FLAG_SOM;
+		if (p + pkt_len == msg_len)
+			hdr->flags_seq_tag |= MCTP_HDR_FLAG_EOM;
+		seq = (seq + 1) & MCTP_HDR_SEQ_MASK;
- rc = mctp_packet_tx(mctp, bus, pkt);
+		memcpy(mctp_pktbuf_data(pkt), payload + p, pkt_len);
- return rc;
+		/* add to tx queue */
+		if (mctp->tx_queue_tail)
+			mctp->tx_queue_tail->next = pkt;
+		else
+			mctp->tx_queue_head = pkt;
+		mctp->tx_queue_tail = pkt;
+
+		p += pkt_len;
+	}
+
+	/* send queued packets; once an error has been recorded the rest
+	 * of the queue is only freed, not transmitted */
+	for (pkt = mctp->tx_queue_head; pkt; pkt = tmp) {
+		tmp = pkt->next;
+		if (!rc) {
+			mctp_prdebug("sending pkt, len %d",
+					mctp_pktbuf_size(pkt));
+			rc = mctp_packet_tx(mctp, bus, pkt);
+		}
+		mctp_pktbuf_free(pkt);
+	}
+
+	mctp->tx_queue_tail = NULL;
+	mctp->tx_queue_head = NULL;
+
+	return rc;
}