Andrew Geissler | 635e0e4 | 2020-08-21 15:58:33 -0500 | [diff] [blame] | 1 | # |
| 2 | # BitBake Test for ANSI color code filtering |
| 3 | # |
| 4 | # Copyright (C) 2020 Agilent Technologies, Inc. |
| 5 | # Author: Chris Laplante <chris.laplante@agilent.com> |
| 6 | # |
| 7 | # SPDX-License-Identifier: MIT |
| 8 | # |
| 9 | |
| 10 | import unittest |
| 11 | import bb.progress |
| 12 | import bb.data |
| 13 | import bb.event |
| 14 | from bb.progress import filter_color, filter_color_n |
| 15 | import io |
| 16 | import re |
| 17 | |
| 18 | |
class ProgressWatcher:
    """Event sink that remembers the progress and rate of each event it is given."""

    def __init__(self):
        # (progress, rate) tuples, in the order the events arrived.
        self._reports = []

    def handle_event(self, event):
        """Store ``(event.progress, event.rate)`` for *event*."""
        sample = (event.progress, event.rate)
        self._reports.append(sample)

    def reports(self):
        """Return the list of ``(progress, rate)`` tuples recorded so far."""
        return self._reports
| 28 | |
| 29 | |
class ColorCodeTests(unittest.TestCase):
    """Tests that bb.progress copes with ANSI colour codes embedded in output."""

    # NOTE: every escape sequence in this class is written with an explicit
    # "\x1b" escape instead of a raw ESC byte inside the literal. A raw
    # control byte is invisible and easily dropped by editors, web views and
    # copy/paste; what is left ("[01;35m[K...") contains no colour codes at
    # all, so the assertions below (e.g. a code count of 4) could never hold.

    def setUp(self):
        """Create a datastore and hook a ProgressWatcher into bb.event."""
        self.d = bb.data.init()
        self._progress_watcher = ProgressWatcher()
        bb.event.register("bb.build.TaskProgress", self._progress_watcher.handle_event, data=self.d)

    def tearDown(self):
        """Unregister the handler that setUp() installed under the same name."""
        bb.event.remove("bb.build.TaskProgress", None)

    def test_filter_color(self):
        """filter_color() returns the input with its colour codes removed."""
        input_string = "\x1b[01;35m\x1b[K~~~~~~~~~~~~^~~~~~~~\x1b[m\x1b[K"
        filtered = filter_color(input_string)
        self.assertEqual(filtered, "~~~~~~~~~~~~^~~~~~~~")

    def test_filter_color_n(self):
        """filter_color_n() returns the stripped text plus a code count."""
        # Four sequences: ESC[01;35m, ESC[K, ESC[m, ESC[K.
        input_string = "\x1b[01;35m\x1b[K~~~~~~~~~~~~^~~~~~~~\x1b[m\x1b[K"
        filtered, code_count = filter_color_n(input_string)
        self.assertEqual(filtered, "~~~~~~~~~~~~^~~~~~~~")
        self.assertEqual(code_count, 4)

    def test_LineFilterProgressHandler_color_filtering(self):
        """A LineFilterProgressHandler subclass sees lines without colour codes."""
        class CustomProgressHandler(bb.progress.LineFilterProgressHandler):
            PROGRESS_REGEX = re.compile(r"Progress: (?P<progress>\d+)%")

            def writeline(self, line):
                # match() anchors at the start of the line, so the coloured
                # inputs below can only match if the codes were stripped
                # before this method is called.
                match = self.PROGRESS_REGEX.match(line)
                if match:
                    self.update(int(match.group("progress")))
                    # Progress lines are expected to stay out of the output
                    # buffer (see the final assertion of this test).
                    return False
                return True

        buffer = io.StringIO()
        handler = CustomProgressHandler(self.d, buffer)
        handler.write("Program output!\n")
        handler.write("More output!\n")
        handler.write("Progress: \x1b[01;35m\x1b[K10\x1b[m\x1b[K%\n")  # 10%
        handler.write("Even more\n")
        handler.write("\x1b[01;35m\x1b[KProgress: 50\x1b[m\x1b[K%\n")  # 50%
        handler.write("\x1b[01;35m\x1b[KProgress: 60\x1b[m\x1b[K%\n")  # 60%
        handler.write("Pro\x1b[01;35m\x1b[Kgress: \x1b[m\x1b[K100%\n")  # 100%

        expected = [(10, None), (50, None), (60, None), (100, None)]
        self.assertListEqual(self._progress_watcher.reports(), expected)

        # Only the non-progress lines should have reached the buffer.
        self.assertEqual(buffer.getvalue(), "Program output!\nMore output!\nEven more\n")

    def test_BasicProgressHandler_color_filtering(self):
        """BasicProgressHandler reads percentages wrapped in colour codes."""
        buffer = io.StringIO()
        handler = bb.progress.BasicProgressHandler(self.d, outfile=buffer)
        handler.write("\x1b[01;35m\x1b[K1\x1b[m\x1b[K%\n")  # 1%
        handler.write("\x1b[01;35m\x1b[K2\x1b[m\x1b[K%\n")  # 2%
        handler.write("\x1b[01;35m\x1b[K10\x1b[m\x1b[K%\n")  # 10%
        handler.write("\x1b[01;35m\x1b[K100\x1b[m\x1b[K%\n")  # 100%

        # The leading (0, None) has no matching write() above; presumably the
        # handler reports 0 when it is constructed -- confirm in bb.progress.
        expected = [(0, None), (1, None), (2, None), (10, None), (100, None)]
        self.assertListEqual(self._progress_watcher.reports(), expected)

    def test_OutOfProgressHandler_color_filtering(self):
        """OutOfProgressHandler reads "N of M" split by colour codes."""
        buffer = io.StringIO()
        handler = bb.progress.OutOfProgressHandler(self.d, r'(\d+) of (\d+)', outfile=buffer)
        # The closing colour codes sit between "of" and the total, so the
        # regex can only match once they have been removed.
        handler.write("\x1b[01;35m\x1b[KText text 1 of\x1b[m\x1b[K 5")  # 1/5
        handler.write("\x1b[01;35m\x1b[KText text 3 of\x1b[m\x1b[K 5")  # 3/5
        handler.write("\x1b[01;35m\x1b[KText text 5 of\x1b[m\x1b[K 5")  # 5/5

        # As above, the leading (0, None) precedes any write() -- presumably
        # an initial report from the handler; confirm in bb.progress.
        expected = [(0, None), (20.0, None), (60.0, None), (100.0, None)]
        self.assertListEqual(self._progress_watcher.reports(), expected)