blob: 5de17a82e23c935d84d936123b45db0ef593f439 [file] [log] [blame]
#
# SPDX-License-Identifier: GPL-2.0-only
#
# Helper library to implement streaming compression and decompression using an
# external process
#
# This library should be used directly by end users; a wrapper library for the
# specific compression tool should be created
import builtins
import io
import os
import subprocess
def open_wrap(
cls, filename, mode="rb", *, encoding=None, errors=None, newline=None, **kwargs
):
"""
Open a compressed file in binary or text mode.
Users should not call this directly. A specific compression library can use
this helper to provide it's own "open" command
The filename argument can be an actual filename (a str or bytes object), or
an existing file object to read from or write to.
The mode argument can be "r", "rb", "w", "wb", "x", "xb", "a" or "ab" for
binary mode, or "rt", "wt", "xt" or "at" for text mode. The default mode is
"rb".
For binary mode, this function is equivalent to the cls constructor:
cls(filename, mode). In this case, the encoding, errors and newline
arguments must not be provided.
For text mode, a cls object is created, and wrapped in an
io.TextIOWrapper instance with the specified encoding, error handling
behavior, and line ending(s).
"""
if "t" in mode:
if "b" in mode:
raise ValueError("Invalid mode: %r" % (mode,))
else:
if encoding is not None:
raise ValueError("Argument 'encoding' not supported in binary mode")
if errors is not None:
raise ValueError("Argument 'errors' not supported in binary mode")
if newline is not None:
raise ValueError("Argument 'newline' not supported in binary mode")
file_mode = mode.replace("t", "")
if isinstance(filename, (str, bytes, os.PathLike, int)):
binary_file = cls(filename, file_mode, **kwargs)
elif hasattr(filename, "read") or hasattr(filename, "write"):
binary_file = cls(None, file_mode, fileobj=filename, **kwargs)
else:
raise TypeError("filename must be a str or bytes object, or a file")
if "t" in mode:
return io.TextIOWrapper(
binary_file, encoding, errors, newline, write_through=True
)
else:
return binary_file
class CompressionError(OSError):
pass
class PipeFile(io.RawIOBase):
"""
Class that implements generically piping to/from a compression program
Derived classes should add the function get_compress() and get_decompress()
that return the required commands. Input will be piped into stdin and the
(de)compressed output should be written to stdout, e.g.:
class FooFile(PipeCompressionFile):
def get_decompress(self):
return ["fooc", "--decompress", "--stdout"]
def get_compress(self):
return ["fooc", "--compress", "--stdout"]
"""
READ = 0
WRITE = 1
def __init__(self, filename=None, mode="rb", *, stderr=None, fileobj=None):
if "t" in mode or "U" in mode:
raise ValueError("Invalid mode: {!r}".format(mode))
if not "b" in mode:
mode += "b"
if mode.startswith("r"):
self.mode = self.READ
elif mode.startswith("w"):
self.mode = self.WRITE
else:
raise ValueError("Invalid mode %r" % mode)
if fileobj is not None:
self.fileobj = fileobj
else:
self.fileobj = builtins.open(filename, mode or "rb")
if self.mode == self.READ:
self.p = subprocess.Popen(
self.get_decompress(),
stdin=self.fileobj,
stdout=subprocess.PIPE,
stderr=stderr,
close_fds=True,
)
self.pipe = self.p.stdout
else:
self.p = subprocess.Popen(
self.get_compress(),
stdin=subprocess.PIPE,
stdout=self.fileobj,
stderr=stderr,
close_fds=True,
)
self.pipe = self.p.stdin
self.__closed = False
def _check_process(self):
if self.p is None:
return
returncode = self.p.wait()
if returncode:
raise CompressionError("Process died with %d" % returncode)
self.p = None
def close(self):
if self.closed:
return
self.pipe.close()
if self.p is not None:
self._check_process()
self.fileobj.close()
self.__closed = True
@property
def closed(self):
return self.__closed
def fileno(self):
return self.pipe.fileno()
def flush(self):
self.pipe.flush()
def isatty(self):
return self.pipe.isatty()
def readable(self):
return self.mode == self.READ
def writable(self):
return self.mode == self.WRITE
def readinto(self, b):
if self.mode != self.READ:
import errno
raise OSError(
errno.EBADF, "read() on write-only %s object" % self.__class__.__name__
)
size = self.pipe.readinto(b)
if size == 0:
self._check_process()
return size
def write(self, data):
if self.mode != self.WRITE:
import errno
raise OSError(
errno.EBADF, "write() on read-only %s object" % self.__class__.__name__
)
data = self.pipe.write(data)
if not data:
self._check_process()
return data