blob: 1cb1f87ac76a10f84bbc2aef50c96174215164f2 [file] [log] [blame]
#!/usr/bin/python3
# SPDX-License-Identifier: Apache-2.0
# Copyright 2020 IBM Corp.
import argparse
import csv
import enum
from collections import namedtuple
from datetime import time, timedelta
import crc8
class UCD90320Command(bytes, enum.Enum):
    """UCD90320 manufacturer-specific command codes with their data widths.

    Every member is a one-byte ``bytes`` object holding the command code; the
    enum value is that code as an integer, so ``UCD90320Command(0xD6)`` looks
    a member up by code. ``width`` is the second item of the definition
    tuple: the transfer decoders in this file read ``-1`` as a
    length-prefixed block and a non-negative number as a fixed byte count.
    """

    def __new__(cls, value, width):
        # Build the bytes payload from the code, then attach the enum value
        # and the extra ``width`` attribute to the new member.
        member = bytes.__new__(cls, [value])
        member._value_ = value
        member.width = width
        return member

    MONITOR_CONFIG = (0xD5, -1)
    NUM_PAGES = (0xD6, 1)
    GPIO_SELECT = (0xFA, 1)
    GPIO_CONFIG = (0xFB, 1)
    DEVICE_ID = (0xFD, -1)
class PMBusCommand(bytes, enum.Enum):
    """PMBus command codes paired with the width of their data.

    Every member is a one-byte ``bytes`` object holding the command code; the
    enum value is that code as an integer, so ``PMBusCommand(0x79)`` looks a
    member up by code. ``width`` is the second item of the definition tuple.
    The transfer decoders in this file interpret it as follows:

    * a non-negative number is a fixed count of data bytes,
    * ``-1`` is a block whose length is given by a leading count byte,
    * ``None`` means every byte of the transfer is treated as data.
    """

    def __new__(cls, value, width):
        # Build the bytes payload from the code, then attach the enum value
        # and the extra ``width`` attribute to the new member.
        member = bytes.__new__(cls, [value])
        member._value_ = value
        member.width = width
        return member

    PAGE = (0x00, 1)
    OPERATION = (0x01, 1)
    ON_OFF_CONFIG = (0x02, 1)
    CLEAR_FAULTS = (0x03, 0)
    PHASE = (0x04, 1)
    PAGE_PLUS_WRITE = (0x05, -1)
    PAGE_PLUS_READ = (0x06, -1)
    WRITE_PROTECT = (0x10, 1)
    STORE_DEFAULT_ALL = (0x11, 0)
    RESTORE_DEFAULT_ALL = (0x12, 0)
    STORE_DEFAULT_CODE = (0x13, 1)
    RESTORE_DEFAULT_CODE = (0x14, 1)
    STORE_USER_ALL = (0x15, 0)
    RESTORE_USER_ALL = (0x16, 0)
    STORE_USER_CODE = (0x17, 1)
    RESTORE_USER_CODE = (0x18, 1)
    CAPABILITY = (0x19, 1)
    QUERY = (0x1A, 1)
    SMBALERT_MASK = (0x1B, 2)
    VOUT_MODE = (0x20, 1)
    VOUT_COMMAND = (0x21, 2)
    VOUT_CAL_OFFSET = (0x23, 2)
    POUT_MAX = (0x31, 2)
    FREQUENCY_SWITCH = (0x33, 2)
    VIN_OFF = (0x36, 2)
    FAN_COMMAND_1 = (0x3B, 2)
    FAN_COMMAND_4 = (0x3F, 2)
    VOUT_OV_FAULT_LIMIT = (0x40, 2)
    VOUT_OV_WARN_LIMIT = (0x42, 2)
    VOUT_UV_WARN_LIMIT = (0x43, 2)
    VOUT_UV_FAULT_LIMIT = (0x44, 2)
    IOUT_OC_LV_FAULT_LIMIT = (0x48, 2)
    IOUT_OC_LV_FAULT_RESPONSE = (0x49, 1)
    IOUT_UC_FAULT_RESPONSE = (0x4C, 1)
    OT_FAULT_LIMIT = (0x4F, 2)
    OT_WARN_LIMIT = (0x51, 2)
    UT_WARN_LIMIT = (0x52, 2)
    UT_FAULT_LIMIT = (0x53, 2)
    VIN_UV_FAULT_LIMIT = (0x59, 2)
    IIN_OC_FAULT_RESPONSE = (0x5C, 1)
    TOFF_MAX_WARN_LIMIT = (0x66, 2)
    STATUS_WORD = (0x79, 2)
    STATUS_CML = (0x7E, 1)
    STATUS_OTHER = (0x7F, 1)
    STATUS_MFR_SPECIFIC = (0x80, 1)
    READ_TEMPERATURE_3 = (0x8F, 2)
    PMBUS_REVISION = (0x98, 1)
    MFR_MODEL = (0x9A, -1)
    IC_DEVICE_REV = (0xAE, -1)
    USER_DATA_00 = (0xB0, -1)
    USER_DATA_08 = (0xB8, -1)
    # Manufacturer-specific codes: no width is declared for these.
    MFR_SPECIFIC_05 = (0xD5, None)
    MFR_SPECIFIC_06 = (0xD6, None)
    MFR_SPECIFIC_42 = (0xFA, None)
    MFR_SPECIFIC_43 = (0xFB, None)
    MFR_SPECIFIC_45 = (0xFD, None)
@enum.unique
class I2CCondition(enum.Enum):
    """Bus conditions that can frame a captured I2C transfer."""

    # The transfer began with a START condition.
    START = 0
    # The transfer ended with a STOP condition.
    STOP = 1
@enum.unique
class I2CRecord(enum.Enum):
    """Direction of a captured I2C transfer.

    The numeric values are OR-ed into the low bit of the address byte when
    the PEC input is assembled, so they must stay 0 for write and 1 for read.
    """

    WRITE = 0
    READ = 1
@enum.unique
class I2CResponse(enum.Enum):
    """Acknowledge state recorded for the final byte of a transfer."""

    ACK = 0
    NACK = 1
# Column layout of the capture CSV rows parsed below, followed by an
# example data row:
# Level,Index,m:s.ms.us,Dur,Len,Err,S/P,Addr,Record,Data
# 0,1,0:29.722.525,210.600 us,1 B,,S,32,Write Transaction,0E
# One parsed row of the capture CSV; the fields follow the CSV column order.
I2CTransfer = namedtuple(
    "I2CTransfer",
    "level index timestamp duration length error conditions address record "
    "data",
)

# The four numeric components of an "m:s.ms.us" timestamp string.
Timestamp = namedtuple(
    "Timestamp", ("minutes", "seconds", "milliseconds", "microseconds")
)

# Payload bytes of a transfer together with its ACK/NACK response.
I2CData = namedtuple("I2CData", "response data")

# A command paired with its response.
SMBusTransfer = namedtuple("SMBusTransfer", "command response")
def to_duration(field):
    """Convert a duration string ending in ``us`` or ``ms`` to a timedelta.

    Dots separate groups rather than marking a decimal point:

    * ``"210.600 us"`` gives 210 microseconds; the group after the dot is
      discarded.
    * ``"1.234.567 ms"`` gives 1 millisecond plus 234 microseconds; the
      third group is discarded.
    * Undotted values such as ``"5us"`` or ``"3ms"`` are a plain count of
      that unit.

    Raises:
        ValueError: if the suffix is neither ``us`` nor ``ms``, the number
            of dot-separated groups is unexpected, or a group is not an
            integer.
    """
    dotted = "." in field
    if field.endswith("us"):
        millis = 0
        if dotted:
            # Exactly two groups are expected: microseconds and a remainder.
            micros, _ = field.split(".")
        else:
            micros = int(field.rstrip("us"))
    elif field.endswith("ms"):
        if dotted:
            # Exactly three groups: milliseconds, microseconds, remainder.
            millis, micros, _ = field.split(".")
        else:
            millis, micros = int(field.rstrip("ms")), 0
    else:
        raise ValueError
    return timedelta(milliseconds=int(millis), microseconds=int(micros))
def to_timestamp(field):
    """Convert an ``m:s.ms.us`` string into a ``datetime.time``.

    The colon is treated like a dot, the four resulting groups are parsed as
    integers, and the result has its hour fixed at 0 with the millisecond
    and microsecond groups folded into the microsecond field.
    """
    pieces = [int(piece) for piece in field.replace(":", ".").split(".")]
    stamp = Timestamp(*pieces)
    return time(
        hour=0,
        minute=stamp.minutes,
        second=stamp.seconds,
        microsecond=stamp.milliseconds * 1000 + stamp.microseconds,
    )
def to_i2cdata(field):
    """Parse the Data column into an ``I2CData`` record.

    A trailing ``*`` marks the transfer as NACK; otherwise it is ACK. The
    remaining whitespace-separated tokens are read as hexadecimal byte
    values.
    """
    response = I2CResponse.NACK if field.endswith("*") else I2CResponse.ACK
    hex_tokens = field.rstrip("*").split()
    payload = bytes(int(token, 16) for token in hex_tokens)
    return I2CData(response, payload)
def to_address(field):
    """Parse the Addr column, a hexadecimal string, into an integer."""
    address = int(field, base=16)
    return address
def to_i2cconditions(field):
    """Translate the S/P column into a set of ``I2CCondition`` members.

    ``"S"`` yields START only and ``"SP"`` yields START and STOP.

    Raises:
        ValueError: for any other text.
    """
    if field == "SP":
        return {I2CCondition.START, I2CCondition.STOP}
    if field == "S":
        return {I2CCondition.START}
    raise ValueError
def to_i2crecord(field):
    """Translate the Record column into an ``I2CRecord`` member.

    Raises:
        ValueError: if the text is neither ``"Write Transaction"`` nor
            ``"Read Transaction"``.
    """
    labelled = (
        ("Write Transaction", I2CRecord.WRITE),
        ("Read Transaction", I2CRecord.READ),
    )
    for label, record in labelled:
        if label == field:
            return record
    raise ValueError
def to_i2ctransfer(line):
    """Build an ``I2CTransfer`` from one CSV row (a sequence of strings).

    Level, index, length and error are passed through as text; the other
    columns are converted by their dedicated parsers. Columns past the
    tenth are ignored.
    """
    # Parse in column order so a malformed row fails on the same column as
    # it would when all fields are converted inline.
    timestamp = to_timestamp(line[2])
    duration = to_duration(line[3])
    conditions = to_i2cconditions(line[6])
    address = to_address(line[7])
    record = to_i2crecord(line[8])
    data = to_i2cdata(line[9])
    # At least ten columns exist at this point, so both slices hold two items.
    level, index = line[:2]
    length, error = line[4:6]
    return I2CTransfer(
        level=level,
        index=index,
        timestamp=timestamp,
        duration=duration,
        length=length,
        error=error,
        conditions=conditions,
        address=address,
        record=record,
        data=data,
    )
def pmbuscommand_style(xfer):
    """Look up the ``PMBusCommand`` named by the first data byte of ``xfer``.

    Raises:
        ValueError: if the byte is not a code defined in ``PMBusCommand``.
    """
    code = xfer.data.data[0]
    return PMBusCommand(code)
def ucd90320command_style(xfer):
    """Decode the command byte of ``xfer``, preferring UCD90320 names.

    The first data byte is looked up in ``UCD90320Command``; codes that are
    not defined there fall back to the generic ``PMBusCommand`` lookup.

    Raises:
        ValueError: if the byte is defined in neither command set.
    """
    try:
        return UCD90320Command(xfer.data.data[0])
    except ValueError:
        # A failed enum lookup raises ValueError. Catch only that, so
        # unrelated problems are not silently retried via the fallback.
        return pmbuscommand_style(xfer)
def as_smbustransfers(i2cxfers, style):
    """Group raw I2C transfers into PMBus reads and writes, lazily.

    A transfer with START but no STOP is held as the command phase of a
    read. The next START+STOP transfer completes it and is yielded as a
    ``PMBusRead``. A START+STOP transfer with no held command is yielded as
    a ``PMBusWrite``. ``style`` maps a transfer to its command enum member.
    """
    start_only = {I2CCondition.START}
    start_and_stop = {I2CCondition.START, I2CCondition.STOP}
    pending = None
    for xfer in i2cxfers:
        framing = xfer.conditions
        if framing == start_only:
            # Only one command phase may be outstanding at a time.
            assert not pending
            pending = xfer
        if framing == start_and_stop:
            if not pending:
                yield PMBusWrite(style(xfer), xfer)
                continue
            yield PMBusRead(style(pending), pending, xfer)
            pending = None
def smbus_pec(data):
    """Return the CRC-8 of ``data`` as an integer, used here as the PEC."""
    crc = crc8.crc8()
    crc.update(data)
    # digest() returns bytes; its first byte is the checksum value.
    return crc.digest()[0]
def smbus_pec_pack_address(address, record):
    """Combine an address and a transfer direction into one byte value.

    The address is shifted left by one bit and ``record.value`` is OR-ed
    into the result.
    """
    shifted = address << 1
    return shifted | record.value
class PMBusTransfer(object):
    """Base class for decoded PMBus transfers, holding the validity checks.

    Subclasses are expected to set ``self.pec`` (``None`` when no PEC byte
    was captured) and to override ``calculate_pec`` and ``validate_xfer``.
    """

    def calculate_pec(self):
        """Return the PEC expected for this transfer (subclass hook)."""
        raise NotImplementedError

    def validate_pec(self):
        """Return True if no PEC was captured or it matches the expected one."""
        if self.pec is not None:
            expected = self.calculate_pec()
            return self.pec == expected
        return True

    def validate_xfer(self):
        """Return whether the bus-level responses are as expected (hook)."""
        raise NotImplementedError

    def valid(self):
        """Return a truthy value only if both the transfer and PEC checks pass."""
        xfer_ok = self.validate_xfer()
        return xfer_ok and self.validate_pec()
class PMBusWrite(PMBusTransfer):
    """A PMBus write decoded from a single START+STOP I2C write transfer.

    Attributes:
        command: the command enum member; its ``width`` selects the layout.
        xfer: the underlying ``I2CTransfer``.
        data: the payload bytes, without command code, block count or PEC.
        pec: the captured PEC byte, or ``None`` if exactly one trailing byte
            is not present after the payload.
        response: the ACK/NACK response recorded for the transfer.
    """

    def __init__(self, command, xfer):
        assert xfer.record == I2CRecord.WRITE
        self.command = command
        self.xfer = xfer
        raw = xfer.data.data
        if command.width is None:
            # Unknown layout: everything after the command code is payload.
            start, end = 1, len(raw)
        elif command.width == -1:
            # Block write: byte 0 is the command code, byte 1 is the byte
            # count and the payload follows. A transfer cut short before
            # the count byte is treated as having an empty payload.
            count = raw[1] if len(raw) > 1 else 0
            start, end = 2, 2 + count
        else:
            start, end = 1, command.width + 1
        self.data = raw[start:end]
        # The PEC, if any, is the single byte after the payload in the raw
        # transfer. Slice the raw bytes here, not the extracted payload.
        tail = raw[end:]
        if len(tail) == 1:
            (self.pec,) = tail
        else:
            self.pec = None
        self.response = xfer.data.response

    def calculate_pec(self):
        """Return the PEC over the address byte and all bytes but the last."""
        data = (
            smbus_pec_pack_address(self.xfer.address, self.xfer.record),
            *self.xfer.data.data[:-1],
        )
        return smbus_pec(bytes(data))

    def validate_xfer(self):
        """Return True if the write was acknowledged."""
        return self.response == I2CResponse.ACK

    def __str__(self):
        """Format as timestamp, duration, address, WRITE, command, data.

        A status suffix is appended when the transfer was not ACKed
        (response name) and/or when the PEC does not validate (``!PEC``).
        """
        timestamp = self.xfer.timestamp.strftime("%M:%S.%f")
        duration = self.xfer.duration.total_seconds()
        data = "[ " + " ".join("{:02x}".format(v) for v in self.data) + " ]"
        status = []
        if self.response != I2CResponse.ACK:
            status.append(self.response.name)
        if not self.validate_pec():
            status.append("!PEC")
        fmt = "{0} {1:.6f} 0x{2.xfer.address:x} {3.name} {2.command.name} {4}"
        line = fmt.format(timestamp, duration, self, I2CRecord.WRITE, data)
        if status:
            line += " " + " ".join(status)
        return line
class PMBusRead(PMBusTransfer):
    """A PMBus read decoded from a command write plus a following read.

    Attributes:
        command: the command enum member; its ``width`` selects the layout.
        start: the ``I2CTransfer`` that wrote the command code.
        repeat: the ``I2CTransfer`` that read the response.
        address: the device address shared by both transfers.
        data: the payload bytes, without block count or PEC.
        pec: the captured PEC byte, or ``None`` if exactly one trailing byte
            is not present after the payload.
        response: the ACK/NACK response recorded for the read transfer.
    """

    def __init__(self, command, start, repeat):
        assert repeat.record == I2CRecord.READ
        self.command = command
        self.start = start
        self.repeat = repeat
        assert start.address == repeat.address
        self.address = start.address
        raw = repeat.data.data
        if self.command.width is None:
            # Unknown layout: every byte read is payload.
            first, last = 0, len(raw)
        elif self.command.width == -1:
            # Block read: byte 0 is the byte count. A read that returned no
            # bytes at all is treated as an empty payload, not an error.
            count = raw[0] if raw else 0
            first, last = 1, count + 1
        else:
            first, last = 0, command.width
        self.data = raw[first:last]
        tail = raw[last:]
        if len(tail) == 1:
            (self.pec,) = tail
        else:
            self.pec = None
        self.response = repeat.data.response

    def calculate_pec(self):
        """Return the PEC over both phases, excluding the last byte read."""
        data = (
            smbus_pec_pack_address(self.start.address, self.start.record),
            *self.start.data.data,
            smbus_pec_pack_address(self.repeat.address, self.repeat.record),
            *self.repeat.data.data[:-1],
        )
        return smbus_pec(bytes(data))

    def validate_xfer(self):
        """Return True if the command was ACKed and the read ended in NACK."""
        return (
            self.start.data.response == I2CResponse.ACK
            and self.repeat.data.response == I2CResponse.NACK
        )

    def __str__(self):
        """Format both phases, separated by `` | ``.

        The command phase shows timestamp, duration, address, READ and the
        command name, plus the response name if it was not ACKed. The read
        phase shows timestamp, duration and data, followed by a status
        field (possibly empty) naming an unexpected response and/or
        ``!PEC``.
        """
        timestamp = self.start.timestamp.strftime("%M:%S.%f")
        duration = self.start.duration.total_seconds()
        fmt = "{0} {1:.6f} 0x{2.address:x} {3.name} {2.command.name}"
        start = fmt.format(timestamp, duration, self, I2CRecord.READ)
        if self.start.data.response != I2CResponse.ACK:
            start += " " + self.start.data.response.name

        timestamp = self.repeat.timestamp.strftime("%M:%S.%f")
        duration = self.repeat.duration.total_seconds()
        data = "[ " + " ".join("{:02x}".format(v) for v in self.data) + " ]"
        status = []
        if self.repeat.data.response != I2CResponse.NACK:
            status.append(self.repeat.data.response.name)
        if not self.validate_pec():
            status.append("!PEC")
        # The status field is always emitted, even when empty.
        repeat = "{0} {1:.6f} {2} {3}".format(
            timestamp, duration, data, " ".join(status)
        )
        return start + " | " + repeat
def filter_source(src):
    """Yield only the lines of ``src`` that carry capture data.

    Skipped are: blank or whitespace-only lines (including a bare newline,
    which is how blank lines arrive from ``readlines()``), lines starting
    with ``#``, and lines containing ``Capture started`` or
    ``Capture stopped``. Kept lines are yielded unchanged.
    """
    for line in src:
        # Test the stripped text: "\n" is truthy, so ``not line`` would let
        # blank lines through as empty CSV rows.
        if not line.strip():
            continue
        if line.startswith("#"):
            continue
        if "Capture started" in line or "Capture stopped" in line:
            continue
        yield line
def main():
    """Parse the command line, filter the capture file and print transfers.

    The optional filters (``--after``, ``--before``, ``--longer-than``,
    ``--address``) are applied to the raw I2C transfers. With ``--pmbus`` or
    ``--ucd90320`` the transfers are grouped into PMBus reads and writes and
    printed in decoded form; ``--bad-transfers`` then limits the output to
    those that fail validation. Without either flag each raw transfer is
    printed.
    """
    parser = argparse.ArgumentParser()
    for option in ("--after", "--before", "--longer-than"):
        parser.add_argument(option, type=str)
    parser.add_argument("--address", type=lambda x: int(x, 0))
    for switch in ("--pmbus", "--ucd90320", "--bad-transfers"):
        parser.add_argument(switch, action="store_true")
    parser.add_argument("file", type=str)
    args = parser.parse_args()

    # The file is read fully up front; everything after this is lazy.
    with open(args.file, "r") as src:
        lines = src.readlines()
    rows = csv.reader(filter_source(lines))
    xfers = (to_i2ctransfer(row) for row in rows)

    if args.after:
        after = to_timestamp(args.after)
        xfers = (x for x in xfers if x.timestamp > after)
    if args.before:
        before = to_timestamp(args.before)
        xfers = (x for x in xfers if x.timestamp < before)
    if args.longer_than:
        minimum = to_duration(args.longer_than)
        xfers = (x for x in xfers if x.duration > minimum)
    if args.address is not None:
        xfers = (x for x in xfers if x.address == args.address)

    if not (args.ucd90320 or args.pmbus):
        # Raw mode: one line per I2C transfer.
        for xfer in xfers:
            stamp = xfer.timestamp.strftime("%M:%S.%f")
            seconds = xfer.duration.total_seconds()
            octets = " ".join(format(v, "02x") for v in xfer.data.data)
            print(
                "{} {:.6f} 0x{:x} {} {} {}".format(
                    stamp,
                    seconds,
                    xfer.address,
                    xfer.record.name,
                    "[ " + octets + " ]",
                    xfer.data.response.name,
                )
            )
        return

    style = ucd90320command_style if args.ucd90320 else pmbuscommand_style
    for smbus_xfer in as_smbustransfers(xfers, style):
        # valid() is consulted only when --bad-transfers was requested.
        if not args.bad_transfers or not smbus_xfer.valid():
            print(smbus_xfer)


if __name__ == "__main__":
    main()