0x1949 Team - FAZEMRX - MANAGER
Edit File: MyCache.py
# MyCache.py # -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*- # # Copyright (c) 2004-2008 Canonical # # Author: Michael Vogt <mvo@debian.org> # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA from __future__ import absolute_import, print_function import warnings warnings.filterwarnings("ignore", "apt API not stable yet", FutureWarning) import apt import apt_pkg import logging import os try: from urllib.error import HTTPError from urllib.request import urlopen from urllib.parse import urlsplit except ImportError: from urllib2 import HTTPError, urlopen from urlparse import urlsplit try: from http.client import BadStatusLine except ImportError: from httplib import BadStatusLine import socket import subprocess import re import DistUpgrade.DistUpgradeCache from gettext import gettext as _ try: from launchpadlib.launchpad import Launchpad except ImportError: Launchpad = None SYNAPTIC_PINFILE = "/var/lib/synaptic/preferences" CHANGELOGS_POOL = "https://changelogs.ubuntu.com/changelogs/pool/" CHANGELOGS_URI = CHANGELOGS_POOL + "%s/%s/%s/%s_%s/%s" class HttpsChangelogsUnsupportedError(Exception): """ https changelogs with credentials are unsupported because of the lack of certitifcation validation in urllib2 which allows MITM attacks to steal the credentials """ pass class MyCache(DistUpgrade.DistUpgradeCache.MyCache): CHANGELOG_ORIGIN = "Ubuntu" def __init__(self, progress, rootdir=None): apt.Cache.__init__(self, progress, rootdir) # save for later self.rootdir = rootdir # raise if we have packages in reqreinst state # and let the caller deal with that (runs partial upgrade) assert len(self.req_reinstall_pkgs) == 0 # check if the dpkg journal is ok (we need to do that here # too because libapt will only do it when it tries to lock # the packaging system) assert(not self._dpkgJournalDirty()) # init the regular cache self._initDepCache() self.all_changes = {} self.all_news = {} self.pro_versions = {} # on broken packages, try to fix via saveDistUpgrade() if self._depcache.broken_count > 0: self.saveDistUpgrade() assert (self._depcache.broken_count == 0 and self._depcache.del_count == 0) self.launchpad = None # generate versioned_kernel_pkgs_regexp for later use apt_versioned_kernel_pkgs = apt_pkg.config.value_list( "APT::VersionedKernelPackages") if apt_versioned_kernel_pkgs: self.versioned_kernel_pkgs_regexp = re.compile("(" + "|".join( ["^" + p for p in apt_versioned_kernel_pkgs]) + ")") running_kernel_version = subprocess.check_output( ["uname", "-r"], universal_newlines=True).rstrip() self.running_kernel_pkgs_regexp = re.compile("(" + "|".join( [("^" + p + ".*" + running_kernel_version) if not p.startswith(".*") else (running_kernel_version + p) for p in apt_versioned_kernel_pkgs]) + ")") else: self.versioned_kernel_pkgs_regexp = None self.running_kernel_pkgs_regexp = None def _dpkgJournalDirty(self): """ test if the dpkg journal is dirty (similar to debSystem::CheckUpdates) """ d = os.path.dirname( apt_pkg.config.find_file("Dir::State::status")) + "/updates" for f in os.listdir(d): if re.match("[0-9]+", f): return True return False def _initDepCache(self): #apt_pkg.config.set("Debug::pkgPolicy","1") #self.depcache = apt_pkg.GetDepCache(self.cache) #self._depcache = apt_pkg.GetDepCache(self._cache) self._depcache.read_pinfile() if os.path.exists(SYNAPTIC_PINFILE): self._depcache.read_pinfile(SYNAPTIC_PINFILE) self._depcache.init() def clear(self): self._initDepCache() @property def required_download(self): """ get the size of the packages that are required to download """ pm = apt_pkg.PackageManager(self._depcache) fetcher = apt_pkg.Acquire() pm.get_archives(fetcher, self._list, self._records) return fetcher.fetch_needed @property def install_count(self): return self._depcache.inst_count def keep_count(self): return self._depcache.keep_count @property def del_count(self): return self._depcache.del_count def _check_dependencies(self, target, deps): """Return True if any of the dependencies in deps match target.""" # TODO: handle virtual packages for dep_or in deps: if not dep_or: continue match = True for base_dep in dep_or: if (base_dep.name != target.package.shortname or not apt_pkg.check_dep( target.version, base_dep.relation, base_dep.version)): match = False if match: return True return False def find_removal_justification(self, pkg): target = pkg.installed if not target: return False for cpkg in self: candidate = cpkg.candidate if candidate is not None: if (self._check_dependencies( target, candidate.get_dependencies("Conflicts")) and self._check_dependencies( target, candidate.get_dependencies("Replaces"))): logging.info( "%s Conflicts/Replaces %s; allowing removal" % ( candidate.package.shortname, pkg.shortname)) return True return False def saveDistUpgrade(self): """ this functions mimics a upgrade but will never remove anything """ #self._apply_dselect_upgrade() self._depcache.upgrade(True) wouldDelete = self._depcache.del_count if wouldDelete > 0: deleted_pkgs = [pkg for pkg in self if pkg.marked_delete] assert wouldDelete == len(deleted_pkgs) for pkg in deleted_pkgs: if self.find_removal_justification(pkg): wouldDelete -= 1 if wouldDelete > 0: self.clear() assert (self._depcache.broken_count == 0 and self._depcache.del_count == 0) else: assert self._depcache.broken_count == 0 #self._apply_dselect_upgrade() self._depcache.upgrade() return wouldDelete def _strip_epoch(self, verstr): " strip of the epoch " vers_no_epoch = verstr.split(":") if len(vers_no_epoch) > 1: verstr = "".join(vers_no_epoch[1:]) return verstr def _get_changelog_or_news(self, name, fname, strict_versioning=False, changelogs_uri=None): " helper that fetches the file in question " # don't touch the gui in this function, it needs to be thread-safe pkg = self[name] # get the src package name srcpkg = pkg.candidate.source_name # assume "main" section src_section = "main" # use the section of the candidate as a starting point section = pkg._pcache._depcache.get_candidate_ver(pkg._pkg).section # get the source version srcver_epoch = pkg.candidate.source_version srcver = self._strip_epoch(srcver_epoch) split_section = section.split("/") if len(split_section) > 1: src_section = split_section[0] # lib is handled special prefix = srcpkg[0] if srcpkg.startswith("lib"): prefix = "lib" + srcpkg[3] # the changelogs_uri argument overrides the default changelogs_uri, # this is useful for e.g. PPAs where we construct the changelogs # path differently if changelogs_uri: uri = changelogs_uri else: uri = CHANGELOGS_URI % (src_section, prefix, srcpkg, srcpkg, srcver, fname) # https uris are not supported when they contain a username/password # because the urllib2 https implementation will not check certificates # and so its possible to do a man-in-the-middle attack to steal the # credentials res = urlsplit(uri) if res.scheme == "https" and res.username: raise HttpsChangelogsUnsupportedError( "https locations with username/password are not" "supported to fetch changelogs") # print("Trying: %s " % uri) changelog = urlopen(uri) #print(changelog.read()) # do only get the lines that are new alllines = "" regexp = "^%s \\((.*)\\)(.*)$" % (re.escape(srcpkg)) while True: line = changelog.readline().decode("UTF-8", "replace") if line == "": break match = re.match(regexp, line) if match: # strip epoch from installed version # and from changelog too installed = getattr(pkg.installed, "version", None) if installed and ":" in installed: installed = installed.split(":", 1)[1] changelogver = match.group(1) if changelogver and ":" in changelogver: changelogver = changelogver.split(":", 1)[1] # we test for "==" here for changelogs # to ensure that the version # is actually really in the changelog - if not # just display it all, this catches cases like: # gcc-defaults with "binver=4.3.1" and srcver=1.76 # # for NEWS.Debian we do require the changelogver > installed if strict_versioning: if (installed and apt_pkg.version_compare(changelogver, installed) < 0): break else: if (installed and apt_pkg.version_compare(changelogver, installed) == 0): break alllines = alllines + line return alllines def _extract_ppa_changelog_uri(self, name): """Return the changelog URI from the Launchpad API Return None in case of an error. """ if not Launchpad: logging.warning("Launchpadlib not available, cannot retrieve PPA " "changelog") return None cdt = self[name].candidate for uri in cdt.uris: if urlsplit(uri).hostname != 'ppa.launchpad.net': continue match = re.search('http.*/(.*)/(.*)/ubuntu/.*', uri) if match is not None: user, ppa = match.group(1), match.group(2) break else: logging.error("Unable to find a valid PPA candidate URL.") return # Login on launchpad if we are not already if self.launchpad is None: self.launchpad = Launchpad.login_anonymously('update-manager', 'production', version='devel') archive = self.launchpad.archives.getByReference( reference='~%s/ubuntu/%s' % (user, ppa) ) if archive is None: logging.error("Unable to retrieve the archive from the Launchpad " "API.") return spphs = archive.getPublishedSources(source_name=cdt.source_name, exact_match=True, version=cdt.source_version) if not spphs: logging.error("No published sources were retrieved from the " "Launchpad API.") return return spphs[0].changelogUrl() def _guess_third_party_changelogs_uri_by_source(self, name): pkg = self[name] deb_uri = pkg.candidate.uri if deb_uri is None: return None srcrec = pkg.candidate.record.get("Source") if not srcrec: return None # srcpkg can be "apt" or "gcc-default (1.0)" srcpkg = srcrec.split("(")[0].strip() if "(" in srcrec: srcver = srcrec.split("(")[1].rstrip(")") else: srcver = pkg.candidate.source_version base_uri = deb_uri.rpartition("/")[0] return base_uri + "/%s_%s.changelog" % (srcpkg, srcver) def _guess_third_party_changelogs_uri_by_binary(self, name): """ guess changelogs uri based on ArchiveURI by replacing .deb with .changelog """ # there is always a pkg and a pkg.candidate, no need to add # check here pkg = self[name] deb_uri = pkg.candidate.uri if deb_uri: return "%s.changelog" % deb_uri.rsplit(".", 1)[0] return None def get_news_and_changelog(self, name, lock): self.get_news(name) self.get_changelog(name) try: lock.release() except Exception: pass def get_news(self, name): " get the NEWS.Debian file from the changelogs location " try: news = self._get_changelog_or_news(name, "NEWS.Debian", True) except Exception: return if news: self.all_news[name] = news def _fetch_changelog_for_third_party_package(self, name, origins): # Special case for PPAs changelogs_uri_ppa = None for origin in origins: if origin.origin.startswith('LP-PPA-'): try: changelogs_uri_ppa = self._extract_ppa_changelog_uri(name) break except Exception: logging.exception("Unable to connect to the Launchpad " "API.") # Try non official changelog location changelogs_uri_binary = \ self._guess_third_party_changelogs_uri_by_binary(name) changelogs_uri_source = \ self._guess_third_party_changelogs_uri_by_source(name) error_message = "" for changelogs_uri in [changelogs_uri_ppa, changelogs_uri_binary, changelogs_uri_source]: if changelogs_uri: try: changelog = self._get_changelog_or_news( name, "changelog", False, changelogs_uri) self.all_changes[name] += changelog except (HTTPError, HttpsChangelogsUnsupportedError): # no changelogs_uri or 404 error_message = _( "This update does not come from a " "source that supports changelogs.") except (IOError, BadStatusLine, socket.error): # network errors and others logging.exception("error on changelog fetching") error_message = _( "Failed to download the list of changes. \n" "Please check your Internet connection.") self.all_changes[name] += error_message # If the machine is not attached to Ubuntu Pro, Update Manager advertises # the upgrades that would be available if it were attached. # As that is unbeknownst to Apt, we need this map to show the correct # version of each upgradable-if-pro-subscribed package. def create_pro_cache(self, pro_pkgs): for (name, version, _a, _a) in pro_pkgs: self.pro_versions[name] = version def get_changelog(self, name): " get the changelog file from the changelog location " origins = self[name].candidate.origins version = self.pro_versions.get(name, self[name].candidate.version) self.all_changes[name] = _("Changes for %s versions:\n" "Installed version: %s\n" "Available version: %s\n\n") % \ (name, getattr(self[name].installed, "version", None), version) if self.CHANGELOG_ORIGIN not in [o.origin for o in origins]: self._fetch_changelog_for_third_party_package(name, origins) return # fixup epoch handling version srcpkg = self[name].candidate.source_name srcver_epoch = self[name].candidate.source_version.replace(':', '%3A') try: changelog = self._get_changelog_or_news(name, "changelog") if len(changelog) == 0: changelog = _("The changelog does not contain any relevant " "changes.\n\n" "Please use http://launchpad.net/ubuntu/+source/" "%s/%s/+changelog\n" "until the changes become available or try " "again later.") % (srcpkg, srcver_epoch) except HTTPError: changelog = _("The list of changes is not available yet.\n\n" "Please use http://launchpad.net/ubuntu/+source/" "%s/%s/+changelog\n" "until the changes become available or try again " "later.") % (srcpkg, srcver_epoch) except (IOError, BadStatusLine, socket.error) as e: print("caught exception: ", e) changelog = _("Failed to download the list " "of changes. \nPlease " "check your Internet " "connection.") self.all_changes[name] += changelog