dbus-pcap: A DBus packet parser and matcher for pcap captures

busctl(1) offers a `capture` option that's useful for recording DBus
traffic on a system. The resulting pcap can be loaded into wireshark
where it can be analysed, but the wireshark dissector has (had?)
limitations (only parsed header fields and not the body of the packet).

dbus-pcap uses scapy to read the packets from the pcap, then applies its
own DBus wire-format dissector. DBus match expressions are supported so
that the packet stream can be filtered according to the supplied match
rules.

Here's an example invocation that extracts IPMI transport messages during the
boot process of an OpenPOWER OpenBMC system:

> $ ./dbus-pcap witherspoon-boot.pcap "type='signal',path='/org/openbmc/HostIpmi/1'"
>
> [[[108, 4, 1, 1, 14, 9982], [[1, "/org/openbmc/HostIpmi/1"], [2, "org.openbmc.HostIpmi"], [3, "ReceivedMessage"], [8, "yyyyay"], [7, ":1.14"]]], [206, 58, 0, 90, [4, 56, 28, 19, 2, 0]]]
> [[[108, 4, 1, 1, 14, 9984], [[1, "/org/openbmc/HostIpmi/1"], [2, "org.openbmc.HostIpmi"], [3, "ReceivedMessage"], [8, "yyyyay"], [7, ":1.14"]]], [207, 58, 0, 90, [4, 57, 125, 16, 2, 0]]]
> [[[108, 4, 1, 1, 14, 9986], [[1, "/org/openbmc/HostIpmi/1"], [2, "org.openbmc.HostIpmi"], [3, "ReceivedMessage"], [8, "yyyyay"], [7, ":1.14"]]], [208, 58, 0, 90, [4, 58, 134, 17, 2, 0]]]

The messages are emitted in JSON form and can be parsed by `jq`.

Zero or more DBus match expressions can be provided. A packet is matched
when it satisfies all rules in at least one of the expressions supplied
on the command line. If no expressions are provided then all packets are
matched by default. Neither argN nor namespace match rules are supported
in this initial patch.

Signed-off-by: Andrew Jeffery <andrew@aj.id.au>
Change-Id: I3130426451207f556f33a17d540d6b32794e8503
diff --git a/amboar/obmc-scripts/dbus-pcap/dbus-pcap b/amboar/obmc-scripts/dbus-pcap/dbus-pcap
new file mode 100755
index 0000000..2c9bdd5
--- /dev/null
+++ b/amboar/obmc-scripts/dbus-pcap/dbus-pcap
@@ -0,0 +1,397 @@
+#!/usr/bin/python3
+
+from itertools import islice, cycle
+from collections import namedtuple
+from enum import Enum
+from scapy.all import rdpcap
+import sys
+import struct
+import json
+
+RawMessage = namedtuple("RawMessage", "endian, header, data")  # endian byte, first 12 bytes, remaining bytes (see parse_packet)
+FixedHeader = namedtuple("FixedHeader", "endian, type, flags, version, length, cookie")  # unpacked with "BBBBLL" in parse_header
+CookedHeader = namedtuple("CookedHeader", "fixed, fields")  # FixedHeader plus the list of parsed header Fields
+CookedMessage = namedtuple("CookedMessage", "header, body")  # CookedHeader plus the list of parsed body values
+TypeProperty = namedtuple("TypeProperty", "field, type, nature")  # DBusType, struct format char (or None), size/alignment in bytes or the DBusType of the length prefix
+TypeContainer = namedtuple("TypeContainer", "type, members")  # DBusType plus its nested member type(s), or None
+Field = namedtuple("Field", "type, data")  # MessageFieldType and its parsed value
+
+class MessageEndian(Enum):  # values are the ASCII codes of the endianness marker byte (first byte of a message)
+    LITTLE = ord('l')
+    BIG = ord('B')
+
+StructEndianLookup = {  # endianness marker byte -> struct-module byte-order prefix
+    MessageEndian.LITTLE.value : "<",
+    MessageEndian.BIG.value : ">"
+}
+
+class MessageType(Enum):  # gen_match_type compares these values against FixedHeader.type
+    INVALID = 0
+    METHOD_CALL = 1
+    METHOD_RETURN = 2
+    ERROR = 3
+    SIGNAL = 4
+
+class MessageFlags(Enum):  # presumably bit masks for FixedHeader.flags; not referenced elsewhere in this script
+    NO_REPLY_EXPECTED = 0x01
+    NO_AUTO_START = 0x02
+    ALLOW_INTERACTIVE_AUTHORIZATION = 0x04
+
+class MessageFieldType(Enum):  # header field codes; parse_fields wraps each parsed code in this enum
+    INVALID = 0
+    PATH = 1
+    INTERFACE = 2
+    MEMBER = 3
+    ERROR_NAME = 4
+    REPLY_SERIAL = 5
+    DESTINATION = 6
+    SENDER = 7
+    SIGNATURE = 8
+    UNIX_FDS = 9
+
+class DBusType(Enum):  # values are the ASCII codes of the signature characters (INVALID is 0)
+    INVALID = 0
+    BYTE = ord('y')
+    BOOLEAN = ord('b')
+    INT16 = ord('n')
+    UINT16 = ord('q')
+    INT32 = ord('i')
+    UINT32 = ord('u')
+    INT64 = ord('x')
+    UINT64 = ord('t')
+    DOUBLE = ord('d')
+    STRING = ord('s')
+    OBJECT_PATH = ord('o')
+    SIGNATURE = ord('g')
+    ARRAY = ord('a')
+    STRUCT = ord('(')  # only the opening character; closers are in DBusContainerTerminatorLookup
+    VARIANT = ord('v')
+    DICT_ENTRY = ord('{')
+    UNIX_FD = ord('h')
+
+DBusContainerTerminatorLookup = {  # opening container character -> its closing character
+    chr(DBusType.STRUCT.value) : ')',
+    chr(DBusType.DICT_ENTRY.value) : '}',
+}
+
+class DBusTypeCategory(Enum):  # each member's value is the set of DBusType values in that category
+    FIXED = {  # fixed-size types, handled by parse_fixed
+        DBusType.BYTE.value,
+        DBusType.BOOLEAN.value,
+        DBusType.INT16.value,
+        DBusType.UINT16.value,
+        DBusType.INT32.value,
+        DBusType.UINT32.value,
+        DBusType.INT64.value,
+        DBusType.UINT64.value,
+        DBusType.DOUBLE.value,
+        DBusType.UNIX_FD.value
+    }
+    STRING = {  # length-prefixed types, handled by parse_string
+        DBusType.STRING.value,
+        DBusType.OBJECT_PATH.value,
+        DBusType.SIGNATURE.value,
+    }
+    CONTAINER = {  # handled by parse_container
+        DBusType.ARRAY.value,
+        DBusType.STRUCT.value,
+        DBusType.VARIANT.value,
+        DBusType.DICT_ENTRY.value,
+    }
+    RESERVED = {  # parse_signature asserts that these never appear in a signature
+        DBusType.INVALID.value,
+    }
+
+TypePropertyLookup = {
+    DBusType.BYTE.value : TypeProperty(DBusType.BYTE, 'B', 1),
+    # DBus booleans are 32 bit, with only the LSB used. Extract as 'I'.
+    DBusType.BOOLEAN.value : TypeProperty(DBusType.BOOLEAN, 'I', 4),
+    DBusType.INT16.value : TypeProperty(DBusType.INT16, 'h', 2),
+    DBusType.UINT16.value : TypeProperty(DBusType.UINT16, 'H', 2),
+    DBusType.INT32.value : TypeProperty(DBusType.INT32, 'i', 4),
+    DBusType.UINT32.value : TypeProperty(DBusType.UINT32, 'I', 4),
+    DBusType.INT64.value : TypeProperty(DBusType.INT64, 'q', 8),
+    DBusType.UINT64.value : TypeProperty(DBusType.UINT64, 'Q', 8),
+    DBusType.DOUBLE.value : TypeProperty(DBusType.DOUBLE, 'd', 8),
+    DBusType.STRING.value : TypeProperty(DBusType.STRING, 's', DBusType.UINT32),
+    DBusType.OBJECT_PATH.value : TypeProperty(DBusType.OBJECT_PATH, 's', DBusType.UINT32),
+    DBusType.SIGNATURE.value : TypeProperty(DBusType.SIGNATURE, 's', DBusType.BYTE),
+    DBusType.ARRAY.value : TypeProperty(DBusType.ARRAY, None, DBusType.UINT32),
+    DBusType.STRUCT.value : TypeProperty(DBusType.STRUCT, None, 8),
+    DBusType.VARIANT.value : TypeProperty(DBusType.VARIANT, None, 1),
+    DBusType.DICT_ENTRY.value : TypeProperty(DBusType.DICT_ENTRY, None, 8),
+}
+
+def parse_signature(sigstream):
+    sig = ord(next(sigstream))
+    assert sig not in DBusTypeCategory.RESERVED.value
+    if sig in DBusTypeCategory.FIXED.value:
+        ty = TypePropertyLookup[sig].field, None
+    elif sig in DBusTypeCategory.STRING.value:
+        ty = TypePropertyLookup[sig].field, None
+    elif sig in DBusTypeCategory.CONTAINER.value:
+        if sig == DBusType.ARRAY.value:
+            ty = DBusType.ARRAY, parse_signature(sigstream)
+        elif sig == DBusType.STRUCT.value or sig == DBusType.DICT_ENTRY.value:
+            collected = list()
+            ty = parse_signature(sigstream)
+            while ty is not StopIteration:
+                collected.append(ty)
+                ty = parse_signature(sigstream)
+            ty = DBusType.STRUCT, collected
+        elif sig == DBusType.VARIANT.value:
+            ty = TypePropertyLookup[sig].field, None
+        else:
+            assert False
+    else:
+        assert chr(sig) in DBusContainerTerminatorLookup.values()
+        return StopIteration
+
+    return TypeContainer._make(ty)
+
+class AlignedStream(object):
+    def __init__(self, buf, offset=0):
+        self.stash = (buf, offset)
+        self.stream = iter(buf)
+        self.offset = offset
+
+    def align(self, tc):
+        assert tc.type.value in TypePropertyLookup
+        prop = TypePropertyLookup[tc.type.value]
+        if prop.field.value in DBusTypeCategory.STRING.value:
+            prop = TypePropertyLookup[prop.nature.value]
+        advance = (prop.nature - (self.offset & (prop.nature - 1))) % prop.nature
+        _ = bytes(islice(self.stream, advance))
+        self.offset += advance
+
+    def take(self, size):
+        val = islice(self.stream, size)
+        self.offset += size
+        return val
+
+    def autotake(self, tc):
+        assert tc.type.value in DBusTypeCategory.FIXED.value
+        assert tc.type.value in TypePropertyLookup
+        self.align(tc)
+        prop = TypePropertyLookup[tc.type.value]
+        return self.take(prop.nature)
+
+    def drain(self):
+        remaining = bytes(self.stream)
+        offset = self.offset
+        self.offset += len(remaining)
+        assert self.offset - self.stash[1] == len(self.stash[0])
+        return remaining, offset
+
+    def dump(self):
+        print("AlignedStream: absolute offset: {}".format(self.offset))
+        print("AlignedStream: relative offset: {}".format(self.offset - self.stash[1]))
+        print("AlignedStream: remaining buffer:\n{}".format(self.drain()[0]))
+        print("AlignedStream: provided buffer:\n{}".format(self.stash[0]))
+
+    def dump_assert(self, condition):
+        if condition:
+            return
+        self.dump()
+        assert condition
+
+def parse_fixed(endian, stream, tc):
+    assert tc.type.value in TypePropertyLookup
+    prop = TypePropertyLookup[tc.type.value]
+    val = bytes(stream.autotake(tc))
+    try:
+        val = struct.unpack("{}{}".format(endian, prop.type), val)[0]
+        return bool(val) if prop.type == DBusType.BOOLEAN else val
+    except struct.error as e:
+        print(e)
+        print("parse_fixed: Error unpacking {}".format(val))
+        print("parse_fixed: Attempting to unpack type {} with properties {}".format(tc.type, prop))
+        stream.dump_assert(False)
+
+def parse_string(endian, stream, tc):
+    assert tc.type.value in TypePropertyLookup
+    prop = TypePropertyLookup[tc.type.value]
+    size = parse_fixed(endian, stream, TypeContainer(prop.nature, None))
+    # Empty DBus strings have no NUL-terminator
+    if size == 0:
+        return ""
+    # stream.dump_assert(size > 0)
+    val = bytes(stream.take(size + 1))
+    try:
+        stream.dump_assert(len(val) == size + 1)
+        try:
+            return struct.unpack("{}{}".format(size, prop.type), val[:size])[0].decode()
+        except struct.error as e:
+            stream.dump()
+            raise AssertionError(e)
+    except AssertionError as e:
+        print("parse_string: Error unpacking string of length {} from {}".format(size, val))
+        raise e
+
+def parse_type(endian, stream, tc):
+    if tc.type.value in DBusTypeCategory.FIXED.value:
+        val = parse_fixed(endian, stream, tc)
+    elif tc.type.value in DBusTypeCategory.STRING.value:
+        val = parse_string(endian, stream, tc)
+    elif tc.type.value in DBusTypeCategory.CONTAINER.value:
+        val = parse_container(endian, stream, tc)
+    else:
+        stream.dump_assert(False)
+
+    return val
+
+def parse_array(endian, stream, tc):
+    arr = list()
+    length = parse_fixed(endian, stream, TypeContainer(DBusType.UINT32, None))
+    stream.align(tc)
+    offset = stream.offset
+    while (stream.offset - offset) < length:
+        elem = parse_type(endian, stream, tc)
+        arr.append(elem)
+    return arr
+
+def parse_struct(endian, stream, tcs):
+    arr = list()
+    stream.align(TypeContainer(DBusType.STRUCT, None))
+    for tc in tcs:
+        arr.append(parse_type(endian, stream, tc))
+    return arr
+
+def parse_variant(endian, stream):
+    sig = parse_string(endian, stream, TypeContainer(DBusType.SIGNATURE, None))
+    tc = parse_signature(iter(sig))
+    return parse_type(endian, stream, tc)
+
+def parse_container(endian, stream, tc):
+    if tc.type == DBusType.ARRAY:
+        return parse_array(endian, stream, tc.members)
+    elif tc.type in (DBusType.STRUCT, DBusType.DICT_ENTRY):
+        return parse_struct(endian, stream, tc.members)
+    elif tc.type == DBusType.VARIANT:
+        return parse_variant(endian, stream)
+    else:
+        stream.dump_assert(False)
+
+def parse_fields(endian, stream):
+    sig = parse_signature(iter("a(yv)"))
+    fields = parse_container(endian, stream, sig)
+    # The header ends after its alignment padding to an 8-boundary.
+    # https://dbus.freedesktop.org/doc/dbus-specification.html#message-protocol-messages
+    stream.align(TypeContainer(DBusType.STRUCT, None))
+    return list(map(lambda v: Field(MessageFieldType(v[0]), v[1]), fields))
+
+class MalformedPacketError(Exception):  # raised by parse_header; parse_session skips packets that raise it
+    pass
+
+def parse_header(raw):
+    assert raw.endian in StructEndianLookup.keys()
+    endian = StructEndianLookup[raw.endian]
+    fixed = FixedHeader._make(struct.unpack("{}BBBBLL".format(endian), raw.header))
+    astream = AlignedStream(raw.data, len(raw.header))
+    fields = parse_fields(endian, astream)
+    data, offset = astream.drain()
+    if fixed.length > len(data):
+        raise MalformedPacketError
+    return CookedHeader(fixed, fields), AlignedStream(data, offset)
+
+def parse_body(header, stream):
+    assert header.fixed.endian in StructEndianLookup
+    endian = StructEndianLookup[header.fixed.endian]
+    body = list()
+    for field in header.fields:
+        if field.type == MessageFieldType.SIGNATURE:
+            sigstream = iter(field.data)
+            try:
+                while True:
+                    tc = parse_signature(sigstream)
+                    val = parse_type(endian, stream, tc)
+                    body.append(val)
+            except StopIteration:
+                pass
+            break
+    return body
+
+def parse_message(raw):
+    try:
+        header, data = parse_header(raw)
+        try:
+            body = parse_body(header, data)
+            return CookedMessage(header, body)
+        except AssertionError as e:
+            print(header)
+            raise e
+    except AssertionError as e:
+        print(raw)
+        raise e
+
+def parse_packet(packet):
+    data = bytes(packet)
+    msg = parse_message(RawMessage(data[0], data[:12], data[12:]))
+    return msg
+
+def parse_session(session, matchers):
+    for packet in session:
+        try:
+            cooked = parse_packet(packet)
+            if not matchers or any(all(r(cooked) for r in m) for m in matchers):
+                yield cooked
+        except MalformedPacketError as e:
+            pass
+
+def gen_match_type(rule):
+    mt = MessageType.__members__[rule.value.upper()]
+    return lambda p: p.header.fixed.type == mt.value
+
+def gen_match_sender(rule):
+    mf = Field(MessageFieldType.SENDER, rule.value)
+    return lambda p: any(f == mf for f in p.header.fields)
+
+def gen_match_interface(rule):
+    mf = Field(MessageFieldType.INTERFACE, rule.value)
+    return lambda p: any(f == mf for f in p.header.fields)
+
+def gen_match_member(rule):
+    mf = Field(MessageFieldType.MEMBER, rule.value)
+    return lambda p: any(f == mf for f in p.header.fields)
+
+def gen_match_path(rule):
+    mf = Field(MessageFieldType.PATH, rule.value)
+    return lambda p: any(f == mf for f in p.header.fields)
+
+def gen_match_destination(rule):
+    mf = Field(MessageFieldType.DESTINATION, rule.value)
+    return lambda p: any(f == mf for f in p.header.fields)
+
+ValidMatchKeys = {  # match keys that have a gen_match_<key> generator in this file
+        "type", "sender", "interface", "member", "path", "destination"
+}
+MatchRule = namedtuple("MatchExpression", "key, value")  # NOTE: the tuple's typename is "MatchExpression", not "MatchRule"
+
+# https://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-routing-match-rules
+def parse_match_rules(exprs):
+    matchers = list()
+    for mexpr in exprs:
+        rules = list()
+        for rexpr in mexpr.split(","):
+            rule = MatchRule._make(map(lambda s: str.strip(s, "'"), rexpr.split("=")))
+            assert rule.key in ValidMatchKeys, "Invalid expression: %" % rule
+            rules.append(globals()["gen_match_{}".format(rule.key)](rule))
+        matchers.append(rules)
+    return matchers
+
+def packetconv(obj):
+    if isinstance(obj, Enum):
+        return obj.value
+    raise TypeError
+
+def main():
+    stream = rdpcap(sys.argv[1])
+    matchers = [] if len(sys.argv) < 3 else parse_match_rules(sys.argv[2:])
+    try:
+        for msg in parse_session(stream, matchers):
+            print(json.dumps(msg, default=packetconv))
+    except BrokenPipeError:
+        pass
+
+if __name__ == "__main__":  # run main() only when executed as a script, not when imported
+    main()
diff --git a/amboar/obmc-scripts/dbus-pcap/requirements.txt b/amboar/obmc-scripts/dbus-pcap/requirements.txt
new file mode 100644
index 0000000..30564ab
--- /dev/null
+++ b/amboar/obmc-scripts/dbus-pcap/requirements.txt
@@ -0,0 +1 @@
+scapy