Add bi2cp - a Beagle I2C Parser
bi2cp parses CSV output from the Beagle logic analyzer.
Signed-off-by: Andrew Jeffery <andrew@aj.id.au>
Change-Id: Iba993ece40f295bd5bbc213dff1d325ae8281ab6
diff --git a/amboar/obmc-scripts/bi2cp/README.md b/amboar/obmc-scripts/bi2cp/README.md
new file mode 100644
index 0000000..7596b2b
--- /dev/null
+++ b/amboar/obmc-scripts/bi2cp/README.md
@@ -0,0 +1,27 @@
+## bi2cp: Beagle I2C Parser
+
+`bi2cp` parses CSV dumps from the Beagle logic analyzer to lift the raw I2C
+transfers to expose PMBus semantics.
+
+`bi2cp` can also test PEC bytes in transfers among other features - external
+dependencies are listed in `requirements.txt`:
+
+```
+$ pip3 install --user --requirement requirements.txt
+```
+
+## Example Run
+
+```
+$ ./bi2cp --address 0x11 --pmbus 'UCD recreate beagle scan 1.csv' | head
+00:30.757419 0.000211 0x11 READ MFR_SPECIFIC_45 | 00:30.757631 0.002693 [ 1b 55 43 44 39 30 33 32 30 7c 33 2e 30 2e 30 2e 33 30 32 39 7c 31 36 30 39 31 35 00 ]
+00:30.767143 0.000207 0x11 READ MFR_SPECIFIC_06 | 00:30.767350 0.000203 [ 20 ]
+00:30.767574 0.000205 0x11 READ MFR_SPECIFIC_05 | 00:30.767780 0.003184 [ 20 20 21 22 23 24 25 26 27 28 29 2a 2b 2c 2d 2e 2f 30 31 32 33 34 35 36 37 38 39 3a 3b 3c 3d 3e 3f ]
+00:30.771015 0.000302 0x11 WRITE MFR_SPECIFIC_42 [ 00 ]
+00:30.771332 0.000205 0x11 READ MFR_SPECIFIC_43 | 00:30.771538 0.000208 [ 08 ]
+00:30.771758 0.000301 0x11 WRITE MFR_SPECIFIC_42 [ 01 ]
+00:30.772072 0.000205 0x11 READ MFR_SPECIFIC_43 | 00:30.772278 0.000204 [ 00 ]
+00:30.772493 0.000301 0x11 WRITE MFR_SPECIFIC_42 [ 02 ]
+00:30.772805 0.000205 0x11 READ MFR_SPECIFIC_43 | 00:30.773010 0.000204 [ 00 ]
+00:30.773226 0.000301 0x11 WRITE MFR_SPECIFIC_42 [ 03 ]
+```
diff --git a/amboar/obmc-scripts/bi2cp/bi2cp b/amboar/obmc-scripts/bi2cp/bi2cp
new file mode 100755
index 0000000..4bfbd1d
--- /dev/null
+++ b/amboar/obmc-scripts/bi2cp/bi2cp
@@ -0,0 +1,388 @@
+#!/usr/bin/python3
+
+# SPDX-License-Identifier: Apache-2.0
+# Copyright 2020 IBM Corp.
+
+import csv
+import sys
+from collections import namedtuple
+from pprint import pprint
+from datetime import time, timedelta
+import argparse
+import enum
+import crc8
+
+class UCD90320Command(bytes, enum.Enum):
+ def __new__(cls, value, width):
+ obj = bytes.__new__(cls, [value])
+ obj._value_ = value
+ obj.width = width
+ return obj
+
+ MONITOR_CONFIG = (0xd5, -1)
+ NUM_PAGES = (0xd6, 1)
+ GPIO_SELECT = (0xfa, 1)
+ GPIO_CONFIG = (0xfb, 1)
+ DEVICE_ID = (0xfd, -1)
+
+class PMBusCommand(bytes, enum.Enum):
+ def __new__(cls, value, width):
+ obj = bytes.__new__(cls, [value])
+ obj._value_ = value
+ obj.width = width
+ return obj
+
+ PAGE = (0x00, 1)
+ OPERATION = (0x01, 1)
+ ON_OFF_CONFIG = (0x02, 1)
+ CLEAR_FAULTS = (0x03, 0)
+ PHASE = (0x04, 1)
+ PAGE_PLUS_WRITE = (0x05, -1)
+ PAGE_PLUS_READ = (0x06, -1)
+ WRITE_PROTECT = (0x10, 1)
+ STORE_DEFAULT_ALL = (0x11, 0)
+ RESTORE_DEFAULT_ALL = (0x12, 0)
+ STORE_DEFAULT_CODE = (0x13, 1)
+ RESTORE_DEFAULT_CODE = (0x14, 1)
+ STORE_USER_ALL = (0x15, 0)
+ RESTORE_USER_ALL = (0x16, 0)
+ STORE_USER_CODE = (0x17, 1)
+ RESTORE_USER_CODE = (0x18, 1)
+ CAPABILITY = (0x19, 1)
+ QUERY = (0x1a, 1)
+ SMBALERT_MASK = (0x1b, 2)
+ VOUT_MODE = (0x20, 1)
+ VOUT_COMMAND = (0x21, 2)
+ VOUT_CAL_OFFSET = (0x23, 2)
+ POUT_MAX = (0x31, 2)
+ FREQUENCY_SWITCH = (0x33, 2)
+ VIN_OFF = (0x36, 2)
+ FAN_COMMAND_1 = (0x3b, 2)
+ FAN_COMMAND_4 = (0x3f, 2)
+ VOUT_OV_FAULT_LIMIT = (0x40, 2)
+ VOUT_OV_WARN_LIMIT = (0x42, 2)
+ VOUT_UV_WARN_LIMIT = (0x43, 2)
+ VOUT_UV_FAULT_LIMIT = (0x44, 2)
+ IOUT_OC_LV_FAULT_LIMIT = (0x48, 2)
+ IOUT_OC_LV_FAULT_RESPONSE = (0x49, 1)
+ IOUT_UC_FAULT_RESPONSE = (0x4c, 1)
+ OT_FAULT_LIMIT = (0x4f, 2)
+ OT_WARN_LIMIT = (0x51, 2)
+ UT_WARN_LIMIT = (0x52, 2)
+ UT_FAULT_LIMIT = (0x53, 2)
+ VIN_UV_FAULT_LIMIT = (0x59, 2)
+ IIN_OC_FAULT_RESPONSE = (0x5c, 1)
+ TOFF_MAX_WARN_LIMIT = (0x66, 2)
+ STATUS_WORD = (0x79, 2)
+ STATUS_CML = (0x7e, 1)
+ STATUS_OTHER = (0x7f, 1)
+ STATUS_MFR_SPECIFIC = (0x80, 1)
+ READ_TEMPERATURE_3 = (0x8f, 2)
+ PMBUS_REVISION = (0x98, 1)
+ MFR_MODEL = (0x9a, -1)
+ IC_DEVICE_REV = (0xae, -1)
+ USER_DATA_00 = (0xb0, -1)
+ USER_DATA_08 = (0xb8, -1)
+ MFR_SPECIFIC_05 = (0xd5, None)
+ MFR_SPECIFIC_06 = (0xd6, None)
+ MFR_SPECIFIC_42 = (0xfa, None)
+ MFR_SPECIFIC_43 = (0xfb, None)
+ MFR_SPECIFIC_45 = (0xfd, None)
+
+
+class I2CCondition(enum.Enum):
+ START = 0
+ STOP = 1
+
+class I2CRecord(enum.Enum):
+ WRITE = 0
+ READ = 1
+
+class I2CResponse(enum.Enum):
+ ACK = 0
+ NACK = 1
+
+# Level,Index,m:s.ms.us,Dur,Len,Err,S/P,Addr,Record,Data
+# 0,1,0:29.722.525,210.600 us,1 B,,S,32,Write Transaction,0E
+I2CTransfer = namedtuple("I2CTransfer", ("level", "index", "timestamp", "duration", "length", "error", "conditions", "address", "record", "data"))
+Timestamp = namedtuple("Timestamp", ["minutes", "seconds", "milliseconds", "microseconds"])
+I2CData = namedtuple("I2CData", ["response", "data"])
+
+SMBusTransfer = namedtuple("SMBusTransfer", ["command", "response"])
+
+def to_duration(field):
+ if field.endswith("us"):
+ if "." in field:
+ ms, us, _ = 0, *field.split(".")
+ else:
+ ms, us = 0, int(field.rstrip("us"))
+ elif field.endswith("ms"):
+ if "." in field:
+ ms, us, _ = field.split(".")
+ else:
+ ms, us = int(field.rstrip("ms")), 0
+ else:
+ raise ValueError
+ return timedelta(milliseconds=int(ms), microseconds=int(us))
+
+def to_timestamp(field):
+ ts = Timestamp(*list(int(v) for v in field.replace(":", ".").split(".")))
+ return time(0, ts.minutes, ts.seconds, ts.milliseconds * 1000 + ts.microseconds)
+
+def to_i2cdata(field):
+ resp = I2CResponse.NACK if field.endswith("*") else I2CResponse.ACK
+ return I2CData(resp, bytes(int(v, 16) for v in field.rstrip("*").split()))
+
+def to_address(field):
+ return int(field, 16)
+
+def to_i2cconditions(field):
+ if "S" == field:
+ return { I2CCondition.START }
+ elif "SP" == field:
+ return { I2CCondition.START, I2CCondition.STOP }
+ raise ValueError
+
+def to_i2crecord(field):
+ if "Write Transaction" == field:
+ return I2CRecord.WRITE
+ if "Read Transaction" == field:
+ return I2CRecord.READ
+ raise ValueError
+
+def to_i2ctransfer(line):
+ return I2CTransfer(*line[:2],
+ to_timestamp(line[2]),
+ to_duration(line[3]),
+ *line[4:6],
+ to_i2cconditions(line[6]),
+ to_address(line[7]),
+ to_i2crecord(line[8]),
+ to_i2cdata(line[9]))
+
+def pmbuscommand_style(xfer):
+ return PMBusCommand(xfer.data.data[0])
+
+def ucd90320command_style(xfer):
+ try:
+ return UCD90320Command(xfer.data.data[0])
+ except:
+ return pmbuscommand_style(xfer)
+
+def as_smbustransfers(i2cxfers, style):
+ command = None
+ for i2cxfer in i2cxfers:
+ if i2cxfer.conditions == { I2CCondition.START }:
+ assert not command
+ command = i2cxfer
+ if i2cxfer.conditions == { I2CCondition.START, I2CCondition.STOP }:
+ if command:
+ yield PMBusRead(style(command), command, i2cxfer)
+ command = None
+ else:
+ yield PMBusWrite(style(i2cxfer), i2cxfer)
+
+def smbus_pec(data):
+ hash = crc8.crc8()
+ hash.update(data)
+ return hash.digest()[0]
+
+def smbus_pec_pack_address(address, record):
+ return (address << 1) | record.value
+
+class PMBusTransfer(object):
+ def calculate_pec(self):
+ raise NotImplemented
+
+ def validate_pec(self):
+ if self.pec is None:
+ return True
+ derived = self.calculate_pec()
+ provided = self.pec
+ return provided == derived
+
+ def validate_xfer(self):
+ raise NotImplemented
+
+ def valid(self):
+ return self.validate_xfer() and self.validate_pec()
+
+class PMBusWrite(PMBusTransfer):
+ def __init__(self, command, xfer):
+ assert xfer.record == I2CRecord.WRITE
+ self.command = command
+ self.xfer = xfer
+
+ if command.width is None:
+ start, end = 1, len(xfer.data.data)
+ elif command.width == -1:
+ start, end = 1, xfer.data.data[0] + 1
+ else:
+ start, end = 1, command.width + 1
+
+ self.data = xfer.data.data[start:end]
+ tail = self.data[end:]
+
+ if len(tail) == 1:
+ self.pec, = tail
+ else:
+ self.pec = None
+
+ self.response = xfer.data.response
+
+ def calculate_pec(self):
+ data = (smbus_pec_pack_address(self.xfer.address, self.xfer.record),
+ *self.xfer.data.data[:-1])
+ return smbus_pec(bytes(data))
+
+ def validate_xfer(self):
+ return self.response == I2CResponse.ACK
+
+ def __str__(self):
+ timestamp = self.xfer.timestamp.strftime("%M:%S.%f")
+ duration = self.xfer.duration.total_seconds()
+ data = "[ " + " ".join("{:02x}".format(v) for v in self.data) + " ]"
+
+ status = []
+ if self.response != I2CResponse.ACK:
+ status.append(self.response.name)
+
+ if not self.validate_pec():
+ status.append("!PEC")
+
+ if status:
+ status = " ".join(status)
+ fmt = "{0} {1:.6f} 0x{2.xfer.address:x} {3.name} {2.command.name} {4} {5}"
+ return fmt.format(timestamp, duration, self, I2CRecord.WRITE, data, status)
+
+ fmt = "{0} {1:.6f} 0x{2.xfer.address:x} {3.name} {2.command.name} {4}"
+ return fmt.format(timestamp, duration, self, I2CRecord.WRITE, data)
+
+class PMBusRead(PMBusTransfer):
+ def __init__(self, command, start, repeat):
+ assert repeat.record == I2CRecord.READ
+ self.command = command
+ self.start = start
+ self.repeat = repeat
+ assert start.address == repeat.address
+ self.address = start.address
+
+ if self.command.width is None:
+ start, end = 0, len(repeat.data.data)
+ elif self.command.width == -1:
+ start, end = 1, repeat.data.data[0] + 1
+ else:
+ start, end = 0, command.width
+
+ self.data = repeat.data.data[start:end]
+ tail = repeat.data.data[end:]
+
+ if len(tail) == 1:
+ self.pec, = tail
+ else:
+ self.pec = None
+
+ self.response = repeat.data.response
+
+ def calculate_pec(self):
+ data = (smbus_pec_pack_address(self.start.address, self.start.record),
+ *self.start.data.data,
+ smbus_pec_pack_address(self.repeat.address, self.repeat.record),
+ *self.repeat.data.data[:-1])
+ return smbus_pec(bytes(data))
+
+ def validate_xfer(self):
+ return (self.start.data.response == I2CResponse.ACK and
+ self.repeat.data.response == I2CResponse.NACK)
+
+ def __str__(self):
+ timestamp = self.start.timestamp.strftime("%M:%S.%f")
+ duration = self.start.duration.total_seconds()
+
+ status = []
+ if self.start.data.response != I2CResponse.ACK:
+ status.append(self.start.data.response.name)
+
+ if status:
+ status = " ".join(status)
+ fmt = "{0} {1:.6f} 0x{2.address:x} {3.name} {2.command.name} {4}"
+ start = fmt.format(timestamp, duration, self, I2CRecord.READ, status)
+ else:
+ fmt = "{0} {1:.6f} 0x{2.address:x} {3.name} {2.command.name}"
+ start = fmt.format(timestamp, duration, self, I2CRecord.READ)
+
+ timestamp = self.repeat.timestamp.strftime("%M:%S.%f")
+ duration = self.repeat.duration.total_seconds()
+ data = " ".join("{:02x}".format(v) for v in self.data)
+ data = "[ " + data + " ]"
+
+ status = []
+ if self.repeat.data.response != I2CResponse.NACK:
+ status.append(self.repeat.data.response.name)
+
+ if not self.validate_pec():
+ status.append("!PEC")
+
+ status = " ".join(status)
+ fmt = "{0} {1:.6f} {2} {3}"
+ repeat = fmt.format(timestamp, duration, data, status)
+
+ return start + " | " + repeat
+
+def filter_source(src):
+ for line in src:
+ if not line:
+ continue
+ if line.startswith("#"):
+ continue
+ if "Capture started" in line:
+ continue
+ if "Capture stopped" in line:
+ continue
+ yield line
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--after", type=str)
+ parser.add_argument("--before", type=str)
+ parser.add_argument("--longer-than", type=str)
+ parser.add_argument("--address", type=lambda x: int(x, 0))
+ parser.add_argument("--pmbus", action="store_true")
+ parser.add_argument("--ucd90320", action="store_true")
+ parser.add_argument("--bad-transfers", action="store_true")
+ parser.add_argument("file", type=str)
+ args = parser.parse_args()
+ with open(args.file, "r") as src:
+ data = (line for line in filter_source(src.readlines()))
+ xfers = (to_i2ctransfer(e) for e in csv.reader(data))
+ if args.after:
+ after = to_timestamp(args.after)
+ xfers = (e for e in xfers if e.timestamp > after)
+ if args.before:
+ before = to_timestamp(args.before)
+ xfers = (e for e in xfers if e.timestamp < before)
+ if args.longer_than:
+ minimum = to_duration(args.longer_than)
+ xfers = (e for e in xfers if e.duration > minimum)
+ if args.address is not None:
+ xfers = (e for e in xfers if e.address == args.address)
+ if args.ucd90320 or args.pmbus:
+ if args.ucd90320:
+ style = ucd90320command_style
+ else:
+ style = pmbuscommand_style
+ for xfer in as_smbustransfers(xfers, style):
+ if args.bad_transfers and xfer.valid():
+ continue
+ print(xfer)
+ else:
+ for xfer in xfers:
+ timestamp = xfer.timestamp.strftime("%M:%S.%f")
+ duration = xfer.duration.total_seconds()
+ data = "[ " + " ".join("{:02x}".format(v) for v in xfer.data.data) + " ]"
+ res = xfer.data.response.name
+ fmt = "{0} {1:.6f} 0x{2.address:x} {2.record.name} {3} {4}"
+ print(fmt.format(timestamp, duration, xfer, data, res))
+
+if __name__ == "__main__":
+ main()
diff --git a/amboar/obmc-scripts/bi2cp/requirements.txt b/amboar/obmc-scripts/bi2cp/requirements.txt
new file mode 100644
index 0000000..72b44a7
--- /dev/null
+++ b/amboar/obmc-scripts/bi2cp/requirements.txt
@@ -0,0 +1 @@
+crc8