0x1949 Team - FAZEMRX - MANAGER
Edit File: MetaRelease.py
# MetaRelease.py # -*- Mode: Python; indent-tabs-mode: nil; tab-width: 4; coding: utf-8 -*- # # Copyright (c) 2004,2005 Canonical # # Author: Michael Vogt <michael.vogt@ubuntu.com> # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License as # published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 # USA from __future__ import absolute_import, print_function import apt import apt_pkg import distro_info try: import configparser except ImportError: import ConfigParser as configparser try: from http.client import BadStatusLine except ImportError: from httplib import BadStatusLine import logging import email.utils import os import socket import sys import time import threading try: from urllib.parse import quote from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError except ImportError: from urllib2 import HTTPError, Request, URLError, urlopen, quote from .utils import (get_lang, get_dist, get_dist_version, get_ubuntu_flavor, get_ubuntu_flavor_name) class MetaReleaseParseError(Exception): pass class Dist(object): def __init__(self, name, version, date, supported): self.name = name self.version = version self.date = date self.supported = supported self.releaseNotesURI = None self.releaseNotesHtmlUri = None self.upgradeTool = None self.upgradeToolSig = None # the server may report that the upgrade is broken currently self.upgrade_broken = None class MetaReleaseCore(object): """ A MetaReleaseCore object abstracts the list of released distributions. """ DEBUG = "DEBUG_UPDATE_MANAGER" in os.environ # some constants CONF = "/etc/update-manager/release-upgrades" CONF_METARELEASE = "/etc/update-manager/meta-release" def __init__(self, useDevelopmentRelease=False, useProposed=False, debug=False, forceLTS=False, forceDownload=False, cache=None): if debug: self.DEBUG = True self._debug("MetaRelease.__init__() useDevel=%s useProposed=%s" % (useDevelopmentRelease, useProposed)) # force download instead of sending if-modified-since self.forceDownload = forceDownload self.useDevelopmentRelease = useDevelopmentRelease # information about the available dists self.downloaded = threading.Event() self.upgradable_to = None self.new_dist = None if cache is None: cache = apt.Cache() self.flavor = get_ubuntu_flavor(cache=cache) self.flavor_name = get_ubuntu_flavor_name(cache=cache) self.current_dist_name = get_dist() self.current_dist_version = get_dist_version() self.no_longer_supported = None self.prompt = None # default (if the conf file is missing) base_uri = "https://changelogs.ubuntu.com/" self.METARELEASE_URI = base_uri + "meta-release" self.METARELEASE_URI_LTS = base_uri + "meta-release-lts" self.METARELEASE_URI_UNSTABLE_POSTFIX = "-development" self.METARELEASE_URI_PROPOSED_POSTFIX = "-proposed" # check the meta-release config first parser = configparser.ConfigParser() if os.path.exists(self.CONF_METARELEASE): try: parser.read(self.CONF_METARELEASE) except configparser.Error as e: sys.stderr.write("ERROR: failed to read '%s':\n%s" % ( self.CONF_METARELEASE, e)) return # make changing the metarelease file and the location # for the files easy if parser.has_section("METARELEASE"): sec = "METARELEASE" for k in ["URI", "URI_LTS", "URI_UNSTABLE_POSTFIX", "URI_PROPOSED_POSTFIX"]: if parser.has_option(sec, k): self._debug("%s: %s " % (self.CONF_METARELEASE, parser.get(sec, k))) setattr(self, "%s_%s" % (sec, k), parser.get(sec, k)) # check the config file first to figure if we want lts upgrades only parser = configparser.ConfigParser() if os.path.exists(self.CONF): try: parser.read(self.CONF) except configparser.Error as e: sys.stderr.write("ERROR: failed to read '%s':\n%s" % ( self.CONF, e)) return # now check which specific url to use if parser.has_option("DEFAULT", "Prompt"): prompt = parser.get("DEFAULT", "Prompt").lower() if (prompt == "never" or prompt == "no"): self.prompt = 'never' # nothing to do for this object # FIXME: what about no longer supported? self.downloaded.set() return elif prompt == "lts": self.prompt = 'lts' # the Prompt=lts setting only makes sense when running on # a LTS, otherwise it would result in users not receiving # any distro upgrades di = distro_info.UbuntuDistroInfo() if di.is_lts(self.current_dist_name): self.METARELEASE_URI = self.METARELEASE_URI_LTS else: self._debug("Prompt=lts for non-LTS, ignoring") else: self.prompt = 'normal' # needed for the _tryUpgradeSelf() code in DistUpgradeController if forceLTS: self.METARELEASE_URI = self.METARELEASE_URI_LTS # devel and proposed "just" change the postfix if useDevelopmentRelease: self.METARELEASE_URI += self.METARELEASE_URI_UNSTABLE_POSTFIX elif useProposed: self.METARELEASE_URI += self.METARELEASE_URI_PROPOSED_POSTFIX self._debug("metarelease-uri: %s" % self.METARELEASE_URI) self.metarelease_information = None if not self._buildMetaReleaseFile(): self._debug("_buildMetaReleaseFile failed") return # we start the download thread here and we have a timeout threading.Thread(target=self.download).start() #threading.Thread(target=self.check).start() def _buildMetaReleaseFile(self): # build the metarelease_file name self.METARELEASE_FILE = os.path.join( "/var/lib/update-manager/", os.path.basename(self.METARELEASE_URI)) # check if we can write to the global location, if not, # write to homedir try: open(self.METARELEASE_FILE, "a").close() except IOError: cache_dir = os.getenv( "XDG_CACHE_HOME", os.path.expanduser("~/.cache")) # Take special care when creating this directory; ~/.cache needs # to be created with mode 0700, but the other directories do # not. cache_parent_dir = os.path.split(cache_dir)[0] if not os.path.exists(cache_parent_dir): try: os.makedirs(cache_parent_dir) except OSError as e: sys.stderr.write("mkdir() failed: '%s'" % e) return False if not os.path.exists(cache_dir): try: os.mkdir(cache_dir, 0o700) except OSError as e: sys.stderr.write("mkdir() failed: '%s'" % e) return False path = os.path.join(cache_dir, 'update-manager-core') if not os.path.exists(path): try: os.mkdir(path) except OSError as e: sys.stderr.write("mkdir() failed: '%s'" % e) return False self.METARELEASE_FILE = os.path.join( path, os.path.basename(self.METARELEASE_URI)) # if it is empty, remove it to avoid I-M-S hits on empty file try: if os.path.getsize(self.METARELEASE_FILE) == 0: os.unlink(self.METARELEASE_FILE) except Exception: pass return True def dist_no_longer_supported(self, dist): """ virtual function that is called when the distro is no longer supported """ self.no_longer_supported = dist def new_dist_available(self, dist): """ virtual function that is called when a new distro release is available """ self.new_dist = dist def parse(self): self._debug("MetaRelease.parse()") current_dist_name = self.current_dist_name self._debug("current dist name: '%s'" % current_dist_name) current_dist = None dists = [] # parse the metarelease_information file index_tag = apt_pkg.TagFile(self.metarelease_information) try: while index_tag.step(): for required_key in ("Dist", "Version", "Supported", "Date"): if required_key not in index_tag.section: raise MetaReleaseParseError( "Required key '%s' missing" % required_key) name = index_tag.section["Dist"] self._debug("found distro name: '%s'" % name) rawdate = index_tag.section["Date"] parseddate = list(email.utils.parsedate(rawdate)) parseddate[8] = 0 # assume no DST date = time.mktime(tuple(parseddate)) supported = int(index_tag.section["Supported"]) version = index_tag.section["Version"] # add the information to a new date object dist = Dist(name, version, date, supported) if "ReleaseNotes" in index_tag.section: dist.releaseNotesURI = index_tag.section["ReleaseNotes"] lang = get_lang() if lang: dist.releaseNotesURI += "?lang=%s" % lang if "ReleaseNotesHtml" in index_tag.section: dist.releaseNotesHtmlUri = index_tag.section[ "ReleaseNotesHtml"] query = self._get_release_notes_uri_query_string(dist) if query: dist.releaseNotesHtmlUri += query if "UpgradeTool" in index_tag.section: dist.upgradeTool = index_tag.section["UpgradeTool"] if "UpgradeToolSignature" in index_tag.section: dist.upgradeToolSig = index_tag.section[ "UpgradeToolSignature"] if "UpgradeBroken" in index_tag.section: dist.upgrade_broken = index_tag.section["UpgradeBroken"] dists.append(dist) if name == current_dist_name: current_dist = dist except apt_pkg.Error: raise MetaReleaseParseError("Unable to parse %s" % self.METARELEASE_URI) self.metarelease_information.close() self.metarelease_information = None # first check if the current runing distro is in the meta-release # information. if not, we assume that we run on something not # supported and silently return if current_dist is None: self._debug("current dist not found in meta-release file\n") return False # then see what we can upgrade to upgradable_to = "" for dist in dists: if dist.date > current_dist.date: # Only offer to upgrade to an unsupported release if running # with useDevelopmentRelease, this way one can upgrade from an # LTS release to the next supported non-LTS release e.g. from # 14.04 to 15.04. if not dist.supported and not self.useDevelopmentRelease: continue upgradable_to = dist self._debug("new dist: %s" % upgradable_to) break # only warn if unsupported and a new dist is available (because # the development version is also unsupported) if upgradable_to != "" and not current_dist.supported: self.upgradable_to = upgradable_to self.dist_no_longer_supported(current_dist) if upgradable_to != "": self.upgradable_to = upgradable_to self.new_dist_available(upgradable_to) # parsing done and sucessfully return True # the network thread that tries to fetch the meta-index file # can't touch the gui, runs as a thread def download(self): self._debug("MetaRelease.download()") lastmodified = 0 req = Request(self.METARELEASE_URI) # make sure that we always get the latest file (#107716) req.add_header("Cache-Control", "No-Cache") req.add_header("Pragma", "no-cache") if os.access(self.METARELEASE_FILE, os.W_OK): try: lastmodified = os.stat(self.METARELEASE_FILE).st_mtime except OSError: pass if lastmodified > 0 and not self.forceDownload: req.add_header("If-Modified-Since", time.asctime(time.gmtime(lastmodified))) try: # open uri = urlopen(req, timeout=20) # sometime there is a root owned meta-relase file # there, try to remove it so that we get it # with proper permissions if (os.path.exists(self.METARELEASE_FILE) and not os.access(self.METARELEASE_FILE, os.W_OK)): try: os.unlink(self.METARELEASE_FILE) except OSError as e: print("Can't unlink '%s' (%s)" % (self.METARELEASE_FILE, e)) # we may get exception here on e.g. disk full try: f = open(self.METARELEASE_FILE, "w+") for line in uri.readlines(): f.write(line.decode("UTF-8")) f.flush() f.seek(0, 0) self.metarelease_information = f except IOError: pass uri.close() # http error except HTTPError as e: # mvo: only reuse local info on "not-modified" if e.code == 304 and os.path.exists(self.METARELEASE_FILE): self._debug("reading file '%s'" % self.METARELEASE_FILE) self.metarelease_information = open(self.METARELEASE_FILE, "r") else: self._debug("result of meta-release download: '%s'" % e) # generic network error except (URLError, BadStatusLine, socket.timeout) as e: self._debug("result of meta-release download: '%s'" % e) print("Failed to connect to %s. Check your Internet connection " "or proxy settings" % self.METARELEASE_URI) # now check the information we have if self.metarelease_information is not None: self._debug("have self.metarelease_information") try: self.parse() except Exception: logging.exception("parse failed for '%s'" % self.METARELEASE_FILE) # no use keeping a broken file around os.remove(self.METARELEASE_FILE) # we don't want to keep a meta-release file around when it # has a "Broken" flag, this ensures we are not bitten by # I-M-S/cache issues if self.new_dist and self.new_dist.upgrade_broken: os.remove(self.METARELEASE_FILE) else: self._debug("NO self.metarelease_information") self.downloaded.set() @property def downloading(self): return not self.downloaded.is_set() def _get_release_notes_uri_query_string(self, dist): q = "?" # get the lang lang = get_lang() if lang: q += "lang=%s&" % lang # get the os q += "os=%s&" % self.flavor # get the version to upgrade to q += "ver=%s" % dist.version # the archive didn't respond well to ? being %3F return quote(q, '/?') def _debug(self, msg): if self.DEBUG: sys.stderr.write(msg + "\n") if __name__ == "__main__": meta = MetaReleaseCore(False, False)