0x1949 Team - FAZEMRX - MANAGER
Edit File: unattended-upgrade-shutdown
#!/usr/bin/python3
# Copyright (c) 2009-2018 Canonical Ltd
#
# AUTHOR:
# Michael Vogt <mvo@ubuntu.com>
# Balint Reczey <rbalint@ubuntu.com>
#
# unattended-upgrade-shutdown - helper that checks if a
# unattended-upgrade is in progress and waits until it exits
#
# This file is part of unattended-upgrades
#
# unattended-upgrades is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation; either version 2 of the License, or (at
# your option) any later version.
#
# unattended-upgrades is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with unattended-upgrades; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#

import copy
import dbus
import signal
import sys
import time
import logging
import logging.handlers
import gettext
import subprocess
import os.path

# for dbus signal handling
try:
    from dbus.mainloop.glib import DBusGMainLoop
    from gi.repository import GLib
except ImportError:
    pass

from optparse import OptionParser, Values
Values  # pyflakes
from gettext import gettext as _
from threading import Event

try:
    import apt_pkg
except Exception:
    # if there is no python-apt no unattended-upgrades can run so not
    # need to stop the shutdown
    logging.exception("importing of apt_pkg failed, exiting")
    sys.exit(0)


def do_usplash(msg):
    # type: (str) -> None
    """ show msg via usplash_write if /sbin/usplash_write exists """
    if os.path.exists("/sbin/usplash_write"):
        logging.debug("Running usplash_write")
        subprocess.call(["/sbin/usplash_write", "TEXT", msg])
        subprocess.call(["/sbin/usplash_write", "PULSATE"])


def do_plymouth(msg):
    # type: (str) -> None
    """ show msg line by line via plymouth if /bin/plymouth exists """
    if os.path.exists("/bin/plymouth"):
        for line in msg.split("\n"):
            logging.debug("Running plymouth --text")
            subprocess.call(["/bin/plymouth", "message", "--text", line])


def log_msg(msg, level=logging.WARN):
    # type: (str, int) -> None
    """ helper that will print msg to usplash, plymouth, console """
    logging.log(level, msg)
    do_plymouth(msg)
    do_usplash(msg)


def log_progress():
    # type: () -> None
    """ helper to log the install progress (if any) """
    # wait a some seconds and try again
    msg = _("Unattended-upgrade in progress during shutdown, "
            "please don't turn off the computer")
    # progress info
    progress = "/var/run/unattended-upgrades.progress"
    if os.path.exists(progress):
        try:
            # close the file right away instead of leaking the handle on
            # every iteration
            with open(progress) as progress_file:
                msg += "\n" + progress_file.read()
        except OSError as error:
            # the progress file may vanish between the check and the read;
            # the base message is still worth showing
            logging.debug("could not read %s: %s", progress, error)
    # log it
    log_msg(msg)


def signal_stop_unattended_upgrade():
    """ send SIGTERM to running unattended-upgrade if there is any """
    pidfile = "/var/run/unattended-upgrades.pid"
    if not os.path.exists(pidfile):
        return
    try:
        with open(pidfile) as pid_file:
            pid = int(pid_file.read())
    except (OSError, ValueError) as error:
        # an empty, garbled or just-removed pid file must not raise out of
        # the iteration that called us
        logging.debug("could not read pid from %s: %s", pidfile, error)
        return
    if pid <= 0:
        # kill() treats 0 and negative pids as process groups / "every
        # process", never signal those based on file content
        logging.debug("ignoring invalid pid %s in %s", pid, pidfile)
        return
    logging.debug("found running unattended-upgrades pid %s" % pid)
    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        logging.debug("sending SIGTERM failed because unattended-upgrades "
                      "already stopped")


def exit_log_result(success):
    """ log the final result and exit with 0 on success, 1 otherwise """
    if success:
        log_msg(_("All upgrades installed"), logging.INFO)
        sys.exit(0)
    else:
        log_msg(_("Unattended-upgrades stopped. There may be upgrades"
                  " left to be installed in the next run."), logging.INFO)
        sys.exit(1)


class UnattendedUpgradesShutdown():
    """ Hold a logind delay inhibitor lock and, once iterations start,
    either run unattended-upgrades in shutdown mode or send SIGTERM to a
    running instance and wait until its lock file is released
    """

    def __init__(self, options):
        # type: (Values) -> None
        """ options are the parsed command line options from main() """
        self.options = options
        # options.delay is given in minutes, max_delay is in seconds
        self.max_delay = options.delay * 60
        self.iter_timer_set = False
        self.apt_pkg_reinit_done = None
        self.shutdown_pending = False
        self.on_shutdown_mode = None
        self.on_shutdown_mode_uu_proc = None
        self.start_time = None
        self.lock_was_taken = False
        self.signal_sent = False
        self.stop_signal_received = Event()

        try:
            # raises NameError when the GLib import above failed
            hasattr(GLib, "MainLoop")
            DBusGMainLoop(set_as_default=True)
        except NameError:
            pass
        try:
            self.inhibit_lock = self.get_inhibit_shutdown_lock()
        except dbus.exceptions.DBusException:
            logging.warning("Could not get delay inhibitor lock")
            self.inhibit_lock = None
        self.logind_proxy = None
        self.wait_period = min(3, self.get_inhibit_max_delay() / 3)
        self.preparing_for_shutdown = False

    def get_logind_proxy(self):
        """ Get logind dbus proxy object """
        if not self.logind_proxy:
            bus = dbus.SystemBus()
            if self.inhibit_lock is None:
                # try to get inhibit_lock or throw exception quickly when
                # logind is down
                self.inhibit_lock = self.get_inhibit_shutdown_lock()
            self.logind_proxy = bus.get_object(
                'org.freedesktop.login1', '/org/freedesktop/login1')
        return self.logind_proxy

    def get_inhibit_shutdown_lock(self):
        """ Take delay inhibitor lock """
        bus = dbus.SystemBus()
        return bus.call_blocking(
            'org.freedesktop.login1', '/org/freedesktop/login1',
            'org.freedesktop.login1.Manager', 'Inhibit', 'ssss',
            ('shutdown', 'Unattended Upgrades Shutdown',
             _('Stop ongoing upgrades or perform upgrades before shutdown'),
             'delay'), timeout=2.0)

    def get_inhibit_max_delay(self):
        """ Return logind's InhibitDelayMaxUSec converted to seconds,
        or 3 when the property can't be read over D-Bus
        """
        try:
            logind_proxy = self.get_logind_proxy()
            getter_interface = dbus.Interface(
                logind_proxy,
                dbus_interface='org.freedesktop.DBus.Properties')
            return (getter_interface.Get(
                "org.freedesktop.login1.Manager", "InhibitDelayMaxUSec")
                / (1000 * 1000))
        except dbus.exceptions.DBusException:
            return 3

    def is_preparing_for_shutdown(self):
        """ Return logind's PreparingForShutdown property

        A true value is cached in self.shutdown_pending and logind is not
        asked again; False is returned when the D-Bus query fails.
        """
        if not self.shutdown_pending:
            try:
                logind_proxy = self.get_logind_proxy()
                getter_interface = dbus.Interface(
                    logind_proxy,
                    dbus_interface='org.freedesktop.DBus.Properties')
                self.shutdown_pending = getter_interface.Get(
                    "org.freedesktop.login1.Manager", "PreparingForShutdown")
            except dbus.exceptions.DBusException:
                return False
        return self.shutdown_pending

    def start_iterations(self):
        """ Schedule self.iter() on the GLib main loop

        The periodic timer is installed only once; an immediate one-shot
        iteration is scheduled on every call. Does nothing when GLib is
        not available.
        """
        try:
            if not self.iter_timer_set:
                # timeout_add() takes an integer number of milliseconds
                # while wait_period may be a float
                GLib.timeout_add(int(self.wait_period * 1000), self.iter)
                # remember the timer, otherwise every signal would stack
                # up one more periodic timer
                self.iter_timer_set = True
            # schedule first iteration immediately
            GLib.timeout_add(0, lambda: self.iter() and False)
        except NameError:
            pass

    def run_polling(self, signal_handler):
        """ Fallback main loop used when GLib or the logind signal is not
        usable: install signal_handler for SIGTERM and SIGHUP, optionally
        wait for a stop signal or a pending shutdown, then iterate
        """
        logging.warning(
            _("Unable to monitor PrepareForShutdown() signal, polling "
              "instead."))
        logging.warning(
            _("To enable monitoring the PrepareForShutdown() signal "
              "instead of polling please install the python3-gi package"))

        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGHUP, signal_handler)

        # poll for PrepareForShutdown then run final iterations
        if self.options.wait_for_signal:
            logging.debug("Waiting for signal to start operation ")
            while (not self.stop_signal_received.is_set()
                   and not self.is_preparing_for_shutdown()):
                self.stop_signal_received.wait(self.wait_period)
        else:
            logging.debug("Skip waiting for signals, starting operation "
                          "now")
        # iter() returns True while it wants to be called again and leaves
        # the process via sys.exit() when done, so keep looping on True
        while self.iter():
            # TODO iter on sigterm and sighup, too
            time.sleep(self.wait_period)

    def run(self):
        """ delay shutdown and wait for PrepareForShutdown or other signals"""

        # set signal handlers
        def signal_handler(signum, frame):
            # type: (object, object) -> None
            logging.warning(
                "SIGTERM or SIGHUP received, stopping unattended-upgrades "
                "only if it is running")
            self.stop_signal_received.set()
            self.start_iterations()

        # fall back to polling without GLib
        try:
            hasattr(GLib, "MainLoop")
        except NameError:
            self.run_polling(signal_handler)
            return

        for sig in (signal.SIGTERM, signal.SIGHUP):
            GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, sig,
                                 signal_handler, None, None)
        if self.options.wait_for_signal:
            def prepare_for_shutdown_handler(active):
                """ Handle PrepareForShutdown() """
                if not active:
                    logging.warning("PrepareForShutdown(false) received, "
                                    "this should not happen")
                # PrepareForShutdown arrived, starting final iterations
                self.start_iterations()
            try:
                self.get_logind_proxy().connect_to_signal(
                    "PrepareForShutdown", prepare_for_shutdown_handler)
            except dbus.exceptions.DBusException:
                logging.warning(
                    _("Unable to monitor PrepareForShutdown() signal, polling "
                      "instead."))
                logging.warning(
                    _("Maybe systemd-logind service is not running."))
                self.run_polling(signal_handler)
                return

            logging.debug("Waiting for signal to start operation ")
        else:
            # starting final iterations immediately
            logging.debug("Skip waiting for signals, starting operation "
                          "now")
            self.start_iterations()

        GLib.MainLoop().run()

    def try_iter_on_shutdown(self):
        """ Start and monitor unattended-upgrades in shutdown mode

        Returns True while the started unattended-upgrade process is still
        running and False when shutdown mode is not active. Exits the
        process through exit_log_result(True) once the started process
        has finished.
        """
        # check if we need to run unattended-upgrades on shutdown and if
        # so, run it
        try:
            if self.apt_pkg_reinit_done is None:
                logging.debug("Initializing apt_pkg configuration")
                apt_pkg.init_config()
                self.apt_pkg_reinit_done = True
        except apt_pkg.Error as error:
            # apt may be in a transient state due to unattended-upgrades
            # running, thus assuming non shutdown mode is reasonable
            logging.error(_("Apt returned an error thus shutdown mode is "
                            "disabled"))
            logging.error(_("error message: '%s'"), error)
            self.apt_pkg_reinit_done = False

        if self.on_shutdown_mode is None:
            self.on_shutdown_mode = (
                not self.options.stop_only
                and not self.stop_signal_received.is_set()
                and self.apt_pkg_reinit_done
                and apt_pkg.config.find_b(
                    "Unattended-Upgrade::InstallOnShutdown", False))
            if self.on_shutdown_mode:
                env = copy.copy(os.environ)
                env["UNATTENDED_UPGRADES_FORCE_INSTALL_ON_SHUTDOWN"] = "1"
                logging.debug("starting unattended-upgrades in shutdown mode")
                self.on_shutdown_mode_uu_proc = subprocess.Popen(
                    ["unattended-upgrade"], env=env)
                log_msg(_("Running unattended-upgrades in shutdown mode"))
                # run u-u, but switch to stopping when receiving stop signal
                # because it means shutdown progressed despite holding the
                # lock

        if self.on_shutdown_mode:
            log_progress()
            if self.on_shutdown_mode_uu_proc.poll() is not None:
                # unattended-upgrades stopped on its own
                exit_log_result(True)
            else:
                return True
        return False

    def iter(self):
        """ Run one monitoring step

        Returns True when it should be called again; every terminal
        outcome (timeout, lock file free, upgrade finished) leaves the
        process via sys.exit().
        """
        if self.start_time is None:
            self.start_time = time.time()
            logging.debug("Starting countdown of %s minutes",
                          self.max_delay / 60)
        else:
            if (time.time() - self.start_time) > self.max_delay:
                logging.warning(_(
                    "Giving up on lockfile after %s minutes of delay"),
                    self.max_delay / 60)
                sys.exit(1)

        if not self.stop_signal_received.is_set():
            if self.try_iter_on_shutdown():
                return True

        # run monitoring and keep "UI" updated
        res = apt_pkg.get_lock(self.options.lock_file)
        logging.debug("get_lock returned %i", res)
        # exit here if there is no lock
        if res > 0:
            logging.debug("lock not taken")
            if self.lock_was_taken:
                exit_log_result(self.signal_sent)
            else:
                sys.exit(0)
        self.lock_was_taken = True
        signal_stop_unattended_upgrade()
        self.signal_sent = True
        # show log
        log_progress()
        return True


def main():
    """ parse the command line, set up logging and run the helper """
    # setup gettext
    localesApp = "unattended-upgrades"
    localesDir = "/usr/share/locale"
    gettext.bindtextdomain(localesApp, localesDir)
    gettext.textdomain(localesApp)

    # use a normal logfile instead of syslog too as on shutdown its too
    # easy to get syslog killed
    logdir = "/var/log/unattended-upgrades/"
    try:
        apt_pkg.init_config()
        logdir = apt_pkg.config.find_dir(
            "Unattended-Upgrade::LogDir", logdir)
    except apt_pkg.Error as error:
        logging.error(_("Apt returned an error when loading configuration, "
                        "using default values"))
        logging.error(_("error message: '%s'"), error)

    parser = OptionParser()
    parser.add_option("", "--debug",
                      action="store_true", dest="debug",
                      default=apt_pkg.config.find_b(
                          "Unattended-Upgrade::Debug", False),
                      help="print debug messages")
    parser.add_option("", "--delay", default=25, type="int",
                      help="delay in minutes to wait for unattended-upgrades")
    parser.add_option("", "--lock-file",
                      default="/var/run/unattended-upgrades.lock",
                      help="lock file location")
    parser.add_option("", "--stop-only",
                      action="store_true", dest="stop_only", default=False,
                      help="only stop running unattended-upgrades, don't "
                      "start it even when "
                      "Unattended-Upgrade::InstallOnShutdown is true")
    parser.add_option("", "--wait-for-signal",
                      action="store_true", dest="wait_for_signal",
                      default=False,
                      help="wait for TERM signal before starting operation")
    (options, args) = parser.parse_args()

    # setup logging
    level = logging.INFO
    if options.debug:
        level = logging.DEBUG
    if not os.path.exists(logdir):
        os.makedirs(logdir)

    logfile = os.path.join(logdir, "unattended-upgrades-shutdown.log")
    logging.basicConfig(filename=logfile,
                        level=level,
                        format="%(asctime)s %(levelname)s - %(message)s")

    UnattendedUpgradesShutdown(options).run()


if __name__ == "__main__":
    main()