| #!/usr/bin/python3 |
| # |
| # SPDX-License-Identifier: Apache-2.0 |
| # Copyright (C) 2018 IBM Corp. |
| |
| import argparse |
| import sys |
| from collections import namedtuple, OrderedDict |
| from enum import Enum, unique |
| from typing import ( |
| Dict, |
| NamedTuple, |
| Iterator, |
| Sequence, |
| Union, |
| Optional, |
| List, |
| cast, |
| IO, |
| ) |
| from pprint import pprint |
| from .reviewlist import ReviewList |
| |
| |
@unique
class LineType(Enum):
    """Kinds of line recognised in a MAINTAINERS file."""

    # Block header: the literal "MAINTAINERS" or a line with "@" in its
    # first two characters.
    REPO = 1
    # "M:" line.
    MAINTAINER = 2
    # "R:" line.
    REVIEWER = 3
    # "F:" line.
    FORKED = 4
    # Line with "#" in its first two characters.
    COMMENT = 5
| |
| |
@unique
class ParseState(Enum):
    """States of the block parser."""

    # No entry of the current block has been seen yet.
    BEGIN = 1
    # At least one entry of the current block has been seen.
    BLOCK = 2
| |
| |
class Email(NamedTuple):
    """A display name paired with an email address."""

    name: str
    address: str


class Identity(NamedTuple):
    """A person: their email plus an optional IRC handle."""

    email: Email
    irc: Optional[str]
class Entry(NamedTuple):
    """One classified line: its LineType and the extracted content."""

    type: LineType
    content: str
| |
| |
def parse_line(line: str) -> Optional[Entry]:
    """Classify a single line of a MAINTAINERS file.

    Returns an Entry carrying the line's LineType and content, or None when
    the line is blank or not recognised.
    """
    sline = line.strip()
    if not sline:
        return None

    if sline == "MAINTAINERS":
        return Entry(LineType.REPO, sline)

    # The tag is read from the raw (unstripped) line, so "M:", "R:" and "F:"
    # are only recognised at the very start of the line.
    tag = line[:2]
    if "@" in tag:
        # Repository header: drop the first character, keep the text before
        # the first colon.
        return Entry(LineType.REPO, sline[1:].split(":")[0].strip())
    if tag == "M:":
        # Slice off the two-character tag rather than splitting on ":" so
        # that a later colon in the content does not truncate the entry.
        return Entry(LineType.MAINTAINER, sline[2:].strip())
    if tag == "R:":
        return Entry(LineType.REVIEWER, sline[2:].strip())
    if tag == "F:":
        return Entry(LineType.FORKED, sline[2:].strip())
    if "#" in tag:
        # Comments keep the raw, unstripped line as their content.
        return Entry(LineType.COMMENT, line)

    return None
| |
| |
# Value stored under a LineType key in a parsed block: a plain string, a list
# of identities, or a list of strings.
D = Union[
    str,
    List[Identity],
    List[str],
]
| |
| |
def parse_repo(content: str) -> str:
    """Return the repository entry's content unchanged."""
    repo_name = content
    return repo_name
| |
| |
def parse_forked(content: str) -> str:
    """Return the forked entry's content unchanged."""
    forked = content
    return forked
| |
| |
def parse_irc(src: Iterator[str]) -> Optional[str]:
    """Read an optional IRC handle from a character iterator.

    Characters are skipped until "<" opens the handle. If "#" is met first,
    or the input runs out, there is no handle and None is returned. The
    handle then runs up to the next "!" or "#" and is returned stripped.

    Raises ValueError if the input ends after "<" without a terminator.
    """
    opened = False
    for ch in src:
        if ch == "#":
            return None
        if ch == "<":
            opened = True
            break
    if not opened:
        return None

    handle = []
    for ch in src:
        if ch in "!#":
            return "".join(handle).strip()
        handle.append(ch)

    raise ValueError("Unterminated IRC handle")
| |
| |
def parse_address(src: Iterator[str]) -> str:
    """Read an email address from a character iterator.

    Collects characters up to the first ">" or "#" and returns them stripped.

    Raises ValueError if the input ends before either terminator.
    """
    collected = []
    for ch in src:
        if ch in ">#":
            return "".join(collected).strip()
        collected.append(ch)
    raise ValueError("Unterminated email address")
| |
| |
def parse_name(src: Iterator[str]) -> str:
    """Read a display name from a character iterator.

    Collects characters up to the first "<" or "#" and returns them stripped.

    Raises ValueError if the input ends before either terminator.
    """
    collected = []
    for ch in src:
        if ch in "<#":
            return "".join(collected).strip()
        collected.append(ch)
    raise ValueError("Unterminated name")
| |
| |
def parse_email(src: Iterator[str]) -> Email:
    """Read a name followed by an address from a shared character iterator."""
    # Keyword arguments are evaluated left to right, so the name is consumed
    # from the iterator before the address.
    return Email(name=parse_name(src), address=parse_address(src))
| |
| |
def parse_identity(content: str) -> Identity:
    """Parse an entry's content into an Identity (email plus optional IRC)."""
    # One iterator is shared so the IRC parser resumes where the email
    # parser stopped.
    chars = iter(content)
    return Identity(email=parse_email(chars), irc=parse_irc(chars))
| |
| |
# A parsed block: maps each LineType seen in the block to its parsed value.
B = Dict[
    LineType,
    D,
]
| |
| |
def parse_block(src: Iterator[str]) -> Optional[B]:
    """Parse the next block of entries from an iterator of lines.

    Blank or unrecognised lines before the block are skipped; the first such
    line after the block has started ends it. Lines that fail to parse with
    ValueError are reported on stdout and skipped.

    Returns the block as an ordered mapping keyed by LineType. Returns None
    when the input is exhausted and nothing was collected.
    """
    repo = cast(B, OrderedDict())
    state = ParseState.BEGIN
    for line in src:
        try:
            entry = parse_line(line)
            if not entry:
                if state == ParseState.BLOCK:
                    return repo
                continue
            state = ParseState.BLOCK

            kind = entry.type
            if kind == LineType.REPO:
                repo[kind] = parse_repo(entry.content)
            elif kind == LineType.FORKED:
                repo[kind] = parse_forked(entry.content)
            elif kind == LineType.COMMENT:
                comments = cast(list, repo.setdefault(kind, cast(List[str], [])))
                comments.append(entry.content)
            elif kind in (LineType.MAINTAINER, LineType.REVIEWER):
                # The list is created before the identity is parsed, so a
                # failed parse still leaves the (possibly empty) list behind.
                people = cast(list, repo.setdefault(kind, cast(List[Identity], [])))
                people.append(parse_identity(entry.content))
        except ValueError as err:
            print("Failed to parse line '{}': {}".format(line.strip(), err))

    return repo if repo else None
| |
| |
def trash_preamble(src: Iterator[str]) -> None:
    """Consume lines from *src* up to and including the end of the preamble.

    The preamble ends at the first line of dashes that follows the
    "START OF MAINTAINERS LIST" marker. If that is never found, the iterator
    is consumed entirely.
    """
    marker_seen = False
    for raw in src:
        text = raw.strip()
        if text == "START OF MAINTAINERS LIST":
            marker_seen = True
        if marker_seen and text == "-------------------------":
            return
| |
| |
def parse_maintainers(src: Iterator[str]) -> Dict[D, B]:
    """Parse a whole MAINTAINERS stream into blocks keyed by repository.

    The preamble is discarded first. Blocks are then read until parse_block
    yields nothing, and each is stored under its LineType.REPO value in
    encounter order.
    """
    trash_preamble(src)
    maintainers = cast(Dict[D, B], OrderedDict())
    repo = cast(B, parse_block(src))
    while repo:
        maintainers[repo[LineType.REPO]] = repo
        repo = cast(B, parse_block(src))
    return maintainers
| |
| |
def get_reviewers(mfile: str) -> ReviewList:
    """Collect maintainer and reviewer addresses from a MAINTAINERS file.

    Only the first block after the preamble is read. The returned ReviewList
    has the email addresses of that block's maintainers and reviewers
    appended; it is left empty when the file contains no block.
    """
    revlist = ReviewList()

    with open(mfile) as mstream:
        trash_preamble(mstream)
        block = parse_block(mstream)

    if not block:
        return revlist

    # A block may lack "M:" lines just as it may lack "R:" lines; guard both
    # lookups so a reviewer-only block does not raise KeyError.
    if LineType.MAINTAINER in block:
        mlist = cast(List[Identity], block[LineType.MAINTAINER])
        revlist.maintainers.extend(i.email.address for i in mlist)
    if LineType.REVIEWER in block:
        rlist = cast(List[Identity], block[LineType.REVIEWER])
        revlist.reviewers.extend(i.email.address for i in rlist)
    return revlist
| |
| |
def assemble_name(name: str, dst: IO[str]) -> None:
    """Write *name* to *dst* unchanged."""
    emit = dst.write
    emit(name)
| |
| |
def assemble_address(address: str, dst: IO[str]) -> None:
    """Write *address* to *dst* wrapped in angle brackets."""
    for piece in ("<", address, ">"):
        dst.write(piece)
| |
| |
def assemble_email(email: Email, dst: IO[str]) -> None:
    """Write *email* to *dst* as the name, a space, then the bracketed address."""
    name, address = email.name, email.address
    assemble_name(name, dst)
    dst.write(" ")
    assemble_address(address, dst)
| |
| |
def assemble_irc(irc: Optional[str], dst: IO[str]) -> None:
    """Write the IRC handle to *dst* as " <handle!>"; write nothing if unset."""
    if not irc:
        return
    for piece in (" ", "<", irc, "!>"):
        dst.write(piece)
| |
| |
def assemble_identity(identity: Identity, dst: IO[str]) -> None:
    """Write *identity* to *dst*: the email first, then the optional IRC handle."""
    email, irc = identity.email, identity.irc
    assemble_email(email, dst)
    assemble_irc(irc, dst)
| |
| |
def assemble_maintainers(identities: List[Identity], dst: IO[str]) -> None:
    """Write one "M: " line to *dst* for each identity, in order."""
    emit = dst.write
    for person in identities:
        emit("M: ")
        assemble_identity(person, dst)
        emit("\n")
| |
| |
def assemble_reviewers(identities: List[Identity], dst: IO[str]) -> None:
    """Write one "R: " line to *dst* for each identity, in order."""
    emit = dst.write
    for person in identities:
        emit("R: ")
        assemble_identity(person, dst)
        emit("\n")
| |
| |
def assemble_forked(content: str, dst: IO[str]) -> None:
    """Write an "F: " line holding *content* to *dst*; skip empty content."""
    if not content:
        return
    for piece in ("F: ", content, "\n"):
        dst.write(piece)
| |
| |
def assemble_comment(content: List[str], dst: IO[str]) -> None:
    """Write the comment lines to *dst*, joined with no separator."""
    text = "".join(content)
    dst.write(text)
| |
| |
def assemble_block(block: B, default: B, dst: IO[str]) -> None:
    """Write a parsed block to *dst*.

    Output order is comments, maintainers, reviewers, then the forked entry.
    Maintainers are always written: when *block* has none, those of *default*
    are used. The other sections are written only when present in *block*.
    """
    if LineType.COMMENT in block:
        comments = cast(List[str], block[LineType.COMMENT])
        assemble_comment(comments, dst)

    # Look the fallback up only when needed, so *default* is not required to
    # carry maintainers if *block* has its own.
    source = block if LineType.MAINTAINER in block else default
    maintainers = source[LineType.MAINTAINER]
    assemble_maintainers(cast(List[Identity], maintainers), dst)

    if LineType.REVIEWER in block:
        reviewers = cast(List[Identity], block[LineType.REVIEWER])
        assemble_reviewers(reviewers, dst)

    if LineType.FORKED in block:
        forked = cast(str, block[LineType.FORKED])
        assemble_forked(forked, dst)
| |
| |
def main() -> None:
    """Parse a MAINTAINERS file and write it back out block by block.

    Takes two optional positional arguments: the input file (stdin when
    omitted) and the output file (stdout when omitted). Each block's
    repository name and a trailing blank line are printed to stdout; the
    block body is written to the output file.
    """
    parser = argparse.ArgumentParser()
    # nargs="?" makes the positionals optional; without it argparse requires
    # both and the stdin/stdout defaults can never take effect.
    parser.add_argument(
        "maintainers",
        nargs="?",
        type=argparse.FileType("r"),
        default=sys.stdin,
    )
    parser.add_argument(
        "output",
        nargs="?",
        type=argparse.FileType("w"),
        default=sys.stdout,
    )
    args = parser.parse_args()
    blocks = parse_maintainers(args.maintainers)
    for block in blocks.values():
        # NOTE(review): the repository name and blank separator go to stdout
        # even when the output argument names a file - confirm this is intended.
        print(block[LineType.REPO])
        # The "MAINTAINERS" block supplies maintainers for blocks without any.
        assemble_block(block, blocks["MAINTAINERS"], args.output)
        print()


if __name__ == "__main__":
    main()