blob: d18d676c6ef0276a74323b7d8db8ce50b8a35b0a [file] [log] [blame]
Andrew Jefferyf4019fe2018-05-18 16:42:18 +09301#!/usr/bin/python3
2#
3# SPDX-License-Identifier: Apache-2.0
4# Copyright (C) 2018 IBM Corp.
5
6import argparse
7import sys
8from collections import namedtuple, OrderedDict
9from enum import Enum, unique
10from typing import (Dict, NamedTuple, Iterator, Sequence, Union, Optional,
11 List, cast, IO)
12from pprint import pprint
13
@unique
class LineType(Enum):
    """Kinds of line that parse_line() can recognise in a MAINTAINERS file."""

    REPO = 1          # block header: "MAINTAINERS" or an "@..." line
    MAINTAINER = 2    # "M:" line
    REVIEWER = 3      # "R:" line
    FORKED = 4        # "F:" line
    COMMENT = 5       # line with '#' in its first two characters
21
@unique
class ParseState(Enum):
    """Progress of parse_block() through its input."""

    BEGIN = 1    # no recognised line has been seen yet
    BLOCK = 2    # at least one recognised line has been consumed
25 BLOCK = 2
26
class Email(NamedTuple):
    """Display name plus the address found between '<' and '>'."""

    name: str
    address: str


class Identity(NamedTuple):
    """A person: their Email and, when one was given, an IRC handle."""

    email: Email
    irc: Optional[str]
class Entry(NamedTuple):
    """One classified input line: its LineType and the extracted content."""

    type: LineType
    content: str
30
def parse_line(line: str) -> Optional[Entry]:
    """Classify a single line of the MAINTAINERS file.

    Returns an Entry carrying the line's LineType and content, or None when
    the line is blank or does not match any known form.

    The two-character tag is read from the raw (unstripped) line, so a tagged
    line with leading whitespace is not recognised as M:/R:/F:.
    """
    sline = line.strip()
    if not sline:
        return None

    if sline == "MAINTAINERS":
        return Entry(LineType.REPO, sline)

    tag = line[:2]
    if '@' in tag:
        # Drop the first character and anything from the first ':' onwards.
        return Entry(LineType.REPO, sline[1:].split(":")[0].strip())
    elif tag == 'M:':
        # Keep everything after the tag. Splitting on every ':' and taking
        # element [1] would truncate content that itself contains a colon.
        return Entry(LineType.MAINTAINER, sline[2:].strip())
    elif tag == 'R:':
        return Entry(LineType.REVIEWER, sline[2:].strip())
    elif tag == 'F:':
        return Entry(LineType.FORKED, sline[2:].strip())
    elif '#' in tag:
        # Comments keep the raw line so they can be written back unchanged.
        return Entry(LineType.COMMENT, line)

    return None
52
# Value stored under one LineType key of a parsed block: a plain string
# (REPO, FORKED), a list of identities (MAINTAINER, REVIEWER) or a list of
# raw comment lines (COMMENT).
D = Union[
    str,
    List[Identity],
    List[str]
]
54
def parse_repo(content: str) -> str:
    """Return the repository name; the content is passed through as-is."""
    repo_name = content
    return repo_name
57
def parse_forked(content: str) -> str:
    """Return the content of an "F:" line; it is passed through as-is."""
    forked = content
    return forked
60
def parse_irc(src: Iterator[str]) -> Optional[str]:
    """Read an optional "<handle!>" IRC nick from a character iterator.

    Characters are consumed up to the opening '<'. If a '#' is met first, or
    the input runs out, there is no handle and None is returned. After the
    '<', characters are collected until '!' or '#' and returned stripped.

    Raises:
        ValueError: a '<' was found but neither '!' nor '#' followed it.
    """
    opened = False
    for ch in src:
        if ch == '#':
            return None
        if ch == '<':
            opened = True
            break
    if not opened:
        return None

    handle = []
    for ch in src:
        if ch in '!#':
            return "".join(handle).strip()
        handle.append(ch)

    raise ValueError("Unterminated IRC handle")
77
def parse_address(src: Iterator[str]) -> str:
    """Collect characters up to the next '>' or '#' and return them stripped.

    The terminating character is consumed from the iterator.

    Raises:
        ValueError: the input ended before a '>' or '#' was seen.
    """
    pieces = []
    for ch in src:
        if ch in '>#':
            return "".join(pieces).strip()
        pieces.append(ch)
    raise ValueError("Unterminated email address")
85
def parse_name(src: Iterator[str]) -> str:
    """Collect characters up to the next '<' or '#' and return them stripped.

    The terminating character is consumed from the iterator.

    Raises:
        ValueError: the input ended before a '<' or '#' was seen.
    """
    pieces = []
    for ch in src:
        if ch in '<#':
            return "".join(pieces).strip()
        pieces.append(ch)
    raise ValueError("Unterminated name")
93
def parse_email(src: Iterator[str]) -> Email:
    """Parse "Name <address>" from a character iterator into an Email.

    The name is read first and the address second, both from the same
    iterator. A ValueError from either helper propagates to the caller.
    """
    # Keyword arguments are evaluated left to right, so the name is consumed
    # before the address.
    return Email(name=parse_name(src), address=parse_address(src))
98
def parse_identity(content: str) -> Identity:
    """Parse the content of an M:/R: line into an Identity.

    One character iterator is shared by both parsers, so the IRC handle is
    looked for in whatever follows the email address.
    """
    chars = iter(content)
    parsed_email = parse_email(chars)
    parsed_irc = parse_irc(chars)
    return Identity(email=parsed_email, irc=parsed_irc)
104
# One parsed block: maps each LineType present in the block to its value.
B = Dict[LineType, D]
106
def parse_block(src: Iterator[str]) -> Optional[B]:
    """Parse the next block of recognised lines from `src`.

    Unrecognised or blank lines before the block are skipped. The block ends
    at the first unrecognised/blank line after it, or at end of input.

    Returns the block as an ordered mapping from LineType to its value, or
    None when the input is exhausted without yielding any content.

    A line that fails to parse (ValueError) is reported on stderr and
    skipped; it leaves no trace in the returned block.
    """
    state = ParseState.BEGIN
    repo: Dict[LineType, D] = OrderedDict()
    for line in src:
        try:
            entry = parse_line(line)
            if entry is None:
                if state == ParseState.BLOCK and repo:
                    return repo
                # Either still before the block, or every line of the block
                # failed to parse. Returning an empty block would look like
                # end of input to the caller, so keep scanning instead.
                state = ParseState.BEGIN
                continue
            state = ParseState.BLOCK

            if entry.type == LineType.REPO:
                repo[entry.type] = parse_repo(entry.content)
            elif entry.type in (LineType.MAINTAINER, LineType.REVIEWER):
                # Parse before touching `repo`: a failed parse must not
                # leave an empty list behind, since a present-but-empty
                # MAINTAINER key suppresses the default maintainers.
                identity = parse_identity(entry.content)
                cast(list, repo.setdefault(entry.type, [])).append(identity)
            elif entry.type == LineType.FORKED:
                repo[entry.type] = parse_forked(entry.content)
            elif entry.type == LineType.COMMENT:
                cast(list, repo.setdefault(entry.type, [])).append(
                    entry.content)
        except ValueError as e:
            # Diagnostics go to stderr so they cannot mix with the
            # generated output on stdout.
            print("Failed to parse line '{}': {}".format(line.strip(), e),
                  file=sys.stderr)

    if not repo:
        return None

    return repo
141
def trash_preamble(src: Iterator[str]) -> None:
    """Consume and discard the preamble of the MAINTAINERS file.

    Lines are read until a line of exactly 25 dashes is met after the
    "START OF MAINTAINERS LIST" marker; that dashed line is consumed too.
    If the markers never appear, the whole input is consumed.
    """
    seen_start = False
    for raw in src:
        text = raw.strip()
        if text == "START OF MAINTAINERS LIST":
            seen_start = True
        elif seen_start and text == "-------------------------":
            return
149 break
150
def parse_maintainers(src: Iterator[str]) -> Dict[D, B]:
    """Parse a whole MAINTAINERS file into an ordered mapping of blocks.

    The preamble is discarded first, then blocks are read until
    parse_block() returns nothing. Each block is stored under the value of
    its REPO entry, in the order the blocks were read.
    """
    trash_preamble(src)
    by_repo: Dict[D, B] = OrderedDict()
    block = parse_block(src)
    while block:
        by_repo[block[LineType.REPO]] = block
        block = parse_block(src)
    return by_repo
160
def assemble_name(name: str, dst: IO[str]) -> None:
    """Write `name` to `dst` exactly as given."""
    emit = dst.write
    emit(name)
163
def assemble_address(address: str, dst: IO[str]) -> None:
    """Write `address` to `dst` wrapped in angle brackets."""
    for piece in ("<", address, ">"):
        dst.write(piece)
168
def assemble_email(email: Email, dst: IO[str]) -> None:
    """Write `email` to `dst` as the name, one space, then the address."""
    name, address = email
    assemble_name(name, dst)
    dst.write(" ")
    assemble_address(address, dst)
173
def assemble_irc(irc: Optional[str], dst: IO[str]) -> None:
    """Write " <irc!>" to `dst`; write nothing when `irc` is None or empty."""
    if not irc:
        return
    for piece in (" ", "<", irc, "!>"):
        dst.write(piece)
180
def assemble_identity(identity: Identity, dst: IO[str]) -> None:
    """Write the identity's email to `dst`, followed by its IRC handle."""
    email, irc = identity
    assemble_email(email, dst)
    assemble_irc(irc, dst)
184
def assemble_maintainers(identities: List[Identity], dst: IO[str]) -> None:
    """Write one "M: <identity>" line to `dst` for each identity, in order."""
    prefix, newline = "M: ", "\n"
    for person in identities:
        dst.write(prefix)
        assemble_identity(person, dst)
        dst.write(newline)
190
def assemble_reviewers(identities: List[Identity], dst: IO[str]) -> None:
    """Write one "R: <identity>" line to `dst` for each identity, in order."""
    prefix, newline = "R: ", "\n"
    for person in identities:
        dst.write(prefix)
        assemble_identity(person, dst)
        dst.write(newline)
196
def assemble_forked(content: str, dst: IO[str]) -> None:
    """Write an "F: <content>" line to `dst`; nothing if `content` is empty."""
    if not content:
        return
    for piece in ("F: ", content, "\n"):
        dst.write(piece)
202
def assemble_comment(content: List[str], dst: IO[str]) -> None:
    """Write the comment lines to `dst` back to back, with no separator."""
    text = "".join(content)
    dst.write(text)
205
def assemble_block(block: B, default: B, dst: IO[str]) -> None:
    """Write one block to `dst`: comments, maintainers, reviewers, forked.

    Comment, reviewer and forked entries are written only if the block has
    them. Maintainers are always written; when the block has none of its
    own, the maintainers of `default` are used instead.
    """
    if LineType.COMMENT in block:
        assemble_comment(cast(List[str], block[LineType.COMMENT]), dst)

    # Fall back to the default block only when this one has no M: entries.
    owner = block if LineType.MAINTAINER in block else default
    assemble_maintainers(cast(List[Identity], owner[LineType.MAINTAINER]), dst)

    if LineType.REVIEWER in block:
        reviewers = cast(List[Identity], block[LineType.REVIEWER])
        assemble_reviewers(reviewers, dst)

    if LineType.FORKED in block:
        forked = cast(str, block[LineType.FORKED])
        assemble_forked(forked, dst)
218
def main() -> None:
    """Command-line entry point: parse a MAINTAINERS file and re-emit it.

    Positional arguments (both optional):
        maintainers  file to read, defaulting to stdin
        output       file to write the assembled blocks to, defaulting to
                     stdout

    For each parsed block the repository name is printed, then the block is
    assembled into the output using the 'MAINTAINERS' block as the source of
    default maintainers, then an empty line is printed.
    """
    parser = argparse.ArgumentParser()
    # nargs='?' is what makes a positional optional. Without it argparse
    # ignores `default` and demands both paths on the command line.
    parser.add_argument("maintainers", type=argparse.FileType('r'),
                        nargs='?', default=sys.stdin)
    parser.add_argument("output", type=argparse.FileType('w'),
                        nargs='?', default=sys.stdout)
    args = parser.parse_args()
    blocks = parse_maintainers(args.maintainers)
    for block in blocks.values():
        # NOTE(review): the repo name and the separating blank line go to
        # stdout rather than args.output. Confirm whether that is intended
        # when the output is a file.
        print(block[LineType.REPO])
        assemble_block(block, blocks['MAINTAINERS'], args.output)
        print()
231
232if __name__ == "__main__":
233 main()