0x1949 Team - FAZEMRX - MANAGER
Edit File: debfile.py
""" Representation of Debian binary package (.deb) files Debfile Classes =============== """ # Copyright (C) 2007-2008 Stefano Zacchiroli <zack@debian.org> # Copyright (C) 2007 Filippo Giunchedi <filippo@debian.org> # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import gzip import io import tarfile import sys import os.path try: # pylint: disable=unused-import from typing import ( Any, BinaryIO, Dict, IO, Iterator, List, Optional, Text, TypeVar, Union, overload, TYPE_CHECKING, ) from typing_extensions import ( Literal, ) except ImportError: # Missing types aren't important at runtime if not TYPE_CHECKING: overload = lambda f: None from debian.arfile import ArFile, ArError, ArMember # pylint: disable=unused-import from debian.changelog import Changelog from debian.deb822 import Deb822 DATA_PART = 'data.tar' # w/o extension CTRL_PART = 'control.tar' PART_EXTS = ['gz', 'bz2', 'xz', 'lzma', 'zst'] # possible extensions INFO_PART = 'debian-binary' MAINT_SCRIPTS = ['preinst', 'postinst', 'prerm', 'postrm', 'config'] CONTROL_FILE = 'control' CHANGELOG_NATIVE = 'usr/share/doc/%s/changelog.gz' # with package stem CHANGELOG_DEBIAN = 'usr/share/doc/%s/changelog.Debian.gz' MD5_FILE = 'md5sums' class DebError(ArError): pass class DebPart(object): """'Part' of a .deb binary package. A .deb package is considered as made of 2 parts: a 'data' part (corresponding to the possibly compressed 'data.tar' archive embedded in a .deb) and a 'control' part (the 'control.tar.gz' archive). Each of them is represented by an instance of this class. Each archive should be a compressed tar archive although an uncompressed data.tar is permitted; supported compression formats are: .tar.gz, .tar.bz2, .tar.xz . When referring to file members of the underlying .tar.gz archive, file names can be specified in one of 3 formats "file", "./file", "/file". In all cases the file is considered relative to the root of the archive. For the control part the preferred mechanism is the first one (as in deb.control.get_content('control') ); for the data part the preferred mechanism is the third one (as in deb.data.get_file('/etc/vim/vimrc') ). """ def __init__(self, member): # type: (ArMember) -> None self.__member = member # arfile.ArMember file member self.__tgz = None # type: Optional[tarfile.TarFile] def tgz(self): # type: () -> tarfile.TarFile """Return a TarFile object corresponding to this part of a .deb package. Despite the name, this method gives access to various kind of compressed tar archives, not only gzipped ones. """ def _custom_decompress(command_list): try: # pylint: disable=import-outside-toplevel import subprocess import signal import io # pylint: disable=subprocess-popen-preexec-fn proc = subprocess.Popen( command_list, stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=False, preexec_fn=lambda: signal.signal(signal.SIGPIPE, signal.SIG_DFL) ) except (OSError, ValueError) as e: raise DebError("%s" % e) data = proc.communicate(self.__member.read())[0] if proc.returncode != 0: raise DebError("command has failed with code '%s'" % proc.returncode) return io.BytesIO(data) if self.__tgz is None: name = self.__member.name extension = os.path.splitext(name)[1][1:] if extension in PART_EXTS or name == DATA_PART or name == CTRL_PART: # Permit compressed members and also uncompressed data.tar # tarfile has no zst support: https://bugs.python.org/issue37095 if extension == 'zst': buffer = _custom_decompress(['unzstd', '--stdout']) else: buffer = self.__member try: self.__tgz = tarfile.open(fileobj=buffer, mode='r:*') # type: ignore # pylint: disable = consider-using-with except (tarfile.ReadError, tarfile.CompressionError) as e: raise DebError("tarfile has returned an error: '%s'" % e) else: raise DebError("part '%s' has unexpected extension" % name) return self.__tgz @staticmethod def __normalize_member(fname): # type: (str) -> str """ try (not so hard) to obtain a member file name in a form relative to the .tar.gz root and with no heading '.' """ if fname.startswith('./'): fname = fname[2:] elif fname.startswith('/'): fname = fname[1:] return fname def has_file(self, fname): # type: (str) -> bool """Check if this part contains a given file name.""" fname = DebPart.__normalize_member(fname) names = self.tgz().getnames() return './' + fname in names @overload def get_file(self, fname, encoding=None, errors=None): # type: (str, None, Optional[str]) -> IO[bytes] pass @overload def get_file(self, fname, encoding, errors=None): # type: (str, str, Optional[str]) -> IO[str] pass def get_file(self, fname, encoding=None, errors=None): # type: (str, Optional[str], Optional[str]) -> Union[IO[bytes], IO[str]] """Return a file object corresponding to a given file name. If encoding is given, then the file object will return Unicode data; otherwise, it will return binary data. """ fname = DebPart.__normalize_member(fname) fobj = self.tgz().extractfile('./' + fname) if fobj is None: raise DebError("File not found inside package") if encoding is not None: return io.TextIOWrapper(fobj, encoding=encoding, errors=errors) return fobj @overload def get_content(self, fname, # type: str encoding=None, # type: Literal[None] errors=None, # type: Optional[str] ): # type: (...) -> Optional[bytes] pass @overload def get_content(self, fname, # type: str encoding, # type: str errors=None, # type: Optional[str] ): # type: (...) -> Optional[Text] pass def get_content(self, fname, # type: str encoding=None, # type: Optional[str] errors=None, # type: Optional[str] ): # type: (...) -> Optional[Union[Text,bytes]] """Return the string content of a given file, or None (e.g. for directories). If encoding is given, then the content will be a Unicode object; otherwise, it will contain binary data. """ f = self.get_file(fname, encoding=encoding, errors=errors) content = None if f: # can be None for non regular or link files content = f.read() f.close() return content # container emulation def __iter__(self): # type: () -> Iterator[str] return iter(self.tgz().getnames()) def __contains__(self, fname): # type: (str) -> bool return self.has_file(fname) def __getitem__(self, fname): # type: (str) -> Optional[Union[bytes, Text]] return self.get_content(fname) def close(self): # type: () -> None self.__member.close() class DebData(DebPart): pass class DebControl(DebPart): def scripts(self): # type: () -> Dict[str, bytes] """ Return a dictionary of maintainer scripts (postinst, prerm, ...) mapping script names to script text. """ scripts = {} # type: Dict[str, bytes] for fname in MAINT_SCRIPTS: if self.has_file(fname): data = self.get_content(fname) if data is not None: scripts[fname] = data return scripts def debcontrol(self): # type: () -> Deb822 """ Return the debian/control as a Deb822 (a Debian-specific dict-like class) object. For a string representation of debian/control try .get_content('control') """ return Deb822(self.get_content(CONTROL_FILE)) @overload def md5sums(self, encoding=None, errors=None): # type: (Literal[None], Optional[str]) -> Dict[bytes, str] pass @overload def md5sums(self, encoding, errors=None): # type: (str, Optional[str]) -> Dict[str, str] pass def md5sums(self, encoding=None, errors=None): # type: (Optional[str], Optional[str]) -> Union[Dict[str, str], Dict[bytes, str]] """ Return a dictionary mapping filenames (of the data part) to md5sums. Fails if the control part does not contain a 'md5sum' file. Keys of the returned dictionary are the left-hand side values of lines in the md5sums member of control.tar.gz, usually file names relative to the file system root (without heading '/' or './'). The returned keys are Unicode objects if an encoding is specified, otherwise binary. The returned values are always Unicode.""" if not self.has_file(MD5_FILE): raise DebError( "'%s' file not found, can't list MD5 sums" % MD5_FILE) md5_file = self.get_file(MD5_FILE, encoding=encoding, errors=errors) sums = {} # type: Dict[Any, str] newline = '\r\n' # type: Union[str, bytes] if encoding is None: newline = b'\r\n' for line in md5_file.readlines(): # we need to support spaces in filenames, .split() is not enough md5, fname = line.rstrip(newline).split(None, 1) # type: ignore if isinstance(md5, bytes): sums[fname] = md5.decode() else: sums[fname] = md5 md5_file.close() return sums class DebFile(ArFile): # pylint: disable=abstract-method """Representation of a .deb file (a Debian binary package) DebFile objects have the following (read-only) properties: - version debian .deb file format version (not related with the contained package version), 2.0 at the time of writing for all .deb packages in the Debian archive - data DebPart object corresponding to the data.tar.gz (or other compressed or uncompressed tar) archive contained in the .deb file - control DebPart object corresponding to the control.tar.gz (or other compressed tar) archive contained in the .deb file """ def __init__(self, filename=None, mode='r', fileobj=None): # type: (Optional[str], str, Optional[BinaryIO]) -> None ArFile.__init__(self, filename, mode, fileobj) actual_names = set(self.getnames()) def compressed_part_name(basename): # type: (str) -> str candidates = ['%s.%s' % (basename, ext) for ext in PART_EXTS] # also permit uncompressed data.tar and control.tar if basename in (DATA_PART, CTRL_PART): candidates.append(basename) parts = actual_names.intersection(set(candidates)) if not parts: raise DebError( "missing required part in given .deb" " (expected one of: %s)" % candidates) if len(parts) > 1: raise DebError( "too many parts in given .deb" " (was looking for only one of: %s)" % candidates) return list(parts)[0] # singleton list if INFO_PART not in actual_names: raise DebError( "missing required part in given .deb" " (expected: '%s')" % INFO_PART) self.__parts = {} # type: Dict[str, DebPart] self.__parts[CTRL_PART] = DebControl(self.getmember( compressed_part_name(CTRL_PART))) self.__parts[DATA_PART] = DebData(self.getmember( compressed_part_name(DATA_PART))) self.__pkgname = None # updated lazily by __updatePkgName f = self.getmember(INFO_PART) self.__version = f.read().strip() f.close() def __updatePkgName(self): # type: () -> None self.__pkgname = self.debcontrol()['package'] @property def version(self): # type: () -> bytes return self.__version @property def data(self): # type: () -> DebData return self.__parts[DATA_PART] # type: ignore @property def control(self): # type: () -> DebControl return self.__parts[CTRL_PART] # type: ignore # proxy methods for the appropriate parts def debcontrol(self): # type: () -> Deb822 """ See .control.debcontrol() """ return self.control.debcontrol() def scripts(self): # type: () -> Dict[str, bytes] """ See .control.scripts() """ return self.control.scripts() @overload def md5sums(self, encoding=None, errors=None): # type: (Literal[None], Optional[str]) -> Dict[bytes, str] pass @overload def md5sums(self, encoding, errors=None): # type: (str, Optional[str]) -> Dict[str, str] pass def md5sums(self, encoding=None, errors=None): # type: (Optional[str], Optional[str]) -> Union[Dict[str, str], Dict[bytes, str]] """ See .control.md5sums() """ return self.control.md5sums(encoding=encoding, errors=errors) def changelog(self): # type: () -> Optional[Changelog] """ Return a Changelog object for the changelog.Debian.gz of the present .deb package. Return None if no changelog can be found. """ if self.__pkgname is None: self.__updatePkgName() for fname in [CHANGELOG_DEBIAN % self.__pkgname, CHANGELOG_NATIVE % self.__pkgname]: if self.data.has_file(fname): gz = gzip.GzipFile(fileobj=self.data.get_file(fname)) raw_changelog = gz.read() gz.close() return Changelog(raw_changelog) return None def close(self): # type: () -> None self.control.close() self.data.close() def __enter__(self): # type: () -> DebFile return self def __exit__(self, exc_type, exc_val, exc_tb): # type: (Any, Any, Any) -> None self.close() if __name__ == '__main__': deb = DebFile(filename=sys.argv[1]) tgz = deb.control.tgz() print(tgz.getmember('control'))