Andrew Geissler | 5f35090 | 2021-07-23 13:09:54 -0400 | [diff] [blame] | 1 | # |
Patrick Williams | 92b42cb | 2022-09-03 06:53:57 -0500 | [diff] [blame] | 2 | # Copyright BitBake Contributors |
| 3 | # |
Andrew Geissler | 5f35090 | 2021-07-23 13:09:54 -0400 | [diff] [blame] | 4 | # SPDX-License-Identifier: GPL-2.0-only |
| 5 | # |
| 6 | |
| 7 | from pathlib import Path |
| 8 | import bb.compress.lz4 |
| 9 | import bb.compress.zstd |
| 10 | import contextlib |
| 11 | import os |
| 12 | import shutil |
| 13 | import tempfile |
| 14 | import unittest |
| 15 | import subprocess |
| 16 | |
| 17 | |
class CompressionTests:
    """Backend-independent round-trip tests for a compression wrapper.

    This is a mixin, not a runnable test case on its own: it relies on
    ``addCleanup``, ``assertEqual`` and ``assertRaises`` being supplied by
    ``unittest.TestCase``, and on the concrete subclass providing
    ``do_open(file, mode=..., **kwargs)`` as a context manager that yields
    a file-like object.
    """

    def setUp(self):
        """Create a scratch directory that is removed when the test ends."""
        self._t = tempfile.TemporaryDirectory()
        self.tmpdir = Path(self._t.name)
        self.addCleanup(self._t.cleanup)

    def _file_helper(self, mode_suffix, data):
        """Write *data* to a file through ``do_open`` and read it back.

        Args:
            mode_suffix: Appended to the ``"w"`` and ``"r"`` open modes; the
                tests in this class pass ``"t"`` (text) or ``"b"`` (binary).
            data: Payload to round-trip; must match the mode (``str`` for
                text, ``bytes`` for binary).
        """
        tmp_file = self.tmpdir / "compressed"

        with self.do_open(tmp_file, mode="w" + mode_suffix) as f:
            f.write(data)

        with self.do_open(tmp_file, mode="r" + mode_suffix) as f:
            read_data = f.read()

        self.assertEqual(read_data, data)

    def test_text_file(self):
        """Round-trip a short string through a file in text mode."""
        self._file_helper("t", "Hello")

    def test_binary_file(self):
        """Round-trip a short byte string through a file in binary mode."""
        self._file_helper("b", "Hello".encode("utf-8"))

    def _pipe_helper(self, mode_suffix, data):
        """Round-trip *data* through an OS pipe instead of a named file.

        The writer is opened on the pipe's write end and the reader on its
        read end, so ``do_open`` is exercised with already-open file
        objects rather than paths.

        Args:
            mode_suffix: Appended to the ``"w"`` and ``"r"`` open modes.
            data: Payload to round-trip; must match the mode.
        """
        rfd, wfd = os.pipe()
        with open(rfd, "rb") as r, open(wfd, "wb") as w:
            with self.do_open(r, mode="r" + mode_suffix) as decompress:
                with self.do_open(w, mode="w" + mode_suffix) as compress:
                    compress.write(data)
                # Read only after the writer context has exited, so the
                # compressed stream is complete.
                # NOTE(review): presumably closing the writer also closes
                # the pipe's write end so the reader sees EOF; confirm
                # against the backend implementation.
                read_data = decompress.read()

        self.assertEqual(read_data, data)

    def test_text_pipe(self):
        """Round-trip a short string through a pipe in text mode."""
        self._pipe_helper("t", "Hello")

    def test_binary_pipe(self):
        """Round-trip a short byte string through a pipe in binary mode."""
        self._pipe_helper("b", "Hello".encode("utf-8"))

    def test_bad_decompress(self):
        """Reading a file that is not valid compressed data raises OSError."""
        tmp_file = self.tmpdir / "compressed"
        with tmp_file.open("wb") as f:
            f.write(b"\x00")

        # Both the read and the close on context exit sit inside
        # assertRaises, so the error is accepted from either point.
        # NOTE(review): stderr=DEVNULL presumably silences the backend's
        # diagnostics for the expected failure; confirm against do_open.
        with self.assertRaises(OSError):
            with self.do_open(tmp_file, mode="rb", stderr=subprocess.DEVNULL) as f:
                # The result is irrelevant; only the raised error matters.
                f.read()
| 65 | |
| 66 | |
class LZ4Tests(CompressionTests, unittest.TestCase):
    """Run the shared compression tests against ``bb.compress.lz4``."""

    def setUp(self):
        """Skip the test when no ``lz4c`` executable is found, else set up."""
        lz4c_path = shutil.which("lz4c")
        if lz4c_path is None:
            self.skipTest("'lz4c' not found")
        super().setUp()

    @contextlib.contextmanager
    def do_open(self, *args, **kwargs):
        """Yield a stream from ``bb.compress.lz4.open``, forwarding all arguments."""
        with bb.compress.lz4.open(*args, **kwargs) as stream:
            yield stream
| 77 | |
| 78 | |
class ZStdTests(CompressionTests, unittest.TestCase):
    """Run the shared compression tests against ``bb.compress.zstd``."""

    def setUp(self):
        """Skip the test when no ``zstd`` executable is found, else set up."""
        zstd_path = shutil.which("zstd")
        if zstd_path is None:
            self.skipTest("'zstd' not found")
        super().setUp()

    @contextlib.contextmanager
    def do_open(self, *args, **kwargs):
        """Yield a stream from ``bb.compress.zstd.open``, forwarding all arguments."""
        with bb.compress.zstd.open(*args, **kwargs) as stream:
            yield stream
| 89 | |
| 90 | |
class PZStdTests(CompressionTests, unittest.TestCase):
    """Run the shared compression tests against ``bb.compress.zstd`` with two threads."""

    def setUp(self):
        """Skip the test when no ``pzstd`` executable is found, else set up."""
        pzstd_path = shutil.which("pzstd")
        if pzstd_path is None:
            self.skipTest("'pzstd' not found")
        super().setUp()

    @contextlib.contextmanager
    def do_open(self, *args, **kwargs):
        """Yield a stream from ``bb.compress.zstd.open`` with ``num_threads=2``.

        All other positional and keyword arguments are forwarded unchanged.
        """
        # NOTE(review): num_threads=2 presumably makes the backend use the
        # pzstd tool checked for in setUp; confirm in bb.compress.zstd.
        with bb.compress.zstd.open(*args, num_threads=2, **kwargs) as stream:
            yield stream