Andrew Jeffery | f4019fe | 2018-05-18 16:42:18 +0930 | [diff] [blame] | 1 | #!/usr/bin/python3 |
| 2 | # |
| 3 | # SPDX-License-Identifier: Apache-2.0 |
| 4 | # Copyright (C) 2018 IBM Corp. |
| 5 | |
| 6 | import argparse |
| 7 | import sys |
| 8 | from collections import namedtuple, OrderedDict |
| 9 | from enum import Enum, unique |
| 10 | from typing import (Dict, NamedTuple, Iterator, Sequence, Union, Optional, |
| 11 | List, cast, IO) |
| 12 | from pprint import pprint |
| 13 | |
@unique
class LineType(Enum):
    """Kinds of entry that a single line of the maintainers file can yield."""

    REPO = 1        # repository heading ("MAINTAINERS" or an "@" line)
    MAINTAINER = 2  # "M:" line
    REVIEWER = 3    # "R:" line
    FORKED = 4      # "F:" line
    COMMENT = 5     # "#" line
| 21 | |
@unique
class ParseState(Enum):
    """Progress of the block parser through the input."""

    BEGIN = 1  # no entry of the current block has been seen yet
    BLOCK = 2  # at least one entry of the current block has been seen
| 26 | |
# A display name together with the address found between "<" and ">".
class Email(NamedTuple):
    name: str
    address: str


# An email plus an optional IRC handle (None when the line carries none).
class Identity(NamedTuple):
    email: Email
    irc: Optional[str]


# One classified input line: its kind and the text kept for that kind.
class Entry(NamedTuple):
    type: LineType
    content: str
| 30 | |
def parse_line(line: str) -> Optional[Entry]:
    """Classify one line of the maintainers file.

    Args:
        line: A raw input line, normally still carrying its newline.

    Returns:
        An Entry describing the line, or None when the line is blank or does
        not start with a recognised tag.

        * "MAINTAINERS" or a line with "@" in its first two characters gives
          a REPO entry (for "@" lines: the text after the first character,
          up to the first colon).
        * "M:" / "R:" / "F:" give MAINTAINER / REVIEWER / FORKED entries
          holding everything after the tag, stripped.
        * A line with "#" in its first two characters gives a COMMENT entry
          holding the raw, unstripped line.
    """
    sline = line.strip()
    if not sline:
        return None

    if sline == "MAINTAINERS":
        return Entry(LineType.REPO, sline)

    # Read the tag from the stripped text: taking it from the raw line made
    # an indented "M:"/"R:"/"F:" line unrecognisable even though its content
    # was already being read from the stripped text.
    tag = sline[:2]
    if '@' in tag:
        return Entry(LineType.REPO, sline[1:].split(":")[0].strip())
    if tag == 'M:':
        # Slice off the tag instead of splitting on ":" so that a colon
        # inside the identity no longer truncates it (as "F:" already did).
        return Entry(LineType.MAINTAINER, sline[2:].strip())
    if tag == 'R:':
        return Entry(LineType.REVIEWER, sline[2:].strip())
    if tag == 'F:':
        return Entry(LineType.FORKED, sline[2:].strip())
    if '#' in tag:
        # Comments keep the raw line, newline included.
        return Entry(LineType.COMMENT, line)

    return None
| 52 | |
# Value stored per LineType in a block: a plain string, a list of identities,
# or a list of raw comment lines.
D = Union[str, List[Identity], List[str]]
| 54 | |
def parse_repo(content: str) -> str:
    """Return the content of a REPO entry unchanged."""
    repo_name = content
    return repo_name
| 57 | |
def parse_forked(content: str) -> str:
    """Return the content of a FORKED entry unchanged."""
    upstream = content
    return upstream
| 60 | |
def parse_irc(src: Iterator[str]) -> Optional[str]:
    """Read an optional IRC handle from a character iterator.

    Characters are skipped until a "<" is found. The handle is then every
    character up to the next "!" or "#", with surrounding whitespace removed.

    Returns:
        The handle, or None when a "#" is met before any "<" or the input
        ends without a "<".

    Raises:
        ValueError: the input ends after "<" but before "!" or "#".
    """
    opened = False
    for ch in src:
        if ch == '#':
            return None
        if ch == '<':
            opened = True
            break
    if not opened:
        return None

    handle = []
    for ch in src:
        if ch in '!#':
            return "".join(handle).strip()
        handle.append(ch)

    raise ValueError("Unterminated IRC handle")
| 77 | |
def parse_address(src: Iterator[str]) -> str:
    """Read an email address from a character iterator.

    Consumes characters up to and including the first ">" or "#".

    Returns:
        The characters before the terminator, stripped of whitespace.

    Raises:
        ValueError: the input ends before a terminator is found.
    """
    collected = []
    for ch in src:
        if ch in '>#':
            return "".join(collected).strip()
        collected.append(ch)
    raise ValueError("Unterminated email address")
| 85 | |
def parse_name(src: Iterator[str]) -> str:
    """Read a display name from a character iterator.

    Consumes characters up to and including the first "<" or "#".

    Returns:
        The characters before the terminator, stripped of whitespace.

    Raises:
        ValueError: the input ends before a terminator is found.
    """
    collected = []
    for ch in src:
        if ch in '<#':
            return "".join(collected).strip()
        collected.append(ch)
    raise ValueError("Unterminated name")
| 93 | |
def parse_email(src: Iterator[str]) -> Email:
    """Read a name followed by an address from a character iterator.

    Raises:
        ValueError: propagated from parse_name() or parse_address().
    """
    # Both parsers draw from the same iterator, so the name must be read
    # before the address.
    who = parse_name(src)
    where = parse_address(src)
    return Email(name=who, address=where)
| 98 | |
def parse_identity(content: str) -> Identity:
    """Parse the content of an "M:" or "R:" entry into an Identity.

    Raises:
        ValueError: propagated from the email or IRC parsers.
    """
    chars = iter(content)
    # Keyword arguments are evaluated left to right, so the email is consumed
    # from the shared iterator before the IRC handle.
    return Identity(email=parse_email(chars), irc=parse_irc(chars))
| 104 | |
# One parsed block: the values collected for each kind of line it contained.
B = Dict[LineType, D]
| 106 | |
def parse_block(src: Iterator[str]) -> Optional[B]:
    """Parse the next block of entries from an iterator of lines.

    Lines that parse_line() does not recognise are skipped until the first
    entry is seen; after that, the first such line ends the block.

    REPO and FORKED entries store a single value (a later one replaces an
    earlier one). MAINTAINER, REVIEWER and COMMENT entries accumulate in
    lists. A ValueError raised while handling a line is reported with
    print() and parsing continues with the next line.

    Returns:
        The block, or None when the input is exhausted without any entry
        having been stored.
    """
    state = ParseState.BEGIN
    block: Dict[LineType, D] = OrderedDict()
    for line in src:
        try:
            entry = parse_line(line)
            if entry is None:
                if state is ParseState.BLOCK:
                    return block
                continue
            state = ParseState.BLOCK

            kind, text = entry
            if kind is LineType.REPO:
                block[kind] = parse_repo(text)
            elif kind is LineType.FORKED:
                block[kind] = parse_forked(text)
            elif kind is LineType.COMMENT:
                cast(list, block.setdefault(kind, cast(List[str], []))).append(text)
            elif kind in (LineType.MAINTAINER, LineType.REVIEWER):
                # The list is created before the identity is parsed, so it
                # exists (possibly empty) even if parsing raises.
                people = cast(list, block.setdefault(kind, cast(List[Identity], [])))
                people.append(parse_identity(text))
        except ValueError as e:
            print("Failed to parse line '{}': {}".format(line.strip(), e))

    return block or None
| 141 | |
def trash_preamble(src: Iterator[str]) -> None:
    """Consume lines from `src` up to the end of the list header.

    Reading stops after the first dashed rule that occurs on or after a
    "START OF MAINTAINERS LIST" line. If no such rule exists, the whole
    iterator is consumed.
    """
    header_seen = False
    for raw in src:
        text = raw.strip()
        if text == "START OF MAINTAINERS LIST":
            header_seen = True
        if header_seen and text == "-------------------------":
            return
| 150 | |
def parse_maintainers(src: Iterator[str]) -> Dict[D, B]:
    """Parse a whole maintainers file into blocks keyed by repository.

    The preamble is discarded first; blocks are then read until
    parse_block() returns nothing.

    Returns:
        An ordered mapping from each block's REPO value to the block.

    Raises:
        KeyError: a block contains no REPO entry.
    """
    trash_preamble(src)
    by_repo: Dict[D, B] = OrderedDict()
    block = parse_block(src)
    while block:
        by_repo[block[LineType.REPO]] = block
        block = parse_block(src)
    return by_repo
| 160 | |
def assemble_name(name: str, dst: IO[str]) -> None:
    """Write `name` to `dst` exactly as given."""
    dst.write(name)
| 163 | |
def assemble_address(address: str, dst: IO[str]) -> None:
    """Write `address` to `dst` wrapped in angle brackets."""
    for piece in ("<", address, ">"):
        dst.write(piece)
| 168 | |
def assemble_email(email: Email, dst: IO[str]) -> None:
    """Write `email` to `dst` as the name, a space, then the bracketed address."""
    name, address = email
    assemble_name(name, dst)
    dst.write(" ")
    assemble_address(address, dst)
| 173 | |
def assemble_irc(irc: Optional[str], dst: IO[str]) -> None:
    """Write an IRC handle to `dst` as " <handle!>".

    Nothing is written when `irc` is None or empty.
    """
    if not irc:
        return
    for piece in (" ", "<", irc, "!>"):
        dst.write(piece)
| 180 | |
def assemble_identity(identity: Identity, dst: IO[str]) -> None:
    """Write `identity` to `dst`: the email first, then the IRC handle if any."""
    email, irc = identity
    assemble_email(email, dst)
    assemble_irc(irc, dst)
| 184 | |
def assemble_maintainers(identities: List[Identity], dst: IO[str]) -> None:
    """Write one "M: " line to `dst` for each identity, in order."""
    for person in identities:
        dst.write("M: ")
        assemble_identity(person, dst)
        dst.write("\n")
| 190 | |
def assemble_reviewers(identities: List[Identity], dst: IO[str]) -> None:
    """Write one "R: " line to `dst` for each identity, in order."""
    for person in identities:
        dst.write("R: ")
        assemble_identity(person, dst)
        dst.write("\n")
| 196 | |
def assemble_forked(content: str, dst: IO[str]) -> None:
    """Write an "F: " line holding `content` to `dst`.

    Nothing is written when `content` is empty.
    """
    if not content:
        return
    for piece in ("F: ", content, "\n"):
        dst.write(piece)
| 202 | |
def assemble_comment(content: List[str], dst: IO[str]) -> None:
    """Write the comment lines to `dst`, concatenated with no separator."""
    text = "".join(content)
    dst.write(text)
| 205 | |
def assemble_block(block: B, default: B, dst: IO[str]) -> None:
    """Write one block to `dst`.

    Output order is comments, maintainers, reviewers, then the forked line;
    each part other than maintainers is written only if present in `block`.
    When `block` has no MAINTAINER entry, the maintainers of `default` are
    written instead.

    Raises:
        KeyError: neither `block` nor `default` has a MAINTAINER entry.
    """
    if LineType.COMMENT in block:
        assemble_comment(cast(List[str], block[LineType.COMMENT]), dst)

    owner = block if LineType.MAINTAINER in block else default
    assemble_maintainers(cast(List[Identity], owner[LineType.MAINTAINER]), dst)

    if LineType.REVIEWER in block:
        assemble_reviewers(cast(List[Identity], block[LineType.REVIEWER]), dst)

    if LineType.FORKED in block:
        assemble_forked(cast(str, block[LineType.FORKED]), dst)
| 218 | |
def main() -> None:
    """Parse a maintainers file and write every block back out.

    Command line: ``[maintainers [output]]``. The input defaults to stdin and
    the output to stdout. For each block the repository name is printed to
    stdout, the block is written to the output, and an empty line is printed
    to stdout.

    Raises:
        KeyError: the input has blocks but none named "MAINTAINERS", whose
            maintainers serve as the fallback for the others.
    """
    parser = argparse.ArgumentParser()
    # nargs='?' is what makes a positional optional; without it argparse
    # ignores `default` and insists on both paths being given.
    parser.add_argument("maintainers", nargs='?', type=argparse.FileType('r'),
                        default=sys.stdin)
    parser.add_argument("output", nargs='?', type=argparse.FileType('w'),
                        default=sys.stdout)
    args = parser.parse_args()
    blocks = parse_maintainers(args.maintainers)
    for block in blocks.values():
        print(block[LineType.REPO])
        assemble_block(block, blocks['MAINTAINERS'], args.output)
        print()

if __name__ == "__main__":
    main()