python: fix flake8 warnings and format with black
Most of the flake8 warnings in this repository were fairly trivial,
so fixed them. The "openbmctool" is 7000+ lines of pretty heavily
warned code, so just disabling that one. Format everything with
black.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: Icb3f6ee9bf03dece58785f7af00617c87a84aa65
diff --git a/.linter-ignore b/.linter-ignore
new file mode 100644
index 0000000..1fd098c
--- /dev/null
+++ b/.linter-ignore
@@ -0,0 +1 @@
+openbmctool/openbmctool.py
diff --git a/altitude/altitude b/altitude/altitude
index 5500ea0..e3c4413 100755
--- a/altitude/altitude
+++ b/altitude/altitude
@@ -5,9 +5,8 @@
# https://en.wikipedia.org/wiki/Barometric_formula
-from math import exp, log
-import sys
import argparse
+from math import exp, log
Pb = 101325.00
Tb = 288.15
@@ -18,29 +17,51 @@
M = 0.0289644
C0 = 273.15
+
def P(h):
return Pb * exp((-g0 * M * (h - hb)) / (Rstar * Tb))
+
def T(h):
return (h - hb) * Lb + Tb
+
def Hp(p):
return (log(p / Pb) * (Rstar * Tb)) / (-g0 * M) + hb
+
def Ht(t):
return ((t - Tb) / Lb) + hb
+
def K(c):
return C0 + c
+
def C(k):
return k - C0
+
def main():
parser = argparse.ArgumentParser()
- parser.add_argument("--height", type=float, default=None, help="Height above sea level in metres")
- parser.add_argument("--temperature", type=float, default=None, help="Temperature in Celcius")
- parser.add_argument("--pressure", type=float, default=None, help="Atmospheric pressure in Pascals")
+ parser.add_argument(
+ "--height",
+ type=float,
+ default=None,
+ help="Height above sea level in metres",
+ )
+ parser.add_argument(
+ "--temperature",
+ type=float,
+ default=None,
+ help="Temperature in Celcius",
+ )
+ parser.add_argument(
+ "--pressure",
+ type=float,
+ default=None,
+ help="Atmospheric pressure in Pascals",
+ )
args = parser.parse_args()
out = []
if args.height is not None:
@@ -69,5 +90,6 @@
out.append("\n\t".join(local))
print("\n\n".join(out))
+
if __name__ == "__main__":
main()
diff --git a/autojson/autojson.py b/autojson/autojson.py
index e4f2818..77dba50 100755
--- a/autojson/autojson.py
+++ b/autojson/autojson.py
@@ -6,9 +6,11 @@
from sys import argv
for file in argv[1:]:
- print("formatting file {}".format(file))
- with open(file) as f:
- j = json.load(f)
+ print("formatting file {}".format(file))
+ with open(file) as f:
+ j = json.load(f)
- with open(file, 'w') as f:
- f.write(json.dumps(j, indent=4, sort_keys=True, separators=(',', ': ')))
+ with open(file, "w") as f:
+ f.write(
+ json.dumps(j, indent=4, sort_keys=True, separators=(",", ": "))
+ )
diff --git a/bi2cp/bi2cp b/bi2cp/bi2cp
index 4bfbd1d..1cb1f87 100755
--- a/bi2cp/bi2cp
+++ b/bi2cp/bi2cp
@@ -3,15 +3,15 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2020 IBM Corp.
-import csv
-import sys
-from collections import namedtuple
-from pprint import pprint
-from datetime import time, timedelta
import argparse
+import csv
import enum
+from collections import namedtuple
+from datetime import time, timedelta
+
import crc8
+
class UCD90320Command(bytes, enum.Enum):
def __new__(cls, value, width):
obj = bytes.__new__(cls, [value])
@@ -19,11 +19,12 @@
obj.width = width
return obj
- MONITOR_CONFIG = (0xd5, -1)
- NUM_PAGES = (0xd6, 1)
- GPIO_SELECT = (0xfa, 1)
- GPIO_CONFIG = (0xfb, 1)
- DEVICE_ID = (0xfd, -1)
+ MONITOR_CONFIG = (0xD5, -1)
+ NUM_PAGES = (0xD6, 1)
+ GPIO_SELECT = (0xFA, 1)
+ GPIO_CONFIG = (0xFB, 1)
+ DEVICE_ID = (0xFD, -1)
+
class PMBusCommand(bytes, enum.Enum):
def __new__(cls, value, width):
@@ -49,67 +50,87 @@
STORE_USER_CODE = (0x17, 1)
RESTORE_USER_CODE = (0x18, 1)
CAPABILITY = (0x19, 1)
- QUERY = (0x1a, 1)
- SMBALERT_MASK = (0x1b, 2)
+ QUERY = (0x1A, 1)
+ SMBALERT_MASK = (0x1B, 2)
VOUT_MODE = (0x20, 1)
VOUT_COMMAND = (0x21, 2)
VOUT_CAL_OFFSET = (0x23, 2)
POUT_MAX = (0x31, 2)
FREQUENCY_SWITCH = (0x33, 2)
VIN_OFF = (0x36, 2)
- FAN_COMMAND_1 = (0x3b, 2)
- FAN_COMMAND_4 = (0x3f, 2)
+ FAN_COMMAND_1 = (0x3B, 2)
+ FAN_COMMAND_4 = (0x3F, 2)
VOUT_OV_FAULT_LIMIT = (0x40, 2)
VOUT_OV_WARN_LIMIT = (0x42, 2)
VOUT_UV_WARN_LIMIT = (0x43, 2)
VOUT_UV_FAULT_LIMIT = (0x44, 2)
IOUT_OC_LV_FAULT_LIMIT = (0x48, 2)
IOUT_OC_LV_FAULT_RESPONSE = (0x49, 1)
- IOUT_UC_FAULT_RESPONSE = (0x4c, 1)
- OT_FAULT_LIMIT = (0x4f, 2)
+ IOUT_UC_FAULT_RESPONSE = (0x4C, 1)
+ OT_FAULT_LIMIT = (0x4F, 2)
OT_WARN_LIMIT = (0x51, 2)
UT_WARN_LIMIT = (0x52, 2)
UT_FAULT_LIMIT = (0x53, 2)
VIN_UV_FAULT_LIMIT = (0x59, 2)
- IIN_OC_FAULT_RESPONSE = (0x5c, 1)
+ IIN_OC_FAULT_RESPONSE = (0x5C, 1)
TOFF_MAX_WARN_LIMIT = (0x66, 2)
STATUS_WORD = (0x79, 2)
- STATUS_CML = (0x7e, 1)
- STATUS_OTHER = (0x7f, 1)
+ STATUS_CML = (0x7E, 1)
+ STATUS_OTHER = (0x7F, 1)
STATUS_MFR_SPECIFIC = (0x80, 1)
- READ_TEMPERATURE_3 = (0x8f, 2)
+ READ_TEMPERATURE_3 = (0x8F, 2)
PMBUS_REVISION = (0x98, 1)
- MFR_MODEL = (0x9a, -1)
- IC_DEVICE_REV = (0xae, -1)
- USER_DATA_00 = (0xb0, -1)
- USER_DATA_08 = (0xb8, -1)
- MFR_SPECIFIC_05 = (0xd5, None)
- MFR_SPECIFIC_06 = (0xd6, None)
- MFR_SPECIFIC_42 = (0xfa, None)
- MFR_SPECIFIC_43 = (0xfb, None)
- MFR_SPECIFIC_45 = (0xfd, None)
+ MFR_MODEL = (0x9A, -1)
+ IC_DEVICE_REV = (0xAE, -1)
+ USER_DATA_00 = (0xB0, -1)
+ USER_DATA_08 = (0xB8, -1)
+ MFR_SPECIFIC_05 = (0xD5, None)
+ MFR_SPECIFIC_06 = (0xD6, None)
+ MFR_SPECIFIC_42 = (0xFA, None)
+ MFR_SPECIFIC_43 = (0xFB, None)
+ MFR_SPECIFIC_45 = (0xFD, None)
class I2CCondition(enum.Enum):
START = 0
STOP = 1
+
class I2CRecord(enum.Enum):
WRITE = 0
READ = 1
+
class I2CResponse(enum.Enum):
ACK = 0
NACK = 1
+
# Level,Index,m:s.ms.us,Dur,Len,Err,S/P,Addr,Record,Data
# 0,1,0:29.722.525,210.600 us,1 B,,S,32,Write Transaction,0E
-I2CTransfer = namedtuple("I2CTransfer", ("level", "index", "timestamp", "duration", "length", "error", "conditions", "address", "record", "data"))
-Timestamp = namedtuple("Timestamp", ["minutes", "seconds", "milliseconds", "microseconds"])
+I2CTransfer = namedtuple(
+ "I2CTransfer",
+ (
+ "level",
+ "index",
+ "timestamp",
+ "duration",
+ "length",
+ "error",
+ "conditions",
+ "address",
+ "record",
+ "data",
+ ),
+)
+Timestamp = namedtuple(
+ "Timestamp", ["minutes", "seconds", "milliseconds", "microseconds"]
+)
I2CData = namedtuple("I2CData", ["response", "data"])
SMBusTransfer = namedtuple("SMBusTransfer", ["command", "response"])
+
def to_duration(field):
if field.endswith("us"):
if "." in field:
@@ -125,24 +146,31 @@
raise ValueError
return timedelta(milliseconds=int(ms), microseconds=int(us))
+
def to_timestamp(field):
ts = Timestamp(*list(int(v) for v in field.replace(":", ".").split(".")))
- return time(0, ts.minutes, ts.seconds, ts.milliseconds * 1000 + ts.microseconds)
+ return time(
+ 0, ts.minutes, ts.seconds, ts.milliseconds * 1000 + ts.microseconds
+ )
+
def to_i2cdata(field):
resp = I2CResponse.NACK if field.endswith("*") else I2CResponse.ACK
return I2CData(resp, bytes(int(v, 16) for v in field.rstrip("*").split()))
+
def to_address(field):
return int(field, 16)
+
def to_i2cconditions(field):
if "S" == field:
- return { I2CCondition.START }
+ return {I2CCondition.START}
elif "SP" == field:
- return { I2CCondition.START, I2CCondition.STOP }
+ return {I2CCondition.START, I2CCondition.STOP}
raise ValueError
+
def to_i2crecord(field):
if "Write Transaction" == field:
return I2CRecord.WRITE
@@ -150,49 +178,58 @@
return I2CRecord.READ
raise ValueError
+
def to_i2ctransfer(line):
- return I2CTransfer(*line[:2],
- to_timestamp(line[2]),
- to_duration(line[3]),
- *line[4:6],
- to_i2cconditions(line[6]),
- to_address(line[7]),
- to_i2crecord(line[8]),
- to_i2cdata(line[9]))
+ return I2CTransfer(
+ *line[:2],
+ to_timestamp(line[2]),
+ to_duration(line[3]),
+ *line[4:6],
+ to_i2cconditions(line[6]),
+ to_address(line[7]),
+ to_i2crecord(line[8]),
+ to_i2cdata(line[9])
+ )
+
def pmbuscommand_style(xfer):
return PMBusCommand(xfer.data.data[0])
+
def ucd90320command_style(xfer):
try:
return UCD90320Command(xfer.data.data[0])
- except:
+ except Exception:
return pmbuscommand_style(xfer)
+
def as_smbustransfers(i2cxfers, style):
command = None
for i2cxfer in i2cxfers:
- if i2cxfer.conditions == { I2CCondition.START }:
+ if i2cxfer.conditions == {I2CCondition.START}:
assert not command
command = i2cxfer
- if i2cxfer.conditions == { I2CCondition.START, I2CCondition.STOP }:
+ if i2cxfer.conditions == {I2CCondition.START, I2CCondition.STOP}:
if command:
yield PMBusRead(style(command), command, i2cxfer)
command = None
else:
yield PMBusWrite(style(i2cxfer), i2cxfer)
+
def smbus_pec(data):
hash = crc8.crc8()
hash.update(data)
return hash.digest()[0]
+
def smbus_pec_pack_address(address, record):
return (address << 1) | record.value
+
class PMBusTransfer(object):
def calculate_pec(self):
- raise NotImplemented
+ raise NotImplementedError
def validate_pec(self):
if self.pec is None:
@@ -202,11 +239,12 @@
return provided == derived
def validate_xfer(self):
- raise NotImplemented
+ raise NotImplementedError
def valid(self):
return self.validate_xfer() and self.validate_pec()
+
class PMBusWrite(PMBusTransfer):
def __init__(self, command, xfer):
assert xfer.record == I2CRecord.WRITE
@@ -224,15 +262,17 @@
tail = self.data[end:]
if len(tail) == 1:
- self.pec, = tail
+ (self.pec,) = tail
else:
self.pec = None
self.response = xfer.data.response
def calculate_pec(self):
- data = (smbus_pec_pack_address(self.xfer.address, self.xfer.record),
- *self.xfer.data.data[:-1])
+ data = (
+ smbus_pec_pack_address(self.xfer.address, self.xfer.record),
+ *self.xfer.data.data[:-1],
+ )
return smbus_pec(bytes(data))
def validate_xfer(self):
@@ -252,12 +292,18 @@
if status:
status = " ".join(status)
- fmt = "{0} {1:.6f} 0x{2.xfer.address:x} {3.name} {2.command.name} {4} {5}"
- return fmt.format(timestamp, duration, self, I2CRecord.WRITE, data, status)
+ fmt = (
+ "{0} {1:.6f} 0x{2.xfer.address:x} {3.name} "
+ + "{2.command.name} {4} {5}"
+ )
+ return fmt.format(
+ timestamp, duration, self, I2CRecord.WRITE, data, status
+ )
fmt = "{0} {1:.6f} 0x{2.xfer.address:x} {3.name} {2.command.name} {4}"
return fmt.format(timestamp, duration, self, I2CRecord.WRITE, data)
+
class PMBusRead(PMBusTransfer):
def __init__(self, command, start, repeat):
assert repeat.record == I2CRecord.READ
@@ -278,22 +324,26 @@
tail = repeat.data.data[end:]
if len(tail) == 1:
- self.pec, = tail
+ (self.pec,) = tail
else:
self.pec = None
self.response = repeat.data.response
def calculate_pec(self):
- data = (smbus_pec_pack_address(self.start.address, self.start.record),
- *self.start.data.data,
- smbus_pec_pack_address(self.repeat.address, self.repeat.record),
- *self.repeat.data.data[:-1])
+ data = (
+ smbus_pec_pack_address(self.start.address, self.start.record),
+ *self.start.data.data,
+ smbus_pec_pack_address(self.repeat.address, self.repeat.record),
+ *self.repeat.data.data[:-1],
+ )
return smbus_pec(bytes(data))
def validate_xfer(self):
- return (self.start.data.response == I2CResponse.ACK and
- self.repeat.data.response == I2CResponse.NACK)
+ return (
+ self.start.data.response == I2CResponse.ACK
+ and self.repeat.data.response == I2CResponse.NACK
+ )
def __str__(self):
timestamp = self.start.timestamp.strftime("%M:%S.%f")
@@ -306,7 +356,9 @@
if status:
status = " ".join(status)
fmt = "{0} {1:.6f} 0x{2.address:x} {3.name} {2.command.name} {4}"
- start = fmt.format(timestamp, duration, self, I2CRecord.READ, status)
+ start = fmt.format(
+ timestamp, duration, self, I2CRecord.READ, status
+ )
else:
fmt = "{0} {1:.6f} 0x{2.address:x} {3.name} {2.command.name}"
start = fmt.format(timestamp, duration, self, I2CRecord.READ)
@@ -329,6 +381,7 @@
return start + " | " + repeat
+
def filter_source(src):
for line in src:
if not line:
@@ -341,6 +394,7 @@
continue
yield line
+
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--after", type=str)
@@ -379,10 +433,15 @@
for xfer in xfers:
timestamp = xfer.timestamp.strftime("%M:%S.%f")
duration = xfer.duration.total_seconds()
- data = "[ " + " ".join("{:02x}".format(v) for v in xfer.data.data) + " ]"
+ data = (
+ "[ "
+ + " ".join("{:02x}".format(v) for v in xfer.data.data)
+ + " ]"
+ )
res = xfer.data.response.name
fmt = "{0} {1:.6f} 0x{2.address:x} {2.record.name} {3} {4}"
print(fmt.format(timestamp, duration, xfer, data, res))
+
if __name__ == "__main__":
main()
diff --git a/dbus-pcap/dbus-pcap b/dbus-pcap/dbus-pcap
index 7d50aef..2b4e778 100755
--- a/dbus-pcap/dbus-pcap
+++ b/dbus-pcap/dbus-pcap
@@ -3,32 +3,38 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2019 IBM Corp.
+import json
+import struct
+import sys
from argparse import ArgumentParser
-from itertools import islice, cycle
from collections import namedtuple
from enum import Enum
+from itertools import islice
+
from scapy.all import rdpcap
-import struct
-import json
-import sys
RawMessage = namedtuple("RawMessage", "endian, header, data")
-FixedHeader = namedtuple("FixedHeader", "endian, type, flags, version, length, cookie")
+FixedHeader = namedtuple(
+ "FixedHeader", "endian, type, flags, version, length, cookie"
+)
CookedHeader = namedtuple("CookedHeader", "fixed, fields")
CookedMessage = namedtuple("CookedMessage", "header, body")
TypeProperty = namedtuple("TypeProperty", "field, type, nature")
TypeContainer = namedtuple("TypeContainer", "type, members")
Field = namedtuple("Field", "type, data")
+
class MessageEndian(Enum):
- LITTLE = ord('l')
- BIG = ord('B')
+ LITTLE = ord("l")
+ BIG = ord("B")
+
StructEndianLookup = {
- MessageEndian.LITTLE.value : "<",
- MessageEndian.BIG.value : ">"
+ MessageEndian.LITTLE.value: "<",
+ MessageEndian.BIG.value: ">",
}
+
class MessageType(Enum):
INVALID = 0
METHOD_CALL = 1
@@ -36,11 +42,13 @@
ERROR = 3
SIGNAL = 4
+
class MessageFlags(Enum):
NO_REPLY_EXPECTED = 0x01
NO_AUTO_START = 0x02
ALLOW_INTERACTIVE_AUTHORIZATION = 0x04
+
class MessageFieldType(Enum):
INVALID = 0
PATH = 1
@@ -53,31 +61,34 @@
SIGNATURE = 8
UNIX_FDS = 9
+
class DBusType(Enum):
INVALID = 0
- BYTE = ord('y')
- BOOLEAN = ord('b')
- INT16 = ord('n')
- UINT16 = ord('q')
- INT32 = ord('i')
- UINT32 = ord('u')
- INT64 = ord('x')
- UINT64 = ord('t')
- DOUBLE = ord('d')
- STRING = ord('s')
- OBJECT_PATH = ord('o')
- SIGNATURE = ord('g')
- ARRAY = ord('a')
- STRUCT = ord('(')
- VARIANT = ord('v')
- DICT_ENTRY = ord('{')
- UNIX_FD = ord('h')
+ BYTE = ord("y")
+ BOOLEAN = ord("b")
+ INT16 = ord("n")
+ UINT16 = ord("q")
+ INT32 = ord("i")
+ UINT32 = ord("u")
+ INT64 = ord("x")
+ UINT64 = ord("t")
+ DOUBLE = ord("d")
+ STRING = ord("s")
+ OBJECT_PATH = ord("o")
+ SIGNATURE = ord("g")
+ ARRAY = ord("a")
+ STRUCT = ord("(")
+ VARIANT = ord("v")
+ DICT_ENTRY = ord("{")
+ UNIX_FD = ord("h")
+
DBusContainerTerminatorLookup = {
- chr(DBusType.STRUCT.value) : ')',
- chr(DBusType.DICT_ENTRY.value) : '}',
+ chr(DBusType.STRUCT.value): ")",
+ chr(DBusType.DICT_ENTRY.value): "}",
}
+
class DBusTypeCategory(Enum):
FIXED = {
DBusType.BYTE.value,
@@ -89,7 +100,7 @@
DBusType.INT64.value,
DBusType.UINT64.value,
DBusType.DOUBLE.value,
- DBusType.UNIX_FD.value
+ DBusType.UNIX_FD.value,
}
STRING = {
DBusType.STRING.value,
@@ -106,27 +117,33 @@
DBusType.INVALID.value,
}
+
TypePropertyLookup = {
- DBusType.BYTE.value : TypeProperty(DBusType.BYTE, 'B', 1),
+ DBusType.BYTE.value: TypeProperty(DBusType.BYTE, "B", 1),
# DBus booleans are 32 bit, with only the LSB used. Extract as 'I'.
- DBusType.BOOLEAN.value : TypeProperty(DBusType.BOOLEAN, 'I', 4),
- DBusType.INT16.value : TypeProperty(DBusType.INT16, 'h', 2),
- DBusType.UINT16.value : TypeProperty(DBusType.UINT16, 'H', 2),
- DBusType.INT32.value : TypeProperty(DBusType.INT32, 'i', 4),
- DBusType.UINT32.value : TypeProperty(DBusType.UINT32, 'I', 4),
- DBusType.INT64.value : TypeProperty(DBusType.INT64, 'q', 8),
- DBusType.UINT64.value : TypeProperty(DBusType.UINT64, 'Q', 8),
- DBusType.DOUBLE.value : TypeProperty(DBusType.DOUBLE, 'd', 8),
- DBusType.STRING.value : TypeProperty(DBusType.STRING, 's', DBusType.UINT32),
- DBusType.OBJECT_PATH.value : TypeProperty(DBusType.OBJECT_PATH, 's', DBusType.UINT32),
- DBusType.SIGNATURE.value : TypeProperty(DBusType.SIGNATURE, 's', DBusType.BYTE),
- DBusType.ARRAY.value : TypeProperty(DBusType.ARRAY, None, DBusType.UINT32),
- DBusType.STRUCT.value : TypeProperty(DBusType.STRUCT, None, 8),
- DBusType.VARIANT.value : TypeProperty(DBusType.VARIANT, None, 1),
- DBusType.DICT_ENTRY.value : TypeProperty(DBusType.DICT_ENTRY, None, 8),
- DBusType.UNIX_FD.value : TypeProperty(DBusType.UINT32, None, 8),
+ DBusType.BOOLEAN.value: TypeProperty(DBusType.BOOLEAN, "I", 4),
+ DBusType.INT16.value: TypeProperty(DBusType.INT16, "h", 2),
+ DBusType.UINT16.value: TypeProperty(DBusType.UINT16, "H", 2),
+ DBusType.INT32.value: TypeProperty(DBusType.INT32, "i", 4),
+ DBusType.UINT32.value: TypeProperty(DBusType.UINT32, "I", 4),
+ DBusType.INT64.value: TypeProperty(DBusType.INT64, "q", 8),
+ DBusType.UINT64.value: TypeProperty(DBusType.UINT64, "Q", 8),
+ DBusType.DOUBLE.value: TypeProperty(DBusType.DOUBLE, "d", 8),
+ DBusType.STRING.value: TypeProperty(DBusType.STRING, "s", DBusType.UINT32),
+ DBusType.OBJECT_PATH.value: TypeProperty(
+ DBusType.OBJECT_PATH, "s", DBusType.UINT32
+ ),
+ DBusType.SIGNATURE.value: TypeProperty(
+ DBusType.SIGNATURE, "s", DBusType.BYTE
+ ),
+ DBusType.ARRAY.value: TypeProperty(DBusType.ARRAY, None, DBusType.UINT32),
+ DBusType.STRUCT.value: TypeProperty(DBusType.STRUCT, None, 8),
+ DBusType.VARIANT.value: TypeProperty(DBusType.VARIANT, None, 1),
+ DBusType.DICT_ENTRY.value: TypeProperty(DBusType.DICT_ENTRY, None, 8),
+ DBusType.UNIX_FD.value: TypeProperty(DBusType.UINT32, None, 8),
}
+
def parse_signature(sigstream):
sig = ord(next(sigstream))
assert sig not in DBusTypeCategory.RESERVED.value
@@ -154,6 +171,7 @@
return TypeContainer._make(ty)
+
class AlignedStream(object):
def __init__(self, buf, offset=0):
self.stash = (buf, offset)
@@ -167,7 +185,9 @@
prop = TypePropertyLookup[prop.nature.value]
if prop.nature == DBusType.UINT32:
prop = TypePropertyLookup[prop.nature.value]
- advance = (prop.nature - (self.offset & (prop.nature - 1))) % prop.nature
+ advance = (
+ prop.nature - (self.offset & (prop.nature - 1))
+ ) % prop.nature
_ = bytes(islice(self.stream, advance))
self.offset += len(_)
@@ -188,17 +208,33 @@
offset = self.offset
self.offset += len(remaining)
if self.offset - self.stash[1] != len(self.stash[0]):
- print("(self.offset - self.stash[1]): %d, len(self.stash[0]): %d"
- % (self.offset - self.stash[1], len(self.stash[0])), file=sys.stderr)
+ print(
+ "(self.offset - self.stash[1]): %d, len(self.stash[0]): %d"
+ % (self.offset - self.stash[1], len(self.stash[0])),
+ file=sys.stderr,
+ )
raise MalformedPacketError
return remaining, offset
def dump(self):
- print("AlignedStream: absolute offset: {}".format(self.offset), file=sys.stderr)
- print("AlignedStream: relative offset: {}".format(self.offset - self.stash[1]),
- file=sys.stderr)
- print("AlignedStream: remaining buffer:\n{}".format(self.drain()[0]), file=sys.stderr)
- print("AlignedStream: provided buffer:\n{}".format(self.stash[0]), file=sys.stderr)
+ print(
+ "AlignedStream: absolute offset: {}".format(self.offset),
+ file=sys.stderr,
+ )
+ print(
+ "AlignedStream: relative offset: {}".format(
+ self.offset - self.stash[1]
+ ),
+ file=sys.stderr,
+ )
+ print(
+ "AlignedStream: remaining buffer:\n{}".format(self.drain()[0]),
+ file=sys.stderr,
+ )
+ print(
+ "AlignedStream: provided buffer:\n{}".format(self.stash[0]),
+ file=sys.stderr,
+ )
def dump_assert(self, condition):
if condition:
@@ -206,6 +242,7 @@
self.dump()
assert condition
+
def parse_fixed(endian, stream, tc):
assert tc.type.value in TypePropertyLookup
prop = TypePropertyLookup[tc.type.value]
@@ -216,10 +253,16 @@
except struct.error as e:
print(e, file=sys.stderr)
print("parse_fixed: Error unpacking {}".format(val), file=sys.stderr)
- print("parse_fixed: Attempting to unpack type {} with properties {}".format(tc.type, prop),
- file=sys.stderr)
+ print(
+ (
+ "parse_fixed: Attempting to unpack type {} "
+ + "with properties {}"
+ ).format(tc.type, prop),
+ file=sys.stderr,
+ )
stream.dump_assert(False)
+
def parse_string(endian, stream, tc):
assert tc.type.value in TypePropertyLookup
prop = TypePropertyLookup[tc.type.value]
@@ -232,15 +275,22 @@
try:
stream.dump_assert(len(val) == size + 1)
try:
- return struct.unpack("{}{}".format(size, prop.type), val[:size])[0].decode()
+ return struct.unpack("{}{}".format(size, prop.type), val[:size])[
+ 0
+ ].decode()
except struct.error as e:
stream.dump()
raise AssertionError(e)
except AssertionError as e:
- print("parse_string: Error unpacking string of length {} from {}".format(size, val),
- file=sys.stderr)
+ print(
+ "parse_string: Error unpacking string of length {} from {}".format(
+ size, val
+ ),
+ file=sys.stderr,
+ )
raise e
+
def parse_type(endian, stream, tc):
if tc.type.value in DBusTypeCategory.FIXED.value:
val = parse_fixed(endian, stream, tc)
@@ -253,6 +303,7 @@
return val
+
def parse_array(endian, stream, tc):
arr = list()
length = parse_fixed(endian, stream, TypeContainer(DBusType.UINT32, None))
@@ -265,6 +316,7 @@
stream.align(tc)
return arr
+
def parse_struct(endian, stream, tcs):
arr = list()
stream.align(TypeContainer(DBusType.STRUCT, None))
@@ -272,11 +324,13 @@
arr.append(parse_type(endian, stream, tc))
return arr
+
def parse_variant(endian, stream):
sig = parse_string(endian, stream, TypeContainer(DBusType.SIGNATURE, None))
tc = parse_signature(iter(sig))
return parse_type(endian, stream, tc)
+
def parse_container(endian, stream, tc):
if tc.type == DBusType.ARRAY:
return parse_array(endian, stream, tc.members)
@@ -287,6 +341,7 @@
else:
stream.dump_assert(False)
+
def parse_fields(endian, stream):
sig = parse_signature(iter("a(yv)"))
fields = parse_container(endian, stream, sig)
@@ -295,20 +350,25 @@
stream.align(TypeContainer(DBusType.STRUCT, None))
return list(map(lambda v: Field(MessageFieldType(v[0]), v[1]), fields))
+
class MalformedPacketError(Exception):
pass
+
def parse_header(raw, ignore_error):
assert raw.endian in StructEndianLookup.keys()
endian = StructEndianLookup[raw.endian]
- fixed = FixedHeader._make(struct.unpack("{}BBBBLL".format(endian), raw.header))
+ fixed = FixedHeader._make(
+ struct.unpack("{}BBBBLL".format(endian), raw.header)
+ )
astream = AlignedStream(raw.data, len(raw.header))
fields = parse_fields(endian, astream)
data, offset = astream.drain()
- if ignore_error == False and fixed.length > len(data):
+ if (not ignore_error) and fixed.length > len(data):
raise MalformedPacketError
return CookedHeader(fixed, fields), AlignedStream(data, offset)
+
def parse_body(header, stream):
assert header.fixed.endian in StructEndianLookup
endian = StructEndianLookup[header.fixed.endian]
@@ -326,6 +386,7 @@
break
return body
+
def parse_message(raw):
try:
header, data = parse_header(raw, False)
@@ -339,20 +400,24 @@
print(raw, file=sys.stderr)
raise e
+
def parse_packet(packet):
data = bytes(packet)
raw = RawMessage(data[0], data[:12], data[12:])
try:
msg = parse_message(raw)
- except MalformedPacketError as e:
+ except MalformedPacketError:
print("Got malformed packet: {}".format(raw), file=sys.stderr)
- # For a message that is so large that its payload data could not be parsed,
- # just parse its header, then set its data field to empty.
+ # For a message that is so large that its payload data could not be
+ # parsed, just parse its header, then set its data field to empty.
header, data = parse_header(raw, True)
msg = CookedMessage(header, [])
return msg
+
CallEnvelope = namedtuple("CallEnvelope", "cookie, origin")
+
+
def parse_session(session, matchers, track_calls):
calls = set()
for packet in session:
@@ -362,92 +427,131 @@
yield packet.time, cooked
elif any(all(r(cooked) for r in m) for m in matchers):
if cooked.header.fixed.type == MessageType.METHOD_CALL.value:
- s = [f for f in cooked.header.fields
- if f.type == MessageFieldType.SENDER][0]
+ s = [
+ f
+ for f in cooked.header.fields
+ if f.type == MessageFieldType.SENDER
+ ][0]
calls.add(CallEnvelope(cooked.header.fixed.cookie, s.data))
yield packet.time, cooked
elif track_calls:
if cooked.header.fixed.type != MessageType.METHOD_RETURN.value:
continue
- rs = [f for f in cooked.header.fields
- if f.type == MessageFieldType.REPLY_SERIAL][0]
- d = [f for f in cooked.header.fields
- if f.type == MessageFieldType.DESTINATION][0]
+ rs = [
+ f
+ for f in cooked.header.fields
+ if f.type == MessageFieldType.REPLY_SERIAL
+ ][0]
+ d = [
+ f
+ for f in cooked.header.fields
+ if f.type == MessageFieldType.DESTINATION
+ ][0]
ce = CallEnvelope(rs.data, d.data)
if ce in calls:
calls.remove(ce)
yield packet.time, cooked
- except MalformedPacketError as e:
+ except MalformedPacketError:
pass
+
def gen_match_type(rule):
mt = MessageType.__members__[rule.value.upper()]
return lambda p: p.header.fixed.type == mt.value
+
def gen_match_sender(rule):
mf = Field(MessageFieldType.SENDER, rule.value)
return lambda p: any(f == mf for f in p.header.fields)
+
def gen_match_interface(rule):
mf = Field(MessageFieldType.INTERFACE, rule.value)
return lambda p: any(f == mf for f in p.header.fields)
+
def gen_match_member(rule):
mf = Field(MessageFieldType.MEMBER, rule.value)
return lambda p: any(f == mf for f in p.header.fields)
+
def gen_match_path(rule):
mf = Field(MessageFieldType.PATH, rule.value)
return lambda p: any(f == mf for f in p.header.fields)
+
def gen_match_destination(rule):
mf = Field(MessageFieldType.DESTINATION, rule.value)
return lambda p: any(f == mf for f in p.header.fields)
+
ValidMatchKeys = {
- "type", "sender", "interface", "member", "path", "destination"
+ "type",
+ "sender",
+ "interface",
+ "member",
+ "path",
+ "destination",
}
MatchRule = namedtuple("MatchExpression", "key, value")
+
# https://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-routing-match-rules
def parse_match_rules(exprs):
matchers = list()
for mexpr in exprs:
rules = list()
for rexpr in mexpr.split(","):
- rule = MatchRule._make(map(lambda s: str.strip(s, "'"), rexpr.split("=")))
- assert rule.key in ValidMatchKeys, "Invalid expression: %" % rule
+ rule = MatchRule._make(
+ map(lambda s: str.strip(s, "'"), rexpr.split("="))
+ )
+ assert rule.key in ValidMatchKeys, f"Invalid expression: {rule}"
rules.append(globals()["gen_match_{}".format(rule.key)](rule))
matchers.append(rules)
return matchers
+
def packetconv(obj):
if isinstance(obj, Enum):
return obj.value
raise TypeError
+
def main():
parser = ArgumentParser()
- parser.add_argument("--json", action="store_true",
- help="Emit a JSON representation of the messages")
- parser.add_argument("--no-track-calls", action="store_true", default=False,
- help="Make a call response pass filters")
+ parser.add_argument(
+ "--json",
+ action="store_true",
+ help="Emit a JSON representation of the messages",
+ )
+ parser.add_argument(
+ "--no-track-calls",
+ action="store_true",
+ default=False,
+ help="Make a call response pass filters",
+ )
parser.add_argument("file", help="The pcap file")
- parser.add_argument("expressions", nargs="*",
- help="DBus message match expressions")
+ parser.add_argument(
+ "expressions", nargs="*", help="DBus message match expressions"
+ )
args = parser.parse_args()
stream = rdpcap(args.file)
matchers = parse_match_rules(args.expressions)
try:
if args.json:
- for (_, msg) in parse_session(stream, matchers, not args.no_track_calls):
+ for (_, msg) in parse_session(
+ stream, matchers, not args.no_track_calls
+ ):
print("{}".format(json.dumps(msg, default=packetconv)))
else:
- for (time, msg) in parse_session(stream, matchers, not args.no_track_calls):
+ for (time, msg) in parse_session(
+ stream, matchers, not args.no_track_calls
+ ):
print("{}: {}".format(time, msg))
print()
except BrokenPipeError:
pass
+
if __name__ == "__main__":
main()
diff --git a/dbus-vis/linecount.py b/dbus-vis/linecount.py
index ef7dd83..e3d98fc 100644
--- a/dbus-vis/linecount.py
+++ b/dbus-vis/linecount.py
@@ -1,8 +1,9 @@
# This script is used for printing out the number of packets in a pcap file
-from scapy.all import rdpcap
import sys
+from scapy.all import rdpcap
+
file_name = sys.argv[1]
try:
stream = rdpcap(file_name)
@@ -10,5 +11,5 @@
for packet in stream:
n += 1
print(n)
-except Exception as e:
+except Exception:
pass
diff --git a/dbusView/dbusView.py b/dbusView/dbusView.py
index fa2396d..a8e357f 100755
--- a/dbusView/dbusView.py
+++ b/dbusView/dbusView.py
@@ -3,94 +3,98 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2020 Intel Corp.
-import subprocess
-from flask import Flask
-from flask import send_file
import argparse
+import subprocess
-REPLACE_CHAR = '~'
+from flask import Flask
+
+REPLACE_CHAR = "~"
app = Flask(__name__)
-parser = argparse.ArgumentParser(description='Remote DBus Viewer')
-parser.add_argument('-u', '--username', default='root')
-parser.add_argument('-p', '--password', default='0penBmc')
-parser.add_argument('-a', '--address', required=True)
-parser.add_argument('-x', '--port', required=True)
+parser = argparse.ArgumentParser(description="Remote DBus Viewer")
+parser.add_argument("-u", "--username", default="root")
+parser.add_argument("-p", "--password", default="0penBmc")
+parser.add_argument("-a", "--address", required=True)
+parser.add_argument("-x", "--port", required=True)
args = parser.parse_args()
-busctl = 'sshpass -p {} busctl -H {}@{} '.format(
- args.password, args.username, args.address)
-header = '<head><link rel="icon" href="https://avatars1.githubusercontent.com/u/13670043?s=200&v=4" /></head>'
+busctl = "sshpass -p {} busctl -H {}@{} ".format(
+ args.password, args.username, args.address
+)
+AVATAR_URL = "https://avatars1.githubusercontent.com/u/13670043?s=200&v=4"
+header = f'<head><link rel="icon" href="{AVATAR_URL}" /></head>'
def getBusNames():
- out = subprocess.check_output(busctl + 'list --acquired', shell=True)
- out = out.split(b'\n')
+ out = subprocess.check_output(busctl + "list --acquired", shell=True)
+ out = out.split(b"\n")
out = out[1:]
names = []
for line in out:
- name = line.split(b' ')[0]
+ name = line.split(b" ")[0]
if name:
names.append(name.decode())
return names
def doTree(busname):
- out = subprocess.check_output(busctl + 'tree ' + busname, shell=True)
- out = out.split(b'\n')
+ out = subprocess.check_output(busctl + "tree " + busname, shell=True)
+ out = out.split(b"\n")
tree = []
for line in out:
- path = line.split(b'/', 1)[-1].decode()
- path = '/' + path
+ path = line.split(b"/", 1)[-1].decode()
+ path = "/" + path
tree.append(path)
return tree
def doIntrospect(busname, path):
out = subprocess.check_output(
- busctl + 'introspect {} {}'.format(busname, path), shell=True)
- return out.decode().split('\n')
+ busctl + "introspect {} {}".format(busname, path), shell=True
+ )
+ return out.decode().split("\n")
-@app.route('/')
+@app.route("/")
def root():
out = header
- out += '<div><H2>Bus Names {}</H2></div>'.format(args.address)
+ out += "<div><H2>Bus Names {}</H2></div>".format(args.address)
for name in getBusNames():
- out += '<div> '
+ out += "<div> "
out += '<a href="{}"> {} </a>'.format(name, name)
- out += '</div>'
+ out += "</div>"
return out
-@app.route('/favicon.ico')
+@app.route("/favicon.ico")
def favicon():
- return '<link rel="icon" type="image/png" href="https://avatars1.githubusercontent.com/u/13670043?s=200&v=4" />'
+ return f'<link rel="icon" type="image/png" href="{AVATAR_URL}" />'
-@app.route('/<name>')
+@app.route("/<name>")
def busname(name):
out = header
- out += '<div><H2>tree {}</H2></div>'.format(name)
+ out += "<div><H2>tree {}</H2></div>".format(name)
for path in doTree(name):
- out += '<div> '
+ out += "<div> "
out += '<a href="{}/{}"> {} </a>'.format(
- name, path.replace('/', REPLACE_CHAR), path)
- out += '</div>'
+ name, path.replace("/", REPLACE_CHAR), path
+ )
+ out += "</div>"
return out
-@app.route('/<name>/<path>')
+@app.route("/<name>/<path>")
def path(name, path):
- path = path.replace(REPLACE_CHAR, '/')
+ path = path.replace(REPLACE_CHAR, "/")
out = header
- out += '<div><H2>introspect {} {}</H2></div>'.format(name, path)
+ out += "<div><H2>introspect {} {}</H2></div>".format(name, path)
for line in doIntrospect(name, path):
- out += '<div> '
- out += '<pre> {} </pre>'.format(line)
- out += '</div>'
+ out += "<div> "
+ out += "<pre> {} </pre>".format(line)
+ out += "</div>"
return out
-app.run(port=args.port, host='0.0.0.0')
+app.run(port=args.port, host="0.0.0.0")
diff --git a/netboot/netboot b/netboot/netboot
index 8032ea5..47c0373 100755
--- a/netboot/netboot
+++ b/netboot/netboot
@@ -3,26 +3,30 @@
import argparse
import sys
import time
-import toml
from os import path
from telnetlib import Telnet
from types import MethodType
+
+import toml
from xdg import BaseDirectory
+
def expect_or_raise(conn, patterns, timeout=None):
- i, m, d = conn.expect([bytes(p, 'ascii') for p in patterns], timeout)
+ i, m, d = conn.expect([bytes(p, "ascii") for p in patterns], timeout)
if i == -1:
msg = "Match failed, expected '%s', got '%s'" % (str(patterns), d)
print(msg, file=sys.stderr)
raise ValueError
return i, m, d
+
def encode_and_write(conn, comm="", sep="\n"):
# Slow down the writes to help poor ol' serial-over-telnet
for c in comm + sep:
- conn.write(bytes(c, 'ascii'))
+ conn.write(bytes(c, "ascii"))
time.sleep(0.01)
+
def init_telnet(host, port=0, timeout=None):
conn = Telnet(host, port, timeout)
conn.encode_and_write = MethodType(encode_and_write, conn)
@@ -58,13 +62,16 @@
try:
conn.encode_and_write()
- i, m, d = conn.expect_or_raise([
- "%s login:" % (mach["platform"]),
- "root@%s:.*#" % (mach["platform"]),
- "root@%s:.*#" % (args.machine),
- "ast#",
- "# ",
- ], 5)
+ i, m, d = conn.expect_or_raise(
+ [
+ "%s login:" % (mach["platform"]),
+ "root@%s:.*#" % (mach["platform"]),
+ "root@%s:.*#" % (args.machine),
+ "ast#",
+ "# ",
+ ],
+ 5,
+ )
if i != 3:
if i == 0:
@@ -81,27 +88,35 @@
for comm in mach["u-boot"]["commands"]:
conn.encode_and_write(comm)
if "tftp" in comm:
- i, m, d = conn.expect_or_raise([
- r"Bytes transferred = \d+ \([0-9a-f]+ hex\)",
- "Not retrying...",
- r"## Warning:",
- r"[*]{3} ERROR:",
- "Abort",
- "Retry count exceeded; starting again",
- ])
+ i, m, d = conn.expect_or_raise(
+ [
+ r"Bytes transferred = \d+ \([0-9a-f]+ hex\)",
+ "Not retrying...",
+ r"## Warning:",
+ r"[*]{3} ERROR:",
+ "Abort",
+ "Retry count exceeded; starting again",
+ ]
+ )
if i > 0:
print("Error detected, exiting", file=sys.stderr)
return
if args.initramfs:
- conn.encode_and_write("setenv bootargs console=ttyS4,115200n root=/dev/ram rw earlyprintk debug")
+ conn.encode_and_write(
+ "setenv bootargs "
+ + "console=ttyS4,115200n root=/dev/ram rw earlyprintk debug"
+ )
conn.read_until(b"ast#")
else:
conn.encode_and_write("printenv set_bootargs")
- i, m, d = conn.expect_or_raise([
- "set_bootargs=.*$",
- "## Error: \"set_bootargs\" not defined",
- ], 1)
+ i, m, d = conn.expect_or_raise(
+ [
+ "set_bootargs=.*$",
+ '## Error: "set_bootargs" not defined',
+ ],
+ 1,
+ )
if i == 0:
conn.encode_and_write("run set_bootargs")
conn.read_until(b"ast#")
@@ -111,5 +126,6 @@
finally:
conn.close()
+
if __name__ == "__main__":
main()
diff --git a/reboot-ping-pong/rpp b/reboot-ping-pong/rpp
index 251e250..78009c2 100755
--- a/reboot-ping-pong/rpp
+++ b/reboot-ping-pong/rpp
@@ -1,20 +1,20 @@
#!/usr/bin/python3
-import argparse
-import pexpect
import sys
import time
-
from collections import namedtuple
+import pexpect
+
Endpoint = namedtuple("Endpoint", "host, port")
Credentials = namedtuple("Credentials", "username, password")
Target = namedtuple("Target", "credentials, endpoint")
Entity = namedtuple("Entity", "console, ssh")
Machine = namedtuple("Machine", "bmc, host")
+
class Obmcutil(object):
- BMC_READY = "xyz.openbmc_project.State.BMC.BMCState.Ready"
+ BMC_READY = "xyz.openbmc_project.State.BMC.BMCState.Ready"
BMC_NOT_READY = "xyz.openbmc_project.State.BMC.BMCState.NotReady"
HOST_OFF = "xyz.openbmc_project.State.Host.HostState.Off"
@@ -37,10 +37,17 @@
return rc
def hoststate(self):
- return self._state("hoststate", "xyz\.openbmc_project\.State\.Host\.HostState\.(Off|Running|Quiesced)")
+ return self._state(
+ "hoststate",
+ "xyz\\.openbmc_project\\.State\\.Host\\.HostState\\."
+ + "(Off|Running|Quiesced)",
+ )
def bmcstate(self):
- return self._state("bmcstate", "xyz\.openbmc_project\.State\.BMC\.BMCState\.(Not)?Ready")
+ return self._state(
+ "bmcstate",
+ "xyz\\.openbmc_project\\.State\\.BMC\\.BMCState\\.(Not)?Ready",
+ )
def poweron(self):
self.session.sendline("obmcutil -w poweron")
@@ -50,6 +57,7 @@
self.session.sendline("obmcutil -w chassisoff")
self.session.expect(self.prompt)
+
class PexpectLogger(object):
def write(self, bstring):
try:
@@ -76,19 +84,29 @@
self.login()
def login(self):
- self.session.sendline(self.entity.console.credentials.username.encode())
+ self.session.sendline(
+ self.entity.console.credentials.username.encode()
+ )
self.session.expect("Password: ".encode())
- self.session.sendline(self.entity.console.credentials.password.encode())
+ self.session.sendline(
+ self.entity.console.credentials.password.encode()
+ )
self.session.expect(self.shell)
def reboot(self):
self.session.sendline("reboot")
- self.session.expect("Hit any key to stop autoboot:".encode(), timeout=None)
+ self.session.expect(
+ "Hit any key to stop autoboot:".encode(), timeout=None
+ )
self.session.expect(self.getty, timeout=None)
self.login()
state = self.obmcutil.bmcstate()
while state != self.obmcutil.BMC_READY:
- print("Wanted state '{}', got state '{}'".format(self.obmcutil.BMC_READY, state))
+ print(
+ "Wanted state '{}', got state '{}'".format(
+ self.obmcutil.BMC_READY, state
+ )
+ )
time.sleep(5)
state = self.obmcutil.bmcstate()
@@ -102,6 +120,7 @@
self.obmcutil.chassisoff()
self.obmcutil.poweron()
+
class Host(object):
def __init__(self, entity, bmc):
self.shell = "/? *#".encode()
@@ -112,35 +131,42 @@
self.connect()
def connect(self):
- fargs = (self.entity.console.endpoint.port,
- self.entity.console.credentials.username,
- self.entity.console.endpoint.host)
+ fargs = (
+ self.entity.console.endpoint.port,
+ self.entity.console.credentials.username,
+ self.entity.console.endpoint.host,
+ )
self.session = pexpect.spawn("ssh -p{} {}@{}".format(*fargs))
self.session.logfile = PexpectLogger()
self.session.expect("password:".encode())
- self.session.sendline(self.entity.console.credentials.password.encode())
+ self.session.sendline(
+ self.entity.console.credentials.password.encode()
+ )
def poweron(self):
self.bmc.chassisoff()
self.bmc.poweron()
- self.session.send('\f')
+ self.session.send("\f")
rc = self.session.expect([self.petitboot, self.shell], timeout=None)
if rc == 0:
self.session.sendline()
self.session.expect(self.shell)
def reboot(self):
- self.session.send('\f')
+ self.session.send("\f")
rc = self.session.expect([self.petitboot, self.shell], timeout=None)
if rc == 0:
self.session.sendline()
self.session.expect(self.shell)
self.session.sendline("reboot".encode())
- self.session.expect("INIT: Waiting for kernel...".encode(), timeout=None)
+ self.session.expect(
+ "INIT: Waiting for kernel...".encode(), timeout=None
+ )
self.session.expect("Petitboot".encode(), timeout=None)
self.session.sendline()
self.session.expect(self.shell)
+
def rpp(machine):
bmc = Bmc(machine.bmc)
host = Host(machine.host, bmc)
@@ -150,14 +176,22 @@
host.connect()
host.reboot()
+
def main():
bmccreds = Credentials("root", "0penBmc")
- b = Entity(Target(bmccreds, Endpoint("serial.concentrator.somewhere.com", 1234)),
- Target(bmccreds, Endpoint("bmc.somewhere.com", 22)))
- h = Entity(Target(bmccreds, Endpoint("bmc.somewhere.com", 2200)),
- Target(Credentials("user", "password"), Endpoint("host.somewhere.com", 22)))
+ b = Entity(
+ Target(bmccreds, Endpoint("serial.concentrator.somewhere.com", 1234)),
+ Target(bmccreds, Endpoint("bmc.somewhere.com", 22)),
+ )
+ h = Entity(
+ Target(bmccreds, Endpoint("bmc.somewhere.com", 2200)),
+ Target(
+ Credentials("user", "password"), Endpoint("host.somewhere.com", 22)
+ ),
+ )
m = Machine(b, h)
return rpp(m)
+
if __name__ == "__main__":
main()
diff --git a/rootfs_size/rootfs_size.py b/rootfs_size/rootfs_size.py
index 3e21b78..db2dd7b 100755
--- a/rootfs_size/rootfs_size.py
+++ b/rootfs_size/rootfs_size.py
@@ -1,34 +1,44 @@
#!/usr/bin/python3
-import subprocess
-import tempfile
-import os
-from os.path import join, getsize
import argparse
-from multiprocessing import Pool, cpu_count
+import os
import shutil
+import subprocess
import sys
+import tempfile
+from multiprocessing import Pool, cpu_count
+from os.path import getsize
# Set command line arguments
parser = argparse.ArgumentParser(
- formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter
+)
-parser.add_argument("-b", "--build_dir",
- dest="BUILD_DIR",
- default="/home/ed/openbmc-openbmc",
- help="Build directory path.")
+parser.add_argument(
+ "-b",
+ "--build_dir",
+ dest="BUILD_DIR",
+ default="/home/ed/openbmc-openbmc",
+ help="Build directory path.",
+)
-parser.add_argument("-s", "--squashfs_file",
- dest="SQUASHFS_FILE",
- default="/build/tmp/deploy/images/wolfpass" +
- "/intel-platforms-wolfpass.squashfs-xz",
- help="Squashfs file.")
+parser.add_argument(
+ "-s",
+ "--squashfs_file",
+ dest="SQUASHFS_FILE",
+ default="/build/tmp/deploy/images/wolfpass"
+ + "/intel-platforms-wolfpass.squashfs-xz",
+ help="Squashfs file.",
+)
-parser.add_argument("-t", "--threads",
- dest="threads",
- default=int(cpu_count()),
- type=int,
- help="Number of threads to use (defaults to cpu count)")
+parser.add_argument(
+ "-t",
+ "--threads",
+ dest="threads",
+ default=int(cpu_count()),
+ type=int,
+ help="Number of threads to use (defaults to cpu count)",
+)
args = parser.parse_args()
@@ -48,26 +58,39 @@
def get_unsquash_results(filepath):
with tempfile.TemporaryDirectory() as newsquashfsroot:
input_path = os.path.join(newsquashfsroot, "input")
- shutil.copytree(squashfsdir, input_path, symlinks=True,
- ignore_dangling_symlinks=True)
+ shutil.copytree(
+ squashfsdir,
+ input_path,
+ symlinks=True,
+ ignore_dangling_symlinks=True,
+ )
file_to_remove = os.path.join(input_path, filepath)
try:
os.remove(file_to_remove)
except IsADirectoryError:
shutil.rmtree(file_to_remove)
subprocess.check_output(
- ["mksquashfs", input_path,
- newsquashfsroot + "/test", "-comp", "xz", '-processors', '1'])
+ [
+ "mksquashfs",
+ input_path,
+ newsquashfsroot + "/test",
+ "-comp",
+ "xz",
+ "-processors",
+ "1",
+ ]
+ )
- return ((filepath.replace(squashfsdir, ""),
- original_size -
- getsize(newsquashfsroot + "/test")))
+ return (
+ filepath.replace(squashfsdir, ""),
+ original_size - getsize(newsquashfsroot + "/test"),
+ )
with tempfile.TemporaryDirectory() as tempsquashfsdir:
print("writing to " + tempsquashfsdir)
squashfsdir = os.path.join(tempsquashfsdir, "squashfs-root")
- #squashfsdir = os.path.join("/tmp", "squashfs-root")
+ # squashfsdir = os.path.join("/tmp", "squashfs-root")
command = ["unsquashfs", "-d", squashfsdir, SQUASHFS]
print(" ".join(command))
subprocess.check_call(command)
@@ -79,20 +102,24 @@
if not os.path.islink(filepath):
if getsize(filepath) > FILE_SIZE_LIMIT:
files_to_test.append(
- os.path.relpath(filepath, squashfsdir))
+ os.path.relpath(filepath, squashfsdir)
+ )
print("{} files to attempt removing".format(len(files_to_test)))
print("Using {} threads".format(args.threads))
with Pool(args.threads) as p:
- for i, res in enumerate(p.imap_unordered(get_unsquash_results, files_to_test)):
+ for i, res in enumerate(
+ p.imap_unordered(get_unsquash_results, files_to_test)
+ ):
results.append(res)
- sys.stderr.write('\rdone {:.1f}%'.format(
- 100 * (i/len(files_to_test))))
+ sys.stderr.write(
+ "\rdone {:.1f}%".format(100 * (i / len(files_to_test)))
+ )
results.sort(key=lambda x: x[1], reverse=True)
-with open("results.txt", 'w') as result_file:
+with open("results.txt", "w") as result_file:
for filepath, size in results:
result = "{:>10}: {}".format(size, filepath)
print(result)
diff --git a/sensor_yaml_config/sensor_yaml_config.py b/sensor_yaml_config/sensor_yaml_config.py
index 358bd65..3f81594 100755
--- a/sensor_yaml_config/sensor_yaml_config.py
+++ b/sensor_yaml_config/sensor_yaml_config.py
@@ -1,10 +1,10 @@
#!/usr/bin/env python3
-import yaml
import argparse
-
from typing import NamedTuple
+import yaml
+
class RptSensor(NamedTuple):
name: str
@@ -17,93 +17,75 @@
sampleDimmTemp = {
- 'bExp': 0,
- 'entityID': 32,
- 'entityInstance': 2,
- 'interfaces': {
- 'xyz.openbmc_project.Sensor.Value': {
- 'Value': {
- 'Offsets': {
- 255: {
- 'type': 'int64_t'
- }
- }
- }
+ "bExp": 0,
+ "entityID": 32,
+ "entityInstance": 2,
+ "interfaces": {
+ "xyz.openbmc_project.Sensor.Value": {
+ "Value": {"Offsets": {255: {"type": "int64_t"}}}
}
},
- 'multiplierM': 1,
- 'mutability': 'Mutability::Write|Mutability::Read',
- 'offsetB': -127,
- 'path': '/xyz/openbmc_project/sensors/temperature/dimm0_temp',
- 'rExp': 0,
- 'readingType': 'readingData',
- 'scale': -3,
- 'sensorNamePattern': 'nameLeaf',
- 'sensorReadingType': 1,
- 'sensorType': 1,
- 'serviceInterface': 'org.freedesktop.DBus.Properties',
- 'unit': 'xyz.openbmc_project.Sensor.Value.Unit.DegreesC'
+ "multiplierM": 1,
+ "mutability": "Mutability::Write|Mutability::Read",
+ "offsetB": -127,
+ "path": "/xyz/openbmc_project/sensors/temperature/dimm0_temp",
+ "rExp": 0,
+ "readingType": "readingData",
+ "scale": -3,
+ "sensorNamePattern": "nameLeaf",
+ "sensorReadingType": 1,
+ "sensorType": 1,
+ "serviceInterface": "org.freedesktop.DBus.Properties",
+ "unit": "xyz.openbmc_project.Sensor.Value.Unit.DegreesC",
}
sampleCoreTemp = {
- 'bExp': 0,
- 'entityID': 208,
- 'entityInstance': 2,
- 'interfaces': {
- 'xyz.openbmc_project.Sensor.Value': {
- 'Value': {
- 'Offsets': {
- 255: {
- 'type': 'int64_t'
- }
- }
- }
+ "bExp": 0,
+ "entityID": 208,
+ "entityInstance": 2,
+ "interfaces": {
+ "xyz.openbmc_project.Sensor.Value": {
+ "Value": {"Offsets": {255: {"type": "int64_t"}}}
}
},
- 'multiplierM': 1,
- 'mutability': 'Mutability::Write|Mutability::Read',
- 'offsetB': -127,
- 'path': '/xyz/openbmc_project/sensors/temperature/p0_core0_temp',
- 'rExp': 0,
- 'readingType': 'readingData',
- 'scale': -3,
- 'sensorNamePattern': 'nameLeaf',
- 'sensorReadingType': 1,
- 'sensorType': 1,
- 'serviceInterface': 'org.freedesktop.DBus.Properties',
- 'unit': 'xyz.openbmc_project.Sensor.Value.Unit.DegreesC'
+ "multiplierM": 1,
+ "mutability": "Mutability::Write|Mutability::Read",
+ "offsetB": -127,
+ "path": "/xyz/openbmc_project/sensors/temperature/p0_core0_temp",
+ "rExp": 0,
+ "readingType": "readingData",
+ "scale": -3,
+ "sensorNamePattern": "nameLeaf",
+ "sensorReadingType": 1,
+ "sensorType": 1,
+ "serviceInterface": "org.freedesktop.DBus.Properties",
+ "unit": "xyz.openbmc_project.Sensor.Value.Unit.DegreesC",
}
samplePower = {
- 'bExp': 0,
- 'entityID': 10,
- 'entityInstance': 13,
- 'interfaces': {
- 'xyz.openbmc_project.Sensor.Value': {
- 'Value': {
- 'Offsets': {
- 255: {
- 'type': 'int64_t'
- }
- }
- }
+ "bExp": 0,
+ "entityID": 10,
+ "entityInstance": 13,
+ "interfaces": {
+ "xyz.openbmc_project.Sensor.Value": {
+ "Value": {"Offsets": {255: {"type": "int64_t"}}}
}
},
- 'multiplierM': 2,
- 'offsetB': 0,
- 'path': '/xyz/openbmc_project/sensors/power/p0_power',
- 'rExp': 0,
- 'readingType': 'readingData',
- 'scale': -6,
- 'sensorNamePattern': 'nameLeaf',
- 'sensorReadingType': 1,
- 'sensorType': 8,
- 'serviceInterface': 'org.freedesktop.DBus.Properties',
- 'unit': 'xyz.openbmc_project.Sensor.Value.Unit.Watts'
+ "multiplierM": 2,
+ "offsetB": 0,
+ "path": "/xyz/openbmc_project/sensors/power/p0_power",
+ "rExp": 0,
+ "readingType": "readingData",
+ "scale": -6,
+ "sensorNamePattern": "nameLeaf",
+ "sensorReadingType": 1,
+ "sensorType": 8,
+ "serviceInterface": "org.freedesktop.DBus.Properties",
+ "unit": "xyz.openbmc_project.Sensor.Value.Unit.Watts",
}
sampleDcmiSensor = {
"instance": 1,
"dbus": "/xyz/openbmc_project/sensors/temperature/p0_core0_temp",
- "record_id": 91
+ "record_id": 91,
}
@@ -115,8 +97,9 @@
if safe:
noaliasDumper = yaml.dumper.SafeDumper
noaliasDumper.ignore_aliases = lambda self, data: True
- yaml.dump(y, open(f, "w"), default_flow_style=False,
- Dumper=noaliasDumper)
+ yaml.dump(
+ y, open(f, "w"), default_flow_style=False, Dumper=noaliasDumper
+ )
else:
yaml.dump(y, open(f, "w"))
@@ -127,11 +110,13 @@
if match is None:
# Workaround for P8's occ sensors, where the path look like
# /org/open_power/control/occ_3_0050
- if '/org/open_power/control/occ' in p \
- and 'org.open_power.OCC.Status' in intfs:
- return (210, 'nameLeaf')
- raise Exception('Unable to find sensor', key, 'from map')
- return (m[key]['entityID'], m[key]['sensorNamePattern'])
+ if (
+ "/org/open_power/control/occ" in p
+ and "org.open_power.OCC.Status" in intfs
+ ):
+ return (210, "nameLeaf")
+ raise Exception("Unable to find sensor", key, "from map")
+ return (m[key]["entityID"], m[key]["sensorNamePattern"])
# Global entity instances
@@ -152,7 +137,7 @@
next(f)
next(f)
for line in f:
- fields = line.strip().split('|')
+ fields = line.strip().split("|")
fields = list(map(str.strip, fields))
sensor = RptSensor(
fields[0],
@@ -161,7 +146,8 @@
int(fields[4], 16) if fields[4] else None,
int(fields[5], 16) if fields[5] else None,
int(fields[7], 16) if fields[7] else None,
- fields[9])
+ fields[9],
+ )
# print(sensor)
sensors.append(sensor)
return sensors
@@ -171,239 +157,303 @@
# Convert path like: /sys-0/node-0/motherboard-0/dimmconn-0/dimm-0
# to: /xyz/openbmc_project/sensors/temperature/dimm0_temp
import re
- dimmconn = re.search(r'dimmconn-\d+', p).group()
- dimmId = re.search(r'\d+', dimmconn).group()
- return '/xyz/openbmc_project/sensors/temperature/dimm{}_temp'.format(dimmId)
+
+ dimmconn = re.search(r"dimmconn-\d+", p).group()
+ dimmId = re.search(r"\d+", dimmconn).group()
+ return "/xyz/openbmc_project/sensors/temperature/dimm{}_temp".format(
+ dimmId
+ )
def getMembufTempPath(name):
# Convert names like MEMBUF0_Temp or CENTAUR0_Temp
# to: /xyz/openbmc_project/sensors/temperature/membuf0_temp
# to: /xyz/openbmc_project/sensors/temperature/centaur0_temp
- return '/xyz/openbmc_project/sensors/temperature/{}'.format(name.lower())
+ return "/xyz/openbmc_project/sensors/temperature/{}".format(name.lower())
def getCoreTempPath(name, p):
# For different rpts:
# Convert path like:
- # /sys-0/node-0/motherboard-0/proc_socket-0/module-0/p9_proc_s/eq0/ex0/core0 (for P9)
+ # /sys-0/node-0/motherboard-0/proc_socket-0/module-0/p9_proc_s/eq0/ex0/core0 (for P9) # noqa: E501
# to: /xyz/openbmc_project/sensors/temperature/p0_core0_temp
# or name like: CORE0_Temp (for P8)
# to: /xyz/openbmc_project/sensors/temperature/core0_temp (for P8)
import re
- if 'p9_proc' in p:
- splitted = p.split('/')
- socket = re.search(r'\d+', splitted[4]).group()
- core = re.search(r'\d+', splitted[9]).group()
- return '/xyz/openbmc_project/sensors/temperature/p{}_core{}_temp'.format(socket, core)
+
+ if "p9_proc" in p:
+ splitted = p.split("/")
+ socket = re.search(r"\d+", splitted[4]).group()
+ core = re.search(r"\d+", splitted[9]).group()
+ return (
+ "/xyz/openbmc_project/sensors/temperature/p{}_core{}_temp".format(
+ socket, core
+ )
+ )
else:
- core = re.search(r'\d+', name).group()
- return '/xyz/openbmc_project/sensors/temperature/core{}_temp'.format(core)
+ core = re.search(r"\d+", name).group()
+ return "/xyz/openbmc_project/sensors/temperature/core{}_temp".format(
+ core
+ )
def getPowerPath(name):
# Convert name like Proc0_Power
# to: /xyz/openbmc_project/sensors/power/p0_power
import re
- r = re.search(r'\d+', name)
+
+ r = re.search(r"\d+", name)
if r:
index = r.group()
else:
# Handle cases like IO_A_Power, Storage_Power_A
- r = re.search(r'_[A|B|C|D]', name).group()[-1]
- index = str(ord(r) - ord('A'))
- prefix = 'p'
+ r = re.search(r"_[A|B|C|D]", name).group()[-1]
+ index = str(ord(r) - ord("A"))
+ prefix = "p"
m = None
- if 'memory_proc' in name.lower():
+ if "memory_proc" in name.lower():
prefix = None
- m = 'centaur'
- elif 'pcie_proc' in name.lower():
- m = 'pcie'
- elif 'io' in name.lower():
- m = 'io'
- elif 'fan' in name.lower():
- m = 'fan'
- elif 'storage' in name.lower():
- m = 'disk'
- elif 'total' in name.lower():
+ m = "centaur"
+ elif "pcie_proc" in name.lower():
+ m = "pcie"
+ elif "io" in name.lower():
+ m = "io"
+ elif "fan" in name.lower():
+ m = "fan"
+ elif "storage" in name.lower():
+ m = "disk"
+ elif "total" in name.lower():
prefix = None
- m = 'total'
- elif 'proc' in name.lower():
+ m = "total"
+ elif "proc" in name.lower():
# Default
pass
- ret = '/xyz/openbmc_project/sensors/power/'
+ ret = "/xyz/openbmc_project/sensors/power/"
if prefix:
ret = ret + prefix + index
if m:
if prefix:
- ret = ret + '_' + m
+ ret = ret + "_" + m
else:
ret = ret + m
if prefix is None:
ret = ret + index
- ret = ret + '_power'
+ ret = ret + "_power"
return ret
def getDimmTempConfig(s):
r = sampleDimmTemp.copy()
- r['entityInstance'] = getEntityInstance(r['entityID'])
- r['path'] = getDimmTempPath(s.targetPath)
+ r["entityInstance"] = getEntityInstance(r["entityID"])
+ r["path"] = getDimmTempPath(s.targetPath)
return r
def getMembufTempConfig(s):
r = sampleDimmTemp.copy()
- r['entityID'] = 0xD1
- r['entityInstance'] = getEntityInstance(r['entityID'])
- r['path'] = getMembufTempPath(s.name)
+ r["entityID"] = 0xD1
+ r["entityInstance"] = getEntityInstance(r["entityID"])
+ r["path"] = getMembufTempPath(s.name)
return r
def getCoreTempConfig(s):
r = sampleCoreTemp.copy()
- r['entityInstance'] = getEntityInstance(r['entityID'])
- r['path'] = getCoreTempPath(s.name, s.targetPath)
+ r["entityInstance"] = getEntityInstance(r["entityID"])
+ r["path"] = getCoreTempPath(s.name, s.targetPath)
return r
def getPowerConfig(s):
r = samplePower.copy()
- r['entityInstance'] = getEntityInstance(r['entityID'])
- r['path'] = getPowerPath(s.name)
+ r["entityInstance"] = getEntityInstance(r["entityID"])
+ r["path"] = getPowerPath(s.name)
return r
def isCoreTemp(p):
import re
- m = re.search(r'p\d+_core\d+_temp', p)
+
+ m = re.search(r"p\d+_core\d+_temp", p)
return m is not None
def getDcmiSensor(i, sensor):
import re
- path = sensor['path']
- name = path.split('/')[-1]
- m = re.findall(r'\d+', name)
+
+ path = sensor["path"]
+ name = path.split("/")[-1]
+ m = re.findall(r"\d+", name)
socket, core = int(m[0]), int(m[1])
instance = socket * 24 + core + 1
r = sampleDcmiSensor.copy()
- r['instance'] = instance
- r['dbus'] = path
- r['record_id'] = i
+ r["instance"] = instance
+ r["dbus"] = path
+ r["record_id"] = i
return r
def saveJson(data, f):
import json
- with open(f, 'w') as outfile:
+
+ with open(f, "w") as outfile:
json.dump(data, outfile, indent=4)
def main():
parser = argparse.ArgumentParser(
- description='Yaml tool for updating ipmi sensor yaml config')
- parser.add_argument('-i', '--input', required=True, dest='input',
- help='The ipmi sensor yaml config')
- parser.add_argument('-o', '--output', required=True, dest='output',
- help='The output yaml file')
- parser.add_argument('-m', '--map', dest='map', default='sensor_map.yaml',
- help='The sample map yaml file')
- parser.add_argument('-r', '--rpt', dest='rpt',
- help='The .rpt file generated by op-build')
- parser.add_argument('-f', '--fix', action='store_true',
- help='Fix entities and sensorNamePattern')
+ description="Yaml tool for updating ipmi sensor yaml config"
+ )
+ parser.add_argument(
+ "-i",
+ "--input",
+ required=True,
+ dest="input",
+ help="The ipmi sensor yaml config",
+ )
+ parser.add_argument(
+ "-o",
+ "--output",
+ required=True,
+ dest="output",
+ help="The output yaml file",
+ )
+ parser.add_argument(
+ "-m",
+ "--map",
+ dest="map",
+ default="sensor_map.yaml",
+ help="The sample map yaml file",
+ )
+ parser.add_argument(
+ "-r", "--rpt", dest="rpt", help="The .rpt file generated by op-build"
+ )
+ parser.add_argument(
+ "-f",
+ "--fix",
+ action="store_true",
+ help="Fix entities and sensorNamePattern",
+ )
# -g expects output as yaml for mapping of entityID/sensorNamePattern
# -d expects output as json config for dcmi sensors
# Do not mess the output by enforcing only one argument is passed
- # TODO: -f and -r could be used together, and they are conflicted with -g or -d
+ # TODO: -f and -r could be used together, and they are conflicted with
+ # -g or -d
group = parser.add_mutually_exclusive_group()
- group.add_argument('-g', '--generate', action='store_true',
- help='Generate maps for entityID and sensorNamePattern')
- group.add_argument('-d', '--dcmi', action='store_true',
- help='Generate dcmi sensors json config')
+ group.add_argument(
+ "-g",
+ "--generate",
+ action="store_true",
+ help="Generate maps for entityID and sensorNamePattern",
+ )
+ group.add_argument(
+ "-d",
+ "--dcmi",
+ action="store_true",
+ help="Generate dcmi sensors json config",
+ )
args = parser.parse_args()
args = vars(args)
- if args['input'] is None or args['output'] is None:
+ if args["input"] is None or args["output"] is None:
parser.print_help()
exit(1)
- y = openYaml(args['input'])
+ y = openYaml(args["input"])
- if args['fix']:
+ if args["fix"]:
# Fix entities and sensorNamePattern
- m = openYaml(args['map'])
+ m = openYaml(args["map"])
for i in y:
- path = y[i]['path']
- intfs = tuple(sorted(list(y[i]['interfaces'].keys())))
+ path = y[i]["path"]
+ intfs = tuple(sorted(list(y[i]["interfaces"].keys())))
entityId, namePattern = getEntityIdAndNamePattern(path, intfs, m)
- y[i]['entityID'] = entityId
- y[i]['entityInstance'] = getEntityInstance(entityId)
- y[i]['sensorNamePattern'] = namePattern
- print(y[i]['path'], "id:", entityId,
- "instance:", y[i]['entityInstance'])
+ y[i]["entityID"] = entityId
+ y[i]["entityInstance"] = getEntityInstance(entityId)
+ y[i]["sensorNamePattern"] = namePattern
+ print(
+ y[i]["path"],
+ "id:",
+ entityId,
+ "instance:",
+ y[i]["entityInstance"],
+ )
sensorIds = list(y.keys())
- if args['rpt']:
+ if args["rpt"]:
unhandledSensors = []
- rptSensors = loadRpt(args['rpt'])
+ rptSensors = loadRpt(args["rpt"])
for s in rptSensors:
if s.sensorId is not None and s.sensorId not in sensorIds:
- print("Sensor ID", s.sensorId, "not in yaml:",
- s.name, ", path:", s.targetPath)
+ print(
+ "Sensor ID",
+ s.sensorId,
+ "not in yaml:",
+ s.name,
+ ", path:",
+ s.targetPath,
+ )
isAdded = False
- if 'temp' in s.name.lower():
- if 'dimm' in s.targetPath.lower():
+ if "temp" in s.name.lower():
+ if "dimm" in s.targetPath.lower():
y[s.sensorId] = getDimmTempConfig(s)
isAdded = True
- elif 'core' in s.targetPath.lower():
+ elif "core" in s.targetPath.lower():
y[s.sensorId] = getCoreTempConfig(s)
isAdded = True
- elif 'centaur' in s.name.lower() or 'membuf' in s.name.lower():
+ elif (
+ "centaur" in s.name.lower()
+ or "membuf" in s.name.lower()
+ ):
y[s.sensorId] = getMembufTempConfig(s)
isAdded = True
- elif s.name.lower().endswith('_power'):
+ elif s.name.lower().endswith("_power"):
y[s.sensorId] = getPowerConfig(s)
isAdded = True
if isAdded:
- print('Added sensor id:', s.sensorId,
- ', path:', y[s.sensorId]['path'])
+ print(
+ "Added sensor id:",
+ s.sensorId,
+ ", path:",
+ y[s.sensorId]["path"],
+ )
else:
unhandledSensors.append(s)
- print('Unhandled sensors:')
+ print("Unhandled sensors:")
for s in unhandledSensors:
print(s)
- if args['generate']:
+ if args["generate"]:
m = {}
for i in y:
- path = y[i]['path']
- intfs = tuple(sorted(list(y[i]['interfaces'].keys())))
- entityId = y[i]['entityID']
- sensorNamePattern = y[i]['sensorNamePattern']
- m[(path, intfs)] = {'entityID': entityId,
- 'sensorNamePattern': sensorNamePattern}
+ path = y[i]["path"]
+ intfs = tuple(sorted(list(y[i]["interfaces"].keys())))
+ entityId = y[i]["entityID"]
+ sensorNamePattern = y[i]["sensorNamePattern"]
+ m[(path, intfs)] = {
+ "entityID": entityId,
+ "sensorNamePattern": sensorNamePattern,
+ }
y = m
- if args['dcmi']:
+ if args["dcmi"]:
d = []
for i in y:
- if isCoreTemp(y[i]['path']):
+ if isCoreTemp(y[i]["path"]):
s = getDcmiSensor(i, y[i])
d.append(s)
print(s)
- saveJson(d, args['output'])
+ saveJson(d, args["output"])
return
- safe = False if args['generate'] else True
- saveYaml(y, args['output'], safe)
+ safe = False if args["generate"] else True
+ saveYaml(y, args["output"], safe)
if __name__ == "__main__":
diff --git a/tof-voters/libvoters/entry_point.py b/tof-voters/libvoters/entry_point.py
index 1dd50c3..a6cc1ea 100644
--- a/tof-voters/libvoters/entry_point.py
+++ b/tof-voters/libvoters/entry_point.py
@@ -2,7 +2,7 @@
import argparse
from importlib import import_module
-from typing import List
+
def main() -> int:
parser = argparse.ArgumentParser(description="Obtain TOF voter metrics")
@@ -22,7 +22,7 @@
import_module("libvoters.subcmd.dump-gerrit"),
import_module("libvoters.subcmd.report"),
]
- commands = [x.subcmd(subparser) for x in commands] # type: ignore
+ commands = [x.subcmd(subparser) for x in commands] # type: ignore
args = parser.parse_args()
diff --git a/tof-voters/libvoters/subcmd/analyze-commits.py b/tof-voters/libvoters/subcmd/analyze-commits.py
index 4717991..4939a0b 100644
--- a/tof-voters/libvoters/subcmd/analyze-commits.py
+++ b/tof-voters/libvoters/subcmd/analyze-commits.py
@@ -2,13 +2,14 @@
import argparse
import json
-import libvoters.acceptable as acceptable
import os
import re
from collections import defaultdict
-from libvoters.time import timestamp, TimeOfDay
from typing import Any, Dict
+import libvoters.acceptable as acceptable
+from libvoters.time import TimeOfDay, timestamp
+
class subcmd:
def __init__(self, parser: argparse._SubParsersAction) -> None:
@@ -42,7 +43,7 @@
if not os.path.isfile(path):
continue
- if not re.match("[0-9]*\.json", f):
+ if not re.match(r"[0-9]*\.json", f):
continue
with open(path, "r") as file:
diff --git a/tof-voters/libvoters/subcmd/analyze-reviews.py b/tof-voters/libvoters/subcmd/analyze-reviews.py
index b3734b9..b3323fa 100644
--- a/tof-voters/libvoters/subcmd/analyze-reviews.py
+++ b/tof-voters/libvoters/subcmd/analyze-reviews.py
@@ -4,11 +4,12 @@
import json
import os
import re
-import libvoters.acceptable as acceptable
from collections import defaultdict
-from libvoters.time import timestamp, TimeOfDay
from typing import Dict
+import libvoters.acceptable as acceptable
+from libvoters.time import TimeOfDay, timestamp
+
class subcmd:
def __init__(self, parser: argparse._SubParsersAction) -> None:
@@ -42,7 +43,7 @@
if not os.path.isfile(path):
continue
- if not re.match("[0-9]*\.json", f):
+ if not re.match(r"[0-9]*\.json", f):
continue
with open(path, "r") as file:
diff --git a/tof-voters/libvoters/subcmd/report.py b/tof-voters/libvoters/subcmd/report.py
index 13726d9..87aa713 100644
--- a/tof-voters/libvoters/subcmd/report.py
+++ b/tof-voters/libvoters/subcmd/report.py
@@ -7,13 +7,10 @@
class subcmd:
def __init__(self, parser: argparse._SubParsersAction) -> None:
- p = parser.add_parser(
- "report", help="Create final report"
- )
+ p = parser.add_parser("report", help="Create final report")
p.set_defaults(cmd=self)
-
def run(self, args: argparse.Namespace) -> int:
commits_fp = os.path.join(args.dir, "commits.json")
reviews_fp = os.path.join(args.dir, "reviews.json")
@@ -43,10 +40,14 @@
qualified = points >= 15
- results[user] = { "qualified": qualified, "points": points,
- "commits": user_commits, "reviews": user_reviews }
+ results[user] = {
+ "qualified": qualified,
+ "points": points,
+ "commits": user_commits,
+ "reviews": user_reviews,
+ }
with open(os.path.join(args.dir, "report.json"), "w") as outfile:
- outfile.write(json.dumps(results, indent = 4))
+ outfile.write(json.dumps(results, indent=4))
return 0