blob: 11ae2534fa266468cc1cceb33362a9c053b2deb0 [file] [log] [blame]
#!/usr/bin/python3
# SPDX-License-Identifier: Apache-2.0
# Copyright 2019 IBM Corp.
from argparse import ArgumentParser
from itertools import islice, cycle
from collections import namedtuple
from enum import Enum
from scapy.all import rdpcap
import struct
import json
import sys
RawMessage = namedtuple("RawMessage", "endian, header, data")
FixedHeader = namedtuple("FixedHeader", "endian, type, flags, version, length, cookie")
CookedHeader = namedtuple("CookedHeader", "fixed, fields")
CookedMessage = namedtuple("CookedMessage", "header, body")
TypeProperty = namedtuple("TypeProperty", "field, type, nature")
TypeContainer = namedtuple("TypeContainer", "type, members")
Field = namedtuple("Field", "type, data")
class MessageEndian(Enum):
LITTLE = ord('l')
BIG = ord('B')
StructEndianLookup = {
MessageEndian.LITTLE.value : "<",
MessageEndian.BIG.value : ">"
}
class MessageType(Enum):
INVALID = 0
METHOD_CALL = 1
METHOD_RETURN = 2
ERROR = 3
SIGNAL = 4
class MessageFlags(Enum):
NO_REPLY_EXPECTED = 0x01
NO_AUTO_START = 0x02
ALLOW_INTERACTIVE_AUTHORIZATION = 0x04
class MessageFieldType(Enum):
INVALID = 0
PATH = 1
INTERFACE = 2
MEMBER = 3
ERROR_NAME = 4
REPLY_SERIAL = 5
DESTINATION = 6
SENDER = 7
SIGNATURE = 8
UNIX_FDS = 9
class DBusType(Enum):
INVALID = 0
BYTE = ord('y')
BOOLEAN = ord('b')
INT16 = ord('n')
UINT16 = ord('q')
INT32 = ord('i')
UINT32 = ord('u')
INT64 = ord('x')
UINT64 = ord('t')
DOUBLE = ord('d')
STRING = ord('s')
OBJECT_PATH = ord('o')
SIGNATURE = ord('g')
ARRAY = ord('a')
STRUCT = ord('(')
VARIANT = ord('v')
DICT_ENTRY = ord('{')
UNIX_FD = ord('h')
DBusContainerTerminatorLookup = {
chr(DBusType.STRUCT.value) : ')',
chr(DBusType.DICT_ENTRY.value) : '}',
}
class DBusTypeCategory(Enum):
FIXED = {
DBusType.BYTE.value,
DBusType.BOOLEAN.value,
DBusType.INT16.value,
DBusType.UINT16.value,
DBusType.INT32.value,
DBusType.UINT32.value,
DBusType.INT64.value,
DBusType.UINT64.value,
DBusType.DOUBLE.value,
DBusType.UNIX_FD.value
}
STRING = {
DBusType.STRING.value,
DBusType.OBJECT_PATH.value,
DBusType.SIGNATURE.value,
}
CONTAINER = {
DBusType.ARRAY.value,
DBusType.STRUCT.value,
DBusType.VARIANT.value,
DBusType.DICT_ENTRY.value,
}
RESERVED = {
DBusType.INVALID.value,
}
TypePropertyLookup = {
DBusType.BYTE.value : TypeProperty(DBusType.BYTE, 'B', 1),
# DBus booleans are 32 bit, with only the LSB used. Extract as 'I'.
DBusType.BOOLEAN.value : TypeProperty(DBusType.BOOLEAN, 'I', 4),
DBusType.INT16.value : TypeProperty(DBusType.INT16, 'h', 2),
DBusType.UINT16.value : TypeProperty(DBusType.UINT16, 'H', 2),
DBusType.INT32.value : TypeProperty(DBusType.INT32, 'i', 4),
DBusType.UINT32.value : TypeProperty(DBusType.UINT32, 'I', 4),
DBusType.INT64.value : TypeProperty(DBusType.INT64, 'q', 8),
DBusType.UINT64.value : TypeProperty(DBusType.UINT64, 'Q', 8),
DBusType.DOUBLE.value : TypeProperty(DBusType.DOUBLE, 'd', 8),
DBusType.STRING.value : TypeProperty(DBusType.STRING, 's', DBusType.UINT32),
DBusType.OBJECT_PATH.value : TypeProperty(DBusType.OBJECT_PATH, 's', DBusType.UINT32),
DBusType.SIGNATURE.value : TypeProperty(DBusType.SIGNATURE, 's', DBusType.BYTE),
DBusType.ARRAY.value : TypeProperty(DBusType.ARRAY, None, DBusType.UINT32),
DBusType.STRUCT.value : TypeProperty(DBusType.STRUCT, None, 8),
DBusType.VARIANT.value : TypeProperty(DBusType.VARIANT, None, 1),
DBusType.DICT_ENTRY.value : TypeProperty(DBusType.DICT_ENTRY, None, 8),
DBusType.UNIX_FD.value : TypeProperty(DBusType.UINT32, None, 8),
}
def parse_signature(sigstream):
sig = ord(next(sigstream))
assert sig not in DBusTypeCategory.RESERVED.value
if sig in DBusTypeCategory.FIXED.value:
ty = TypePropertyLookup[sig].field, None
elif sig in DBusTypeCategory.STRING.value:
ty = TypePropertyLookup[sig].field, None
elif sig in DBusTypeCategory.CONTAINER.value:
if sig == DBusType.ARRAY.value:
ty = DBusType.ARRAY, parse_signature(sigstream)
elif sig == DBusType.STRUCT.value or sig == DBusType.DICT_ENTRY.value:
collected = list()
ty = parse_signature(sigstream)
while ty is not StopIteration:
collected.append(ty)
ty = parse_signature(sigstream)
ty = DBusType.STRUCT, collected
elif sig == DBusType.VARIANT.value:
ty = TypePropertyLookup[sig].field, None
else:
assert False
else:
assert chr(sig) in DBusContainerTerminatorLookup.values()
return StopIteration
return TypeContainer._make(ty)
class AlignedStream(object):
def __init__(self, buf, offset=0):
self.stash = (buf, offset)
self.stream = iter(buf)
self.offset = offset
def align(self, tc):
assert tc.type.value in TypePropertyLookup
prop = TypePropertyLookup[tc.type.value]
if prop.field.value in DBusTypeCategory.STRING.value:
prop = TypePropertyLookup[prop.nature.value]
if prop.nature == DBusType.UINT32:
prop = TypePropertyLookup[prop.nature.value]
advance = (prop.nature - (self.offset & (prop.nature - 1))) % prop.nature
_ = bytes(islice(self.stream, advance))
self.offset += advance
def take(self, size):
val = islice(self.stream, size)
self.offset += size
return val
def autotake(self, tc):
assert tc.type.value in DBusTypeCategory.FIXED.value
assert tc.type.value in TypePropertyLookup
self.align(tc)
prop = TypePropertyLookup[tc.type.value]
return self.take(prop.nature)
def drain(self):
remaining = bytes(self.stream)
offset = self.offset
self.offset += len(remaining)
if self.offset - self.stash[1] != len(self.stash[0]):
print("(self.offset - self.stash[1]): %d, len(self.stash[0]): %d"
% (self.offset - self.stash[1], len(self.stash[0])), file=sys.stderr)
raise MalformedPacketError
return remaining, offset
def dump(self):
print("AlignedStream: absolute offset: {}".format(self.offset), file=sys.stderr)
print("AlignedStream: relative offset: {}".format(self.offset - self.stash[1]),
file=sys.stderr)
print("AlignedStream: remaining buffer:\n{}".format(self.drain()[0]), file=sys.stderr)
print("AlignedStream: provided buffer:\n{}".format(self.stash[0]), file=sys.stderr)
def dump_assert(self, condition):
if condition:
return
self.dump()
assert condition
def parse_fixed(endian, stream, tc):
assert tc.type.value in TypePropertyLookup
prop = TypePropertyLookup[tc.type.value]
val = bytes(stream.autotake(tc))
try:
val = struct.unpack("{}{}".format(endian, prop.type), val)[0]
return bool(val) if prop.type == DBusType.BOOLEAN else val
except struct.error as e:
print(e, file=sys.stderr)
print("parse_fixed: Error unpacking {}".format(val), file=sys.stderr)
print("parse_fixed: Attempting to unpack type {} with properties {}".format(tc.type, prop),
file=sys.stderr)
stream.dump_assert(False)
def parse_string(endian, stream, tc):
assert tc.type.value in TypePropertyLookup
prop = TypePropertyLookup[tc.type.value]
size = parse_fixed(endian, stream, TypeContainer(prop.nature, None))
# Empty DBus strings have no NUL-terminator
if size == 0:
return ""
# stream.dump_assert(size > 0)
val = bytes(stream.take(size + 1))
try:
stream.dump_assert(len(val) == size + 1)
try:
return struct.unpack("{}{}".format(size, prop.type), val[:size])[0].decode()
except struct.error as e:
stream.dump()
raise AssertionError(e)
except AssertionError as e:
print("parse_string: Error unpacking string of length {} from {}".format(size, val),
file=sys.stderr)
raise e
def parse_type(endian, stream, tc):
if tc.type.value in DBusTypeCategory.FIXED.value:
val = parse_fixed(endian, stream, tc)
elif tc.type.value in DBusTypeCategory.STRING.value:
val = parse_string(endian, stream, tc)
elif tc.type.value in DBusTypeCategory.CONTAINER.value:
val = parse_container(endian, stream, tc)
else:
stream.dump_assert(False)
return val
def parse_array(endian, stream, tc):
arr = list()
length = parse_fixed(endian, stream, TypeContainer(DBusType.UINT32, None))
stream.align(tc)
offset = stream.offset
while (stream.offset - offset) < length:
elem = parse_type(endian, stream, tc)
arr.append(elem)
if (stream.offset - offset) < length:
stream.align(tc)
return arr
def parse_struct(endian, stream, tcs):
arr = list()
stream.align(TypeContainer(DBusType.STRUCT, None))
for tc in tcs:
arr.append(parse_type(endian, stream, tc))
return arr
def parse_variant(endian, stream):
sig = parse_string(endian, stream, TypeContainer(DBusType.SIGNATURE, None))
tc = parse_signature(iter(sig))
return parse_type(endian, stream, tc)
def parse_container(endian, stream, tc):
if tc.type == DBusType.ARRAY:
return parse_array(endian, stream, tc.members)
elif tc.type in (DBusType.STRUCT, DBusType.DICT_ENTRY):
return parse_struct(endian, stream, tc.members)
elif tc.type == DBusType.VARIANT:
return parse_variant(endian, stream)
else:
stream.dump_assert(False)
def parse_fields(endian, stream):
sig = parse_signature(iter("a(yv)"))
fields = parse_container(endian, stream, sig)
# The header ends after its alignment padding to an 8-boundary.
# https://dbus.freedesktop.org/doc/dbus-specification.html#message-protocol-messages
stream.align(TypeContainer(DBusType.STRUCT, None))
return list(map(lambda v: Field(MessageFieldType(v[0]), v[1]), fields))
class MalformedPacketError(Exception):
pass
def parse_header(raw, ignore_error):
assert raw.endian in StructEndianLookup.keys()
endian = StructEndianLookup[raw.endian]
fixed = FixedHeader._make(struct.unpack("{}BBBBLL".format(endian), raw.header))
astream = AlignedStream(raw.data, len(raw.header))
fields = parse_fields(endian, astream)
data, offset = astream.drain()
if ignore_error == False and fixed.length > len(data):
raise MalformedPacketError
return CookedHeader(fixed, fields), AlignedStream(data, offset)
def parse_body(header, stream):
assert header.fixed.endian in StructEndianLookup
endian = StructEndianLookup[header.fixed.endian]
body = list()
for field in header.fields:
if field.type == MessageFieldType.SIGNATURE:
sigstream = iter(field.data)
try:
while True:
tc = parse_signature(sigstream)
val = parse_type(endian, stream, tc)
body.append(val)
except StopIteration:
pass
break
return body
def parse_message(raw):
try:
header, data = parse_header(raw, False)
try:
body = parse_body(header, data)
return CookedMessage(header, body)
except AssertionError as e:
print(header, file=sys.stderr)
raise e
except AssertionError as e:
print(raw, file=sys.stderr)
raise e
def parse_packet(packet):
data = bytes(packet)
raw = RawMessage(data[0], data[:12], data[12:])
try:
msg = parse_message(raw)
except MalformedPacketError as e:
print("Got malformed packet: {}".format(raw), file=sys.stderr)
# For a message that is so large that its payload data could not be parsed,
# just parse its header, then set its data field to empty.
header, data = parse_header(raw, True)
msg = CookedMessage(header, [])
return msg
CallEnvelope = namedtuple("CallEnvelope", "cookie, origin")
def parse_session(session, matchers, track_calls):
calls = set()
for packet in session:
try:
cooked = parse_packet(packet)
if not matchers:
yield packet.time, cooked
elif any(all(r(cooked) for r in m) for m in matchers):
if cooked.header.fixed.type == MessageType.METHOD_CALL.value:
s = [f for f in cooked.header.fields
if f.type == MessageFieldType.SENDER][0]
calls.add(CallEnvelope(cooked.header.fixed.cookie, s.data))
yield packet.time, cooked
elif track_calls:
if cooked.header.fixed.type != MessageType.METHOD_RETURN.value:
continue
rs = [f for f in cooked.header.fields
if f.type == MessageFieldType.REPLY_SERIAL][0]
d = [f for f in cooked.header.fields
if f.type == MessageFieldType.DESTINATION][0]
ce = CallEnvelope(rs.data, d.data)
if ce in calls:
calls.remove(ce)
yield packet.time, cooked
except MalformedPacketError as e:
pass
def gen_match_type(rule):
mt = MessageType.__members__[rule.value.upper()]
return lambda p: p.header.fixed.type == mt.value
def gen_match_sender(rule):
mf = Field(MessageFieldType.SENDER, rule.value)
return lambda p: any(f == mf for f in p.header.fields)
def gen_match_interface(rule):
mf = Field(MessageFieldType.INTERFACE, rule.value)
return lambda p: any(f == mf for f in p.header.fields)
def gen_match_member(rule):
mf = Field(MessageFieldType.MEMBER, rule.value)
return lambda p: any(f == mf for f in p.header.fields)
def gen_match_path(rule):
mf = Field(MessageFieldType.PATH, rule.value)
return lambda p: any(f == mf for f in p.header.fields)
def gen_match_destination(rule):
mf = Field(MessageFieldType.DESTINATION, rule.value)
return lambda p: any(f == mf for f in p.header.fields)
ValidMatchKeys = {
"type", "sender", "interface", "member", "path", "destination"
}
MatchRule = namedtuple("MatchExpression", "key, value")
# https://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-routing-match-rules
def parse_match_rules(exprs):
matchers = list()
for mexpr in exprs:
rules = list()
for rexpr in mexpr.split(","):
rule = MatchRule._make(map(lambda s: str.strip(s, "'"), rexpr.split("=")))
assert rule.key in ValidMatchKeys, "Invalid expression: %" % rule
rules.append(globals()["gen_match_{}".format(rule.key)](rule))
matchers.append(rules)
return matchers
def packetconv(obj):
if isinstance(obj, Enum):
return obj.value
raise TypeError
def main():
parser = ArgumentParser()
parser.add_argument("--json", action="store_true",
help="Emit a JSON representation of the messages")
parser.add_argument("--no-track-calls", action="store_true", default=False,
help="Make a call response pass filters")
parser.add_argument("file", help="The pcap file")
parser.add_argument("expressions", nargs="*",
help="DBus message match expressions")
args = parser.parse_args()
stream = rdpcap(args.file)
matchers = parse_match_rules(args.expressions)
try:
if args.json:
for (_, msg) in parse_session(stream, matchers, not args.no_track_calls):
print("{}".format(json.dumps(msg, default=packetconv)))
else:
for (time, msg) in parse_session(stream, matchers, not args.no_track_calls):
print("{}: {}".format(time, msg))
print()
except BrokenPipeError:
pass
if __name__ == "__main__":
main()