0x1949 Team - FAZEMRX - MANAGER
Edit File: add-apt-repository
#!/usr/bin/python3 from __future__ import print_function import apt_pkg import io import os import re import sys import gettext import locale import argparse import subprocess from softwareproperties.shortcuthandler import ShortcutException from softwareproperties.shortcuts import shortcut_handler from softwareproperties.ppa import PPAShortcutHandler from softwareproperties.cloudarchive import CloudArchiveShortcutHandler from softwareproperties.sourceslist import SourcesListShortcutHandler from softwareproperties.uri import URIShortcutHandler from softwareproperties.extendedsourceslist import (SourceEntry, SourcesList, CollapsedSourcesList) from aptsources.distro import get_distro from copy import copy from gettext import gettext as _ apt_pkg.init() SOURCESLIST = apt_pkg.config.find_file("Dir::Etc::sourcelist") class AddAptRepository(object): def __init__(self): gettext.textdomain("software-properties") self.distro = get_distro() self.sourceslist = SourcesList() self.distro.get_sources(self.sourceslist) def parse_args(self, args): description = "Only ONE of -P, -C, -U, -S, or old-style 'line' can be specified" parser = argparse.ArgumentParser(description=description) parser.add_argument("-d", "--debug", action="store_true", help=_("Print debug")) parser.add_argument("-r", "--remove", action="store_true", help=_("Disable repository")) parser.add_argument("-s", "--enable-source", action="count", default=0, help=_("Allow downloading of the source packages from the repository")) parser.add_argument("-c", "--component", action="append", default=[], help=_("Components to use with the repository")) parser.add_argument("-p", "--pocket", help=_("Add entry for this pocket")) parser.add_argument("-y", "--yes", action="store_true", help=_("Assume yes to all queries")) parser.add_argument("-n", "--no-update", dest="update", action="store_false", help=_("Do not update package cache after adding")) parser.add_argument("-u", "--update", action="store_true", default=True, help=argparse.SUPPRESS) parser.add_argument("-l", "--login", action="store_true", help=_("Login to Launchpad.")) parser.add_argument("--dry-run", action="store_true", help=_("Don't actually make any changes.")) group = parser.add_mutually_exclusive_group() group.add_argument("-L", "--list", action="store_true", help=_("List currently configured repositories")) group.add_argument("-P", "--ppa", help=_("PPA to add")) group.add_argument("-C", "--cloud", help=_("Cloud Archive to add")) group.add_argument("-U", "--uri", help=_("Archive URI to add")) group.add_argument("-S", "--sourceslist", nargs='+', default=[], help=_("Full sources.list entry line to add")) group.add_argument("line", nargs='*', default=[], help=_("sources.list line to add (deprecated)")) self.parser = parser self.options = self.parser.parse_args(args) @property def dry_run(self): return self.options.dry_run @property def enable_source(self): return self.options.enable_source > 0 @property def components(self): return self.options.component @property def pocket(self): if self.options.pocket: return self.options.pocket.lower() return None @property def source_type(self): return self.distro.source_type @property def binary_type(self): return self.distro.binary_type def is_components(self, comps): if not comps: return False return set(comps.split()) <= set([comp.name for comp in self.distro.source_template.components]) def apt_update(self): if self.options.update and not self.dry_run: # We prefer to run apt-get update here. The built-in update support # does not have any progress, and only works for shortcuts. Moving # it to something like save() and using apt.progress.text would # solve the problem, but the new errors might cause problems with # the dbus server or other users of the API. Also, it's unclear # how good the text progress is or how to pass it best. subprocess.run(['apt-get', 'update']) def prompt_user(self): if self.dry_run: print(_("DRY-RUN mode: no modifications will be made")) return if not self.options.yes and sys.stdin.isatty() and not "FORCE_ADD_APT_REPOSITORY" in os.environ: try: input(_("Press [ENTER] to continue or Ctrl-c to cancel.")) except KeyboardInterrupt: print(_("Aborted.")) sys.exit(1) def prompt_user_shortcut(self, shortcut): '''Display more information about the shortcut / ppa info''' print(_("Repository: '%s'") % shortcut.SourceEntry().line) if shortcut.description: # strip ANSI escape sequences description = re.sub(r"(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]", "", shortcut.description) print(_("Description:")) print(description) if shortcut.web_link: print(_("More info: %s") % shortcut.web_link) if self.options.remove: print(_("Removing repository.")) else: print(_("Adding repository.")) self.prompt_user() def change_components(self): collapsedlist = CollapsedSourcesList(self.sourceslist, files=[SOURCESLIST]) for entry in collapsedlist.filter(invalid=False, disabled=False): if self.options.remove: removed = set(entry.comps) & set(self.components) if removed: entry.comps = list(set(entry.comps) - set(self.components)) print(_("Removed %s from: %s") % (' '.join(removed), str(entry))) else: added = set(self.components) - set(entry.comps) if added: entry.comps = list(set(entry.comps) | set(self.components)) print(_("Added %s to: %s") % (' '.join(added), str(entry))) if not self.dry_run: self.sourceslist.save() def _add_pocket(self, collapsedlist): binary_entries = collapsedlist.filter(invalid=False, disabled=False, type=self.binary_type) for uri in set([e.uri for e in binary_entries]): pockets = {e.pocket: e for e in binary_entries if e.uri == uri} if self.pocket in pockets: print(_("Existing: %s") % str(pockets[self.pocket])) continue if 'release' in pockets: entry = copy(pockets['release']) else: # without existing release pocket, use default comps comps = ['main', 'restricted'] entry = next(iter(pockets.values()))._replace(comps=comps) entry = entry._replace(pocket=self.pocket) print(_("Adding: %s") % str(entry)) collapsedlist.add_entry(entry) def _remove_pocket(self, collapsedlist): for entry in collapsedlist.filter(invalid=False, disabled=False, pocket=self.pocket): entry.set_enabled(False) print(_("Disabled: %s") % str(entry)) def change_pocket(self): collapsedlist = CollapsedSourcesList(self.sourceslist, files=[SOURCESLIST]) if self.options.remove: self._remove_pocket(collapsedlist) else: self._add_pocket(collapsedlist) if not self.dry_run: self.sourceslist.save() def _enable_source(self): collapsedlist = CollapsedSourcesList(self.sourceslist) # Check each disabled deb-src line, enable if matching deb line exists for s in self.sourceslist.filter(invalid=False, disabled=True, type=self.source_type): b_merged_entry = collapsedlist.get_merged_entry(s._replace(type=self.binary_type, disabled=False)) if not b_merged_entry: # no matching binary lines, leave the source line disabled continue disabled_comps = list(set(s.comps) - set(b_merged_entry.comps)) enabled_comps = list(set(s.comps) & set(b_merged_entry.comps)) if not enabled_comps: # we can't enable any of the line continue if disabled_comps: index = self.sourceslist.list.index(s) self.sourceslist.list.insert(index + 1, s._replace(comps=disabled_comps)) s.comps = enabled_comps s.set_enabled(True) collapsedlist.refresh() print(_("Enabled: %s") % str(s).strip()) # Check each enabled deb line, to warn about missing deb-src lines, or add one if -ss for b in self.sourceslist.filter(invalid=False, disabled=False, type=self.binary_type): s = b._replace(type=self.source_type) s_merged_entry = collapsedlist.get_merged_entry(s) scomps = set(s_merged_entry.comps if s_merged_entry else []) missing_comps = list(set(b.comps) - scomps) if not missing_comps: continue s.comps = missing_comps if self.options.enable_source > 1: # with multiple -s, add new deb-src entries if needed for all deb entries collapsedlist.add_entry(s, after=b) print(_("Added: %s") % str(s).strip()) else: # if only one -s used, don't add missing deb-src, just notify print(_("Warning, missing deb-src for: %s") % str(s).strip()) def _disable_source(self): for s in self.sourceslist.filter(invalid=False, disabled=False, type=self.source_type): s.set_enabled(False) print(_("Disabled: %s") % str(s).strip()) def change_source(self): if self.options.remove: self._disable_source() else: self._enable_source() if not self.dry_run: self.sourceslist.save() def global_change(self): if self.components: if self.options.remove: print(_("Removing component(s) '%s' from all repositories.") % ', '.join(self.components)) else: print(_("Adding component(s) '%s' to all repositories.") % ', '.join(self.components)) if self.pocket: if self.options.remove: print(_("Removing pocket %s for all repositories.") % self.pocket) else: print(_("Adding pocket %s for all repositories.") % self.pocket) if self.enable_source: if self.options.remove: print(_("Disabling %s for all repositories.") % self.source_type) else: print(_("Enabling %s for all repositories.") % self.source_type) self.prompt_user() if self.components: self.change_components() if self.pocket: self.change_pocket() if self.enable_source: self.change_source() def show_list(self): for s in CollapsedSourcesList(self.sourceslist): if s.invalid or s.disabled: continue if not self.enable_source and s.type == self.source_type: continue print(s) def main(self, args=sys.argv[1:]): self.parse_args(args) if not any((self.dry_run, self.options.list, os.geteuid() == 0)): print(_("Error: must run as root")) return False line = ' '.join(self.options.line) if line == '-': line = sys.stdin.readline().strip() # if 'line' is only (valid) components, handle as if only -c was used with no line if self.is_components(line): self.options.component += line.split() line = '' if self.options.ppa: source = self.options.ppa if not ':' in source: source = 'ppa:' + source handler = PPAShortcutHandler elif self.options.cloud: source = self.options.cloud if not ':' in source: source = 'uca:' + source handler = CloudArchiveShortcutHandler elif self.options.uri: source = self.options.uri handler = URIShortcutHandler elif self.options.sourceslist: source = ' '.join(self.options.sourceslist) handler = SourcesListShortcutHandler elif line: source = line handler = shortcut_handler elif self.options.list: self.show_list() return True elif any((self.enable_source, self.components, self.pocket)): self.global_change() self.apt_update() return True else: print(_("Error: no actions requested.")) self.parser.print_help() return False try: shortcut_params = { 'login': self.options.login, 'enable_source': self.enable_source, 'dry_run': self.dry_run, 'components': self.components, 'pocket': self.pocket, } shortcut = handler(source, **shortcut_params) except ShortcutException as e: print(e) return False self.prompt_user_shortcut(shortcut) if self.options.remove: shortcut.remove() else: shortcut.add() self.apt_update() return True if __name__ == '__main__': addaptrepo = AddAptRepository() sys.exit(0 if addaptrepo.main() else 1)