maintainers: reformat with black
A number of pep8/pycodestyle issues were reported with the
existing code, so reformat with 'black'.
Signed-off-by: Patrick Williams <patrick@stwcx.xyz>
Change-Id: I8c3a103c1600ab06dde07ff0a469c83ffbd6573f
diff --git a/maintainers/obmc-gerrit b/maintainers/obmc-gerrit
index fae6725..b5bbbb9 100755
--- a/maintainers/obmc-gerrit
+++ b/maintainers/obmc-gerrit
@@ -16,28 +16,34 @@
git = sh.git.bake()
-def get_reviewers(root: Optional[str]=None, mname: str='MAINTAINERS') -> List[str]:
+
+def get_reviewers(root: Optional[str] = None, mname: str = "MAINTAINERS") -> List[str]:
reviewers = cast(List[str], list())
if not root:
- root = git('rev-parse', '--show-toplevel').strip()
+ root = git("rev-parse", "--show-toplevel").strip()
mfile = os.path.join(root, mname)
if not os.path.exists(mfile):
return reviewers
- with open(mfile, 'r') as mstream:
+ with open(mfile, "r") as mstream:
maintainers.trash_preamble(mstream)
block = maintainers.parse_block(mstream)
if not block:
return reviewers
- mlist = cast(List[maintainers.Identity],
- block[maintainers.LineType.MAINTAINER])
+ mlist = cast(
+ List[maintainers.Identity], block[maintainers.LineType.MAINTAINER]
+ )
reviewers.extend(i.email.address for i in mlist)
if maintainers.LineType.REVIEWER in block:
- rlist = cast(List[maintainers.Identity],
- block[maintainers.LineType.REVIEWER])
+ rlist = cast(
+ List[maintainers.Identity], block[maintainers.LineType.REVIEWER]
+ )
reviewers.extend(i.email.address for i in rlist)
return reviewers
-def gerrit_refspec_args(reviewers: Optional[List[str]]=None, topic: str=None) -> str:
+
+def gerrit_refspec_args(
+ reviewers: Optional[List[str]] = None, topic: str = None
+) -> str:
argl = []
if reviewers:
argl.extend("r={}".format(addr) for addr in reviewers)
@@ -45,25 +51,33 @@
argl.append("topic={}".format(topic))
return ",".join(argl)
+
def decorate_refspec(refspec: str, topic: str) -> str:
gargs = gerrit_refspec_args(get_reviewers(), topic)
if not gargs:
return refspec
- if '%' in refspec:
+ if "%" in refspec:
return "{},{}".format(refspec, gargs)
return "{}%{}".format(refspec, gargs)
+
def do_push(args: argparse.Namespace) -> None:
- git.push(args.remote, decorate_refspec(args.refspec, args.topic),
- _in=sys.stdin, _out=sys.stdout, _err=sys.stderr)
+ git.push(
+ args.remote,
+ decorate_refspec(args.refspec, args.topic),
+ _in=sys.stdin,
+ _out=sys.stdout,
+ _err=sys.stderr,
+ )
+
parser = argparse.ArgumentParser()
-subbies = parser.add_subparsers(dest='subcommand')
+subbies = parser.add_subparsers(dest="subcommand")
subbies.required = True
push = subbies.add_parser("push", help="Push changes to Gerrit with reviewers")
push.add_argument("remote")
push.add_argument("refspec")
-push.add_argument("topic", nargs='?', default=None)
+push.add_argument("topic", nargs="?", default=None)
push.set_defaults(func=do_push)
args = parser.parse_args()
diff --git a/maintainers/obmc/maintainers.py b/maintainers/obmc/maintainers.py
index 2b261af..fe93d5d 100755
--- a/maintainers/obmc/maintainers.py
+++ b/maintainers/obmc/maintainers.py
@@ -7,10 +7,20 @@
import sys
from collections import namedtuple, OrderedDict
from enum import Enum, unique
-from typing import (Dict, NamedTuple, Iterator, Sequence, Union, Optional,
- List, cast, IO)
+from typing import (
+ Dict,
+ NamedTuple,
+ Iterator,
+ Sequence,
+ Union,
+ Optional,
+ List,
+ cast,
+ IO,
+)
from pprint import pprint
+
@unique
class LineType(Enum):
REPO = 1
@@ -19,15 +29,18 @@
FORKED = 4
COMMENT = 5
+
@unique
class ParseState(Enum):
BEGIN = 1
BLOCK = 2
+
Email = NamedTuple("Email", [("name", str), ("address", str)])
Identity = NamedTuple("Identity", [("email", Email), ("irc", Optional[str])])
Entry = NamedTuple("Entry", [("type", LineType), ("content", str)])
+
def parse_line(line: str) -> Optional[Entry]:
sline = line.strip()
if not sline:
@@ -37,73 +50,83 @@
return Entry(LineType.REPO, sline)
tag = line[:2]
- if '@' in tag:
+ if "@" in tag:
return Entry(LineType.REPO, sline[1:].split(":")[0].strip())
- elif tag == 'M:':
+ elif tag == "M:":
return Entry(LineType.MAINTAINER, sline.split(":")[1].strip())
- elif tag == 'R:':
+ elif tag == "R:":
return Entry(LineType.REVIEWER, sline.split(":")[1].strip())
- elif tag == 'F:':
+ elif tag == "F:":
return Entry(LineType.FORKED, sline[2:].strip())
- elif '#' in tag:
+ elif "#" in tag:
return Entry(LineType.COMMENT, line)
return None
+
D = Union[str, List[Identity], List[str]]
+
def parse_repo(content: str) -> str:
return content
+
def parse_forked(content: str) -> str:
return content
+
def parse_irc(src: Iterator[str]) -> Optional[str]:
irc = ""
for c in src:
- if c == '#':
+ if c == "#":
return None
- if c == '<':
+ if c == "<":
break
else:
return None
for c in src:
- if c in '!#':
+ if c in "!#":
return irc.strip()
irc += c
raise ValueError("Unterminated IRC handle")
+
def parse_address(src: Iterator[str]) -> str:
addr = ""
for c in src:
- if c in '>#':
+ if c in ">#":
return addr.strip()
addr += c
raise ValueError("Unterminated email address")
+
def parse_name(src: Iterator[str]) -> str:
name = ""
for c in src:
- if c in '<#':
+ if c in "<#":
return name.strip()
name += c
raise ValueError("Unterminated name")
+
def parse_email(src: Iterator[str]) -> Email:
name = parse_name(src)
address = parse_address(src)
return Email(name, address)
+
def parse_identity(content: str) -> Identity:
ci = iter(content)
email = parse_email(ci)
irc = parse_irc(ci)
return Identity(email, irc)
+
B = Dict[LineType, D]
+
def parse_block(src: Iterator[str]) -> Optional[B]:
state = ParseState.BEGIN
repo = cast(B, OrderedDict())
@@ -121,7 +144,7 @@
if entry.type == LineType.REPO:
repo[entry.type] = parse_repo(entry.content)
- elif entry.type in { LineType.MAINTAINER, LineType.REVIEWER }:
+ elif entry.type in {LineType.MAINTAINER, LineType.REVIEWER}:
if not entry.type in repo:
repo[entry.type] = cast(List[Identity], list())
cast(list, repo[entry.type]).append(parse_identity(entry.content))
@@ -139,6 +162,7 @@
return repo
+
def trash_preamble(src: Iterator[str]) -> None:
s = 0
for line in src:
@@ -148,6 +172,7 @@
if s == 1 and sline == "-------------------------":
break
+
def parse_maintainers(src: Iterator[str]) -> Dict[D, B]:
maintainers = cast(Dict[D, B], OrderedDict())
trash_preamble(src)
@@ -158,19 +183,23 @@
maintainers[repo[LineType.REPO]] = repo
return maintainers
+
def assemble_name(name: str, dst: IO[str]) -> None:
dst.write(name)
+
def assemble_address(address: str, dst: IO[str]) -> None:
dst.write("<")
dst.write(address)
dst.write(">")
+
def assemble_email(email: Email, dst: IO[str]) -> None:
assemble_name(email.name, dst)
dst.write(" ")
assemble_address(email.address, dst)
+
def assemble_irc(irc: Optional[str], dst: IO[str]) -> None:
if irc:
dst.write(" ")
@@ -178,31 +207,37 @@
dst.write(irc)
dst.write("!>")
+
def assemble_identity(identity: Identity, dst: IO[str]) -> None:
assemble_email(identity.email, dst)
assemble_irc(identity.irc, dst)
+
def assemble_maintainers(identities: List[Identity], dst: IO[str]) -> None:
for i in identities:
dst.write("M: ")
assemble_identity(i, dst)
dst.write("\n")
+
def assemble_reviewers(identities: List[Identity], dst: IO[str]) -> None:
for i in identities:
dst.write("R: ")
assemble_identity(i, dst)
dst.write("\n")
+
def assemble_forked(content: str, dst: IO[str]) -> None:
if content:
dst.write("F: ")
dst.write(content)
dst.write("\n")
+
def assemble_comment(content: List[str], dst: IO[str]) -> None:
dst.write("".join(content))
+
def assemble_block(block: B, default: B, dst: IO[str]) -> None:
if LineType.COMMENT in block:
assemble_comment(cast(List[str], block[LineType.COMMENT]), dst)
@@ -216,18 +251,18 @@
if LineType.FORKED in block:
assemble_forked(cast(str, block[LineType.FORKED]), dst)
+
def main() -> None:
parser = argparse.ArgumentParser()
- parser.add_argument("maintainers", type=argparse.FileType('r'),
- default=sys.stdin)
- parser.add_argument("output", type=argparse.FileType('w'),
- default=sys.stdout)
+ parser.add_argument("maintainers", type=argparse.FileType("r"), default=sys.stdin)
+ parser.add_argument("output", type=argparse.FileType("w"), default=sys.stdout)
args = parser.parse_args()
blocks = parse_maintainers(args.maintainers)
for block in blocks.values():
print(block[LineType.REPO])
- assemble_block(block, blocks['MAINTAINERS'], args.output)
+ assemble_block(block, blocks["MAINTAINERS"], args.output)
print()
+
if __name__ == "__main__":
main()