| # |
| # SPDX-License-Identifier: GPL-2.0-only |
| # |
| # Helper library to implement streaming compression and decompression using an |
| # external process |
| # |
| # This library should be used directly by end users; a wrapper library for the |
| # specific compression tool should be created |
| |
| import builtins |
| import io |
| import os |
| import subprocess |
| |
| |
| def open_wrap( |
| cls, filename, mode="rb", *, encoding=None, errors=None, newline=None, **kwargs |
| ): |
| """ |
| Open a compressed file in binary or text mode. |
| |
| Users should not call this directly. A specific compression library can use |
| this helper to provide it's own "open" command |
| |
| The filename argument can be an actual filename (a str or bytes object), or |
| an existing file object to read from or write to. |
| |
| The mode argument can be "r", "rb", "w", "wb", "x", "xb", "a" or "ab" for |
| binary mode, or "rt", "wt", "xt" or "at" for text mode. The default mode is |
| "rb". |
| |
| For binary mode, this function is equivalent to the cls constructor: |
| cls(filename, mode). In this case, the encoding, errors and newline |
| arguments must not be provided. |
| |
| For text mode, a cls object is created, and wrapped in an |
| io.TextIOWrapper instance with the specified encoding, error handling |
| behavior, and line ending(s). |
| """ |
| if "t" in mode: |
| if "b" in mode: |
| raise ValueError("Invalid mode: %r" % (mode,)) |
| else: |
| if encoding is not None: |
| raise ValueError("Argument 'encoding' not supported in binary mode") |
| if errors is not None: |
| raise ValueError("Argument 'errors' not supported in binary mode") |
| if newline is not None: |
| raise ValueError("Argument 'newline' not supported in binary mode") |
| |
| file_mode = mode.replace("t", "") |
| if isinstance(filename, (str, bytes, os.PathLike, int)): |
| binary_file = cls(filename, file_mode, **kwargs) |
| elif hasattr(filename, "read") or hasattr(filename, "write"): |
| binary_file = cls(None, file_mode, fileobj=filename, **kwargs) |
| else: |
| raise TypeError("filename must be a str or bytes object, or a file") |
| |
| if "t" in mode: |
| return io.TextIOWrapper( |
| binary_file, encoding, errors, newline, write_through=True |
| ) |
| else: |
| return binary_file |
| |
| |
| class CompressionError(OSError): |
| pass |
| |
| |
| class PipeFile(io.RawIOBase): |
| """ |
| Class that implements generically piping to/from a compression program |
| |
| Derived classes should add the function get_compress() and get_decompress() |
| that return the required commands. Input will be piped into stdin and the |
| (de)compressed output should be written to stdout, e.g.: |
| |
| class FooFile(PipeCompressionFile): |
| def get_decompress(self): |
| return ["fooc", "--decompress", "--stdout"] |
| |
| def get_compress(self): |
| return ["fooc", "--compress", "--stdout"] |
| |
| """ |
| |
| READ = 0 |
| WRITE = 1 |
| |
| def __init__(self, filename=None, mode="rb", *, stderr=None, fileobj=None): |
| if "t" in mode or "U" in mode: |
| raise ValueError("Invalid mode: {!r}".format(mode)) |
| |
| if not "b" in mode: |
| mode += "b" |
| |
| if mode.startswith("r"): |
| self.mode = self.READ |
| elif mode.startswith("w"): |
| self.mode = self.WRITE |
| else: |
| raise ValueError("Invalid mode %r" % mode) |
| |
| if fileobj is not None: |
| self.fileobj = fileobj |
| else: |
| self.fileobj = builtins.open(filename, mode or "rb") |
| |
| if self.mode == self.READ: |
| self.p = subprocess.Popen( |
| self.get_decompress(), |
| stdin=self.fileobj, |
| stdout=subprocess.PIPE, |
| stderr=stderr, |
| close_fds=True, |
| ) |
| self.pipe = self.p.stdout |
| else: |
| self.p = subprocess.Popen( |
| self.get_compress(), |
| stdin=subprocess.PIPE, |
| stdout=self.fileobj, |
| stderr=stderr, |
| close_fds=True, |
| ) |
| self.pipe = self.p.stdin |
| |
| self.__closed = False |
| |
| def _check_process(self): |
| if self.p is None: |
| return |
| |
| returncode = self.p.wait() |
| if returncode: |
| raise CompressionError("Process died with %d" % returncode) |
| self.p = None |
| |
| def close(self): |
| if self.closed: |
| return |
| |
| self.pipe.close() |
| if self.p is not None: |
| self._check_process() |
| self.fileobj.close() |
| |
| self.__closed = True |
| |
| @property |
| def closed(self): |
| return self.__closed |
| |
| def fileno(self): |
| return self.pipe.fileno() |
| |
| def flush(self): |
| self.pipe.flush() |
| |
| def isatty(self): |
| return self.pipe.isatty() |
| |
| def readable(self): |
| return self.mode == self.READ |
| |
| def writable(self): |
| return self.mode == self.WRITE |
| |
| def readinto(self, b): |
| if self.mode != self.READ: |
| import errno |
| |
| raise OSError( |
| errno.EBADF, "read() on write-only %s object" % self.__class__.__name__ |
| ) |
| size = self.pipe.readinto(b) |
| if size == 0: |
| self._check_process() |
| return size |
| |
| def write(self, data): |
| if self.mode != self.WRITE: |
| import errno |
| |
| raise OSError( |
| errno.EBADF, "write() on read-only %s object" % self.__class__.__name__ |
| ) |
| data = self.pipe.write(data) |
| |
| if not data: |
| self._check_process() |
| |
| return data |