Add bi2cp - a Beagle I2C Parser

bi2cp parses CSV output from the Beagle logic analyzer.

Signed-off-by: Andrew Jeffery <>
Change-Id: Iba993ece40f295bd5bbc213dff1d325ae8281ab6
diff --git a/amboar/obmc-scripts/bi2cp/README.md b/amboar/obmc-scripts/bi2cp/README.md
new file mode 100644
index 0000000..7596b2b
--- /dev/null
+++ b/amboar/obmc-scripts/bi2cp/README.md
@@ -0,0 +1,27 @@
+## bi2cp: Beagle I2C Parser
+`bi2cp` parses CSV dumps from the Beagle logic analyzer, lifting the raw I2C
+transfers to expose PMBus semantics.
+
+`bi2cp` can also test PEC bytes in transfers, among other features.
+
+External dependencies are listed in `requirements.txt`. Install them with:
+
+```
+$ pip3 install --user --requirement requirements.txt
+```
+
+## Example Run
+$ ./bi2cp --address 0x11 --pmbus 'UCD recreate beagle scan 1.csv'  | head
+00:30.757419 0.000211 0x11 READ MFR_SPECIFIC_45 | 00:30.757631 0.002693 [ 1b 55 43 44 39 30 33 32 30 7c 33 2e 30 2e 30 2e 33 30 32 39 7c 31 36 30 39 31 35 00 ] 
+00:30.767143 0.000207 0x11 READ MFR_SPECIFIC_06 | 00:30.767350 0.000203 [ 20 ] 
+00:30.767574 0.000205 0x11 READ MFR_SPECIFIC_05 | 00:30.767780 0.003184 [ 20 20 21 22 23 24 25 26 27 28 29 2a 2b 2c 2d 2e 2f 30 31 32 33 34 35 36 37 38 39 3a 3b 3c 3d 3e 3f ] 
+00:30.771015 0.000302 0x11 WRITE MFR_SPECIFIC_42 [ 00 ]
+00:30.771332 0.000205 0x11 READ MFR_SPECIFIC_43 | 00:30.771538 0.000208 [ 08 ] 
+00:30.771758 0.000301 0x11 WRITE MFR_SPECIFIC_42 [ 01 ]
+00:30.772072 0.000205 0x11 READ MFR_SPECIFIC_43 | 00:30.772278 0.000204 [ 00 ] 
+00:30.772493 0.000301 0x11 WRITE MFR_SPECIFIC_42 [ 02 ]
+00:30.772805 0.000205 0x11 READ MFR_SPECIFIC_43 | 00:30.773010 0.000204 [ 00 ] 
+00:30.773226 0.000301 0x11 WRITE MFR_SPECIFIC_42 [ 03 ]
diff --git a/amboar/obmc-scripts/bi2cp/bi2cp b/amboar/obmc-scripts/bi2cp/bi2cp
new file mode 100755
index 0000000..4bfbd1d
--- /dev/null
+++ b/amboar/obmc-scripts/bi2cp/bi2cp
@@ -0,0 +1,388 @@
+# SPDX-License-Identifier: Apache-2.0
+# Copyright 2020 IBM Corp.
+import csv
+import sys
+from collections import namedtuple
+from pprint import pprint
+from datetime import time, timedelta
+import argparse
+import enum
+import crc8
+class UCD90320Command(bytes, enum.Enum):
+    """Named UCD90320 command codes, each declared as ``(code, width)``.
+
+    A member is a one-byte ``bytes`` object holding ``code``; its enum value
+    is the integer ``code`` and ``width`` is stored as a plain attribute.
+    """
+
+    def __new__(cls, value, width):
+        # Create the single-byte bytes object first, then hang the enum value
+        # and the width off it.
+        member = bytes.__new__(cls, [value])
+        member._value_ = value
+        member.width = width
+        return member
+
+    MONITOR_CONFIG = (0xd5, -1)
+    NUM_PAGES = (0xd6, 1)
+    GPIO_SELECT = (0xfa, 1)
+    GPIO_CONFIG = (0xfb, 1)
+    DEVICE_ID = (0xfd, -1)
+class PMBusCommand(bytes, enum.Enum):
+    """Named PMBus command codes, each declared as ``(code, width)``.
+
+    A member is a one-byte ``bytes`` object holding ``code``; its enum value
+    is the integer ``code`` and ``width`` is stored as a plain attribute.
+    The transfer classes in this file branch on ``width`` being ``None``,
+    ``-1`` or any other integer when slicing a payload.
+    """
+    def __new__(cls, value, width):
+        # Create the single-byte bytes object, then attach the enum value and
+        # the width to it.
+        obj = bytes.__new__(cls, [value])
+        obj._value_ = value
+        obj.width = width
+        return obj
+    PAGE = (0x00, 1)
+    OPERATION = (0x01, 1)
+    ON_OFF_CONFIG = (0x02, 1)
+    CLEAR_FAULTS = (0x03, 0)
+    PHASE = (0x04, 1)
+    PAGE_PLUS_WRITE = (0x05, -1)
+    PAGE_PLUS_READ = (0x06, -1)
+    WRITE_PROTECT = (0x10, 1)
+    STORE_DEFAULT_ALL = (0x11, 0)
+    RESTORE_DEFAULT_ALL = (0x12, 0)
+    STORE_DEFAULT_CODE = (0x13, 1)
+    RESTORE_DEFAULT_CODE = (0x14, 1)
+    STORE_USER_ALL = (0x15, 0)
+    RESTORE_USER_ALL = (0x16, 0)
+    STORE_USER_CODE = (0x17, 1)
+    RESTORE_USER_CODE = (0x18, 1)
+    CAPABILITY = (0x19, 1)
+    QUERY = (0x1a, 1)
+    SMBALERT_MASK = (0x1b, 2)
+    VOUT_MODE = (0x20, 1)
+    VOUT_COMMAND = (0x21, 2)
+    VOUT_CAL_OFFSET = (0x23, 2)
+    POUT_MAX = (0x31, 2)
+    FREQUENCY_SWITCH = (0x33, 2)
+    VIN_OFF = (0x36, 2)
+    FAN_COMMAND_1 = (0x3b, 2)
+    FAN_COMMAND_4 = (0x3f, 2)
+    VOUT_OV_FAULT_LIMIT = (0x40, 2)
+    VOUT_OV_WARN_LIMIT = (0x42, 2)
+    VOUT_UV_WARN_LIMIT = (0x43, 2)
+    VOUT_UV_FAULT_LIMIT = (0x44, 2)
+    IOUT_OC_LV_FAULT_LIMIT = (0x48, 2)
+    IOUT_OC_LV_FAULT_RESPONSE = (0x49, 1)
+    IOUT_UC_FAULT_RESPONSE = (0x4c, 1)
+    OT_FAULT_LIMIT = (0x4f, 2)
+    OT_WARN_LIMIT = (0x51, 2)
+    UT_WARN_LIMIT = (0x52, 2)
+    UT_FAULT_LIMIT = (0x53, 2)
+    VIN_UV_FAULT_LIMIT = (0x59, 2)
+    IIN_OC_FAULT_RESPONSE = (0x5c, 1)
+    TOFF_MAX_WARN_LIMIT = (0x66, 2)
+    STATUS_WORD = (0x79, 2)
+    STATUS_CML = (0x7e, 1)
+    STATUS_OTHER = (0x7f, 1)
+    STATUS_MFR_SPECIFIC = (0x80, 1)
+    READ_TEMPERATURE_3 = (0x8f, 2)
+    PMBUS_REVISION = (0x98, 1)
+    MFR_MODEL = (0x9a, -1)
+    IC_DEVICE_REV = (0xae, -1)
+    USER_DATA_00 = (0xb0, -1)
+    USER_DATA_08 = (0xb8, -1)
+    # Manufacturer-specific codes carry no width (None). UCD90320Command in
+    # this file assigns its own names and widths to the same five code values.
+    MFR_SPECIFIC_05 = (0xd5, None)
+    MFR_SPECIFIC_06 = (0xd6, None)
+    MFR_SPECIFIC_42 = (0xfa, None)
+    MFR_SPECIFIC_43 = (0xfb, None)
+    MFR_SPECIFIC_45 = (0xfd, None)
+@enum.unique
+class I2CCondition(enum.Enum):
+    """Bus conditions that can be attached to a captured transfer."""
+
+    START = 0
+    STOP = 1
+@enum.unique
+class I2CRecord(enum.Enum):
+    """Direction of a captured transfer; the value is used as a single bit."""
+
+    WRITE = 0
+    READ = 1
+@enum.unique
+class I2CResponse(enum.Enum):
+    """Acknowledge state recorded for a transfer's data."""
+
+    ACK = 0
+    NACK = 1
+# CSV column header as it appears in a capture, followed by one sample row:
+# Level,Index,,Dur,Len,Err,S/P,Addr,Record,Data
+# 0,1,0:29.722.525,210.600 us,1 B,,S,32,Write Transaction,0E
+
+# One parsed CSV row; the fields follow the column order shown above.
+I2CTransfer = namedtuple(
+    "I2CTransfer",
+    "level index timestamp duration length error conditions address record data",
+)
+# The four integer components of a capture timestamp.
+Timestamp = namedtuple(
+    "Timestamp", "minutes seconds milliseconds microseconds"
+)
+# Payload bytes of a transfer together with its ACK/NACK state.
+I2CData = namedtuple("I2CData", "response data")
+# A command paired with its response.
+SMBusTransfer = namedtuple("SMBusTransfer", "command response")
+def to_duration(field):
+    """Convert a duration column such as "210.600 us" into a ``timedelta``.
+
+    A value ending in "us" contributes microseconds only; a value ending in
+    "ms" contributes milliseconds and, when dotted, microseconds. Whatever
+    follows the last "." (together with the unit) is discarded.
+
+    Raises ValueError for any other unit, or when a dotted value does not
+    have the expected number of "."-separated groups.
+    """
+    if field.endswith("us"):
+        millis = 0
+        if "." in field:
+            # "<us>.<finer digits> us": keep only the leading group.
+            micros, _ = field.split(".")
+        else:
+            micros = field.rstrip("us")
+    elif field.endswith("ms"):
+        if "." in field:
+            # "<ms>.<us>.<finer digits> ms": keep the first two groups.
+            millis, micros, _ = field.split(".")
+        else:
+            millis, micros = field.rstrip("ms"), 0
+    else:
+        raise ValueError
+    return timedelta(milliseconds=int(millis), microseconds=int(micros))
+def to_timestamp(field):
+    """Convert a timestamp column such as "0:29.722.525" into a ``time``.
+
+    The text is split on ":" and "." into four integers, taken in the order
+    minutes, seconds, milliseconds, microseconds. The hour of the returned
+    ``datetime.time`` is always 0.
+    """
+    pieces = field.replace(":", ".").split(".")
+    stamp = Timestamp(*map(int, pieces))
+    # datetime.time has a single sub-second field, so fold ms into us.
+    micro = stamp.milliseconds * 1000 + stamp.microseconds
+    return time(0, stamp.minutes, stamp.seconds, micro)
+def to_i2cdata(field):
+    """Convert the data column into an ``I2CData`` record.
+
+    The column is a whitespace-separated list of hex bytes. A trailing "*"
+    marks the data as NACKed; without it the response is ACK.
+    """
+    octets = bytes(int(token, 16) for token in field.rstrip("*").split())
+    if field.endswith("*"):
+        return I2CData(I2CResponse.NACK, octets)
+    return I2CData(I2CResponse.ACK, octets)
+def to_address(field):
+    """Parse the address column, which is written in hexadecimal."""
+    address = int(field, base=16)
+    return address
+def to_i2cconditions(field):
+    """Map the S/P column onto a set of ``I2CCondition`` members.
+
+    "S" yields {START} and "SP" yields {START, STOP}; anything else raises
+    ValueError. A fresh set is returned on every call.
+    """
+    if field == "SP":
+        return {I2CCondition.START, I2CCondition.STOP}
+    if field == "S":
+        return {I2CCondition.START}
+    raise ValueError
+def to_i2crecord(field):
+    """Map the record column onto an ``I2CRecord`` member.
+
+    Only "Write Transaction" and "Read Transaction" are recognised; any
+    other text raises ValueError.
+    """
+    known = (
+        ("Write Transaction", I2CRecord.WRITE),
+        ("Read Transaction", I2CRecord.READ),
+    )
+    for label, record in known:
+        if label == field:
+            return record
+    raise ValueError
+def to_i2ctransfer(line):
+    """Build an ``I2CTransfer`` from one CSV row (a sequence of strings).
+
+    Columns 0-1 (level, index) and 4-5 (length, error) are passed through
+    unchanged; the remaining columns are converted by the ``to_*`` helpers.
+    """
+    passthrough_head = line[:2]
+    timestamp = to_timestamp(line[2])
+    duration = to_duration(line[3])
+    passthrough_mid = line[4:6]
+    conditions = to_i2cconditions(line[6])
+    address = to_address(line[7])
+    record = to_i2crecord(line[8])
+    payload = to_i2cdata(line[9])
+    return I2CTransfer(*passthrough_head, timestamp, duration,
+                       *passthrough_mid, conditions, address, record, payload)
+def pmbuscommand_style(xfer):
+    """Return the ``PMBusCommand`` named by the first payload byte of ``xfer``.
+
+    ``xfer`` is an ``I2CTransfer``; ``xfer.data`` is an ``I2CData`` whose
+    ``data`` field holds the transferred bytes. Raises ValueError when the
+    byte is not a known PMBus command code, and IndexError when the payload
+    is empty.
+    """
+    # NOTE(review): the lookup argument was reduced to "[0]" in this copy of
+    # the patch, which can never match an integer command code. It is restored
+    # here as the first payload byte - confirm against the original commit.
+    return PMBusCommand(xfer.data.data[0])
+def ucd90320command_style(xfer):
+    """Return the command named by the first payload byte of ``xfer``.
+
+    ``UCD90320Command`` is tried first; when the byte is not one of its
+    codes the lookup falls back to ``pmbuscommand_style``.
+    """
+    # NOTE(review): the lookup argument was reduced to "[0]" in this copy of
+    # the patch; restored as the first payload byte - confirm against the
+    # original commit.
+    try:
+        return UCD90320Command(xfer.data.data[0])
+    except ValueError:
+        # Enum lookup raises ValueError for an unknown code. Catching only
+        # that keeps unrelated failures (e.g. an empty payload) visible.
+        return pmbuscommand_style(xfer)
+def as_smbustransfers(i2cxfers, style):
+    """Group raw I2C transfers into ``PMBusWrite`` / ``PMBusRead`` objects.
+
+    A transfer carrying only START is held back as the command phase of a
+    read. A transfer carrying START and STOP either completes that read or,
+    when nothing is pending, is emitted as a write. ``style`` maps a transfer
+    to its command enum member.
+    """
+    start_only = {I2CCondition.START}
+    start_stop = {I2CCondition.START, I2CCondition.STOP}
+    pending = None
+    for xfer in i2cxfers:
+        seen = xfer.conditions
+        if seen == start_only:
+            # Two command phases in a row are not expected.
+            assert not pending
+            pending = xfer
+        elif seen == start_stop:
+            if not pending:
+                yield PMBusWrite(style(xfer), xfer)
+                continue
+            yield PMBusRead(style(pending), pending, xfer)
+            pending = None
+def smbus_pec(data):
+    """Return the CRC-8 of ``data`` (as computed by the ``crc8`` module) as an int."""
+    crc = crc8.crc8()
+    crc.update(data)
+    checksum = crc.digest()
+    return checksum[0]
+def smbus_pec_pack_address(address, record):
+    """Combine ``address`` and the transfer direction into one integer.
+
+    The address is shifted left by one bit and ``record.value`` is OR-ed
+    into the result.
+    """
+    shifted = address << 1
+    return shifted | record.value
+class PMBusTransfer(object):
+    """Base class providing the PEC and validity checks shared by transfers.
+
+    Subclasses are expected to set ``self.pec`` (an int, or None when the
+    transfer carries no PEC byte) and to implement ``calculate_pec`` and
+    ``validate_xfer``.
+    """
+
+    def calculate_pec(self):
+        """Return the PEC computed from the transfer's bytes (subclass hook)."""
+        # NotImplemented is a comparison sentinel, not an exception; raising
+        # it produces a TypeError. NotImplementedError is the intended signal.
+        raise NotImplementedError
+
+    def validate_pec(self):
+        """Return True when no PEC was captured or the captured PEC matches."""
+        if self.pec is None:
+            return True
+        return self.pec == self.calculate_pec()
+
+    def validate_xfer(self):
+        """Return True when the ACK/NACK pattern is as expected (subclass hook)."""
+        raise NotImplementedError
+
+    def valid(self):
+        """Return True when both the transfer and its PEC validate."""
+        return self.validate_xfer() and self.validate_pec()
+class PMBusWrite(PMBusTransfer):
+    """A PMBus write decoded from a single I2C write transfer.
+
+    NOTE(review): several expressions in this class were stripped from this
+    copy of the patch (every "<name>.data..." reference and the "{N.name}"
+    format fields), leaving it unparsable. They are restored here from the
+    I2CTransfer / I2CData field names and from the sample output in the
+    README - confirm against the original commit.
+
+    Attributes set by ``__init__``:
+      command  -- enum member describing the command (has ``name``, ``width``)
+      xfer     -- the underlying ``I2CTransfer``
+      data     -- payload bytes selected according to ``command.width``
+      pec      -- the single byte following ``data``, or None
+      response -- ACK/NACK state recorded for the transfer
+    """
+
+    def __init__(self, command, xfer):
+        assert xfer.record == I2CRecord.WRITE
+        self.command = command
+        self.xfer = xfer
+
+        payload = xfer.data.data
+        # Every slice starts at index 1, skipping the first payload byte.
+        if command.width is None:
+            # Unknown width: everything after the first byte is data.
+            start, end = 1, len(payload)
+        elif command.width == -1:
+            # NOTE(review): the surviving text reads "[0] + 1", so the length
+            # is taken from payload[0] - the same byte the slices skip.
+            # Confirm whether index 1 was intended for writes.
+            start, end = 1, payload[0] + 1
+        else:
+            start, end = 1, command.width + 1
+
+        self.data = payload[start:end]
+        tail = payload[end:]
+        # Exactly one byte left over after the data is treated as the PEC.
+        if len(tail) == 1:
+            self.pec, = tail
+        else:
+            self.pec = None
+        self.response = xfer.data.response
+
+    def calculate_pec(self):
+        """Return the PEC over the address/direction byte and all payload
+        bytes except the last one."""
+        data = (smbus_pec_pack_address(self.xfer.address, self.xfer.record),
+                *self.xfer.data.data[:-1])
+        return smbus_pec(bytes(data))
+
+    def validate_xfer(self):
+        """Return True when the write was ACKed."""
+        return self.response == I2CResponse.ACK
+
+    def __str__(self):
+        """Format as "<time> <duration> 0x<addr> WRITE <command> [ data ]",
+        followed by any status flags (response name, "!PEC")."""
+        timestamp = self.xfer.timestamp.strftime("%M:%S.%f")
+        duration = self.xfer.duration.total_seconds()
+        data = "[ " + " ".join("{:02x}".format(v) for v in self.data) + " ]"
+        status = []
+        if self.response != I2CResponse.ACK:
+            status.append(self.response.name)
+        if not self.validate_pec():
+            status.append("!PEC")
+        # All fields are numbered: str.format() rejects templates that mix
+        # "{}" with "{0}".
+        if status:
+            status = " ".join(status)
+            fmt = "{0} {1:.6f} 0x{2.xfer.address:x} {3.name} {2.command.name} {4} {5}"
+            return fmt.format(timestamp, duration, self, I2CRecord.WRITE, data, status)
+        fmt = "{0} {1:.6f} 0x{2.xfer.address:x} {3.name} {2.command.name} {4}"
+        return fmt.format(timestamp, duration, self, I2CRecord.WRITE, data)
+class PMBusRead(PMBusTransfer):
+    """A PMBus read decoded from a command transfer plus a read transfer.
+
+    NOTE(review): several expressions in this class were stripped from this
+    copy of the patch (every "<name>.data..." reference and the "{N.name}"
+    format fields), leaving it unparsable. They are restored here from the
+    I2CTransfer / I2CData field names and from the sample output in the
+    README - confirm against the original commit.
+
+    Attributes set by ``__init__``:
+      command  -- enum member describing the command (has ``name``, ``width``)
+      start    -- the ``I2CTransfer`` that carried the command
+      repeat   -- the ``I2CTransfer`` that carried the returned bytes
+      address  -- address shared by both transfers
+      data     -- returned bytes selected according to ``command.width``
+      pec      -- the single byte following ``data``, or None
+      response -- ACK/NACK state recorded for ``repeat``
+    """
+
+    def __init__(self, command, start, repeat):
+        assert repeat.record == I2CRecord.READ
+        self.command = command
+        self.start = start
+        self.repeat = repeat
+        assert start.address == repeat.address
+        self.address = start.address
+
+        payload = repeat.data.data
+        if self.command.width is None:
+            # Unknown width: show everything that was returned.
+            first, last = 0, len(payload)
+        elif self.command.width == -1:
+            # The first returned byte gives the number of bytes that follow.
+            first, last = 1, payload[0] + 1
+        else:
+            first, last = 0, command.width
+
+        self.data = payload[first:last]
+        tail = payload[last:]
+        # Exactly one byte left over after the data is treated as the PEC.
+        if len(tail) == 1:
+            self.pec, = tail
+        else:
+            self.pec = None
+        self.response = repeat.data.response
+
+    def calculate_pec(self):
+        """Return the PEC over both address/direction bytes, the command
+        transfer's bytes and all returned bytes except the last one."""
+        data = (smbus_pec_pack_address(self.start.address, self.start.record),
+                *self.start.data.data,
+                smbus_pec_pack_address(self.repeat.address, self.repeat.record),
+                *self.repeat.data.data[:-1])
+        return smbus_pec(bytes(data))
+
+    def validate_xfer(self):
+        """Return True when the command phase was ACKed and the read phase
+        ended with a NACK."""
+        return (self.start.data.response == I2CResponse.ACK and
+                self.repeat.data.response == I2CResponse.NACK)
+
+    def __str__(self):
+        """Format as "<command phase> | <read phase>", each half with its own
+        timestamp, duration and status flags."""
+        # All format fields are numbered: str.format() rejects templates that
+        # mix "{}" with "{0}".
+        timestamp = self.start.timestamp.strftime("%M:%S.%f")
+        duration = self.start.duration.total_seconds()
+        status = []
+        if self.start.data.response != I2CResponse.ACK:
+            status.append(self.start.data.response.name)
+        if status:
+            status = " ".join(status)
+            fmt = "{0} {1:.6f} 0x{2.address:x} {3.name} {2.command.name} {4}"
+            start = fmt.format(timestamp, duration, self, I2CRecord.READ, status)
+        else:
+            fmt = "{0} {1:.6f} 0x{2.address:x} {3.name} {2.command.name}"
+            start = fmt.format(timestamp, duration, self, I2CRecord.READ)
+
+        timestamp = self.repeat.timestamp.strftime("%M:%S.%f")
+        duration = self.repeat.duration.total_seconds()
+        data = " ".join("{:02x}".format(v) for v in self.data)
+        data = "[ " + data + " ]"
+        status = []
+        if self.repeat.data.response != I2CResponse.NACK:
+            status.append(self.repeat.data.response.name)
+        if not self.validate_pec():
+            status.append("!PEC")
+        status = " ".join(status)
+        fmt = "{0} {1:.6f} {2} {3}"
+        repeat = fmt.format(timestamp, duration, data, status)
+        return start + " | " + repeat
+def filter_source(src):
+    """Yield only the lines of ``src`` that hold capture rows.
+
+    Skipped: blank or whitespace-only lines, lines starting with "#", and
+    lines containing "Capture started" or "Capture stopped". Every other
+    line is yielded unchanged.
+    """
+    for line in src:
+        # Lines read from a file keep their trailing newline, so a blank line
+        # is "\n" and still truthy. Test the stripped text so it cannot reach
+        # the CSV parser as an empty row.
+        if not line.strip():
+            continue
+        if line.startswith("#"):
+            continue
+        if "Capture started" in line or "Capture stopped" in line:
+            continue
+        yield line
+def main():
+    """Command-line entry point: parse a capture file and print its transfers.
+
+    The capture is read as CSV, optionally filtered by --after / --before
+    (timestamps), --longer-than (a duration) and --address. With --pmbus or
+    --ucd90320 the transfers are grouped and printed as PMBus reads/writes
+    (only the invalid ones when --bad-transfers is given); otherwise each raw
+    transfer is printed on its own line.
+    """
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--after", type=str)
+    parser.add_argument("--before", type=str)
+    parser.add_argument("--longer-than", type=str)
+    parser.add_argument("--address", type=lambda x: int(x, 0))
+    parser.add_argument("--pmbus", action="store_true")
+    parser.add_argument("--ucd90320", action="store_true")
+    parser.add_argument("--bad-transfers", action="store_true")
+    parser.add_argument("file", type=str)
+    args = parser.parse_args()
+    with open(args.file, "r") as src:
+        # Everything below is lazy; the file is consumed line by line while
+        # the with-block is still open.
+        xfers = (to_i2ctransfer(e) for e in csv.reader(filter_source(src)))
+        if args.after:
+            after = to_timestamp(args.after)
+            xfers = (e for e in xfers if e.timestamp > after)
+        if args.before:
+            before = to_timestamp(args.before)
+            xfers = (e for e in xfers if e.timestamp < before)
+        if args.longer_than:
+            minimum = to_duration(args.longer_than)
+            xfers = (e for e in xfers if e.duration > minimum)
+        if args.address is not None:
+            xfers = (e for e in xfers if e.address == args.address)
+        if args.ucd90320 or args.pmbus:
+            if args.ucd90320:
+                style = ucd90320command_style
+            else:
+                style = pmbuscommand_style
+            for xfer in as_smbustransfers(xfers, style):
+                if args.bad_transfers and xfer.valid():
+                    continue
+                print(xfer)
+        else:
+            # NOTE(review): the payload/response expressions and one format
+            # field were stripped from this copy of the patch; restored from
+            # the I2CTransfer / I2CData field names - confirm against the
+            # original commit.
+            fmt = "{0} {1:.6f} 0x{2.address:x} {2.record.name} {3} {4}"
+            for xfer in xfers:
+                timestamp = xfer.timestamp.strftime("%M:%S.%f")
+                duration = xfer.duration.total_seconds()
+                data = "[ " + " ".join("{:02x}".format(v) for v in xfer.data.data) + " ]"
+                res = xfer.data.response.name
+                print(fmt.format(timestamp, duration, xfer, data, res))
+# Run the command-line interface only when this file is executed as a script.
+if __name__ == "__main__":
+    main()
diff --git a/amboar/obmc-scripts/bi2cp/requirements.txt b/amboar/obmc-scripts/bi2cp/requirements.txt
new file mode 100644
index 0000000..72b44a7
--- /dev/null
+++ b/amboar/obmc-scripts/bi2cp/requirements.txt
@@ -0,0 +1 @@