0x1949 Team - FAZEMRX - MANAGER
Edit File: __init__.py
# Copyright (C) 2020 Red Hat, Inc., Jake Hunsaker <jhunsake@redhat.com> # This file is part of the sos project: https://github.com/sosreport/sos # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # version 2 of the GNU General Public License. # # See the LICENSE file in the source distribution for further information. import re from sos.utilities import sos_get_command_output class InitSystem(): """Encapsulates an init system to provide service-oriented functions to sos. This should be used to query the status of services, such as if they are enabled or disabled on boot, or if the service is currently running. :param init_cmd: The binary used to interact with the init system :type init_cmd: ``str`` :param list_cmd: The list subcmd given to `init_cmd` to list services :type list_cmd: ``str`` :param query_cmd: The query subcmd given to `query_cmd` to query the status of services :type query_cmd: ``str`` :param chroot: Location to chroot to for any command execution, i.e. the sysroot if we're running in a container :type chroot: ``str`` or ``None`` """ def __init__(self, init_cmd=None, list_cmd=None, query_cmd=None, chroot=None): """Initialize a new InitSystem()""" self.services = {} self.init_cmd = init_cmd self.list_cmd = "%s %s" % (self.init_cmd, list_cmd) or None self.query_cmd = "%s %s" % (self.init_cmd, query_cmd) or None self.chroot = chroot def is_enabled(self, name): """Check if given service name is enabled :param name: The name of the service :type name: ``str`` :returns: ``True`` if the service is enabled, else ``False`` :rtype: ``bool`` """ if self.services and name in self.services: return self.services[name]['config'] == 'enabled' return False def is_disabled(self, name): """Check if a given service name is disabled :param name: The name of the service :type name: ``str`` :returns: ``True`` if the service is disabled, else ``False`` :rtype: ``bool`` """ if self.services and name in self.services: return self.services[name]['config'] == 'disabled' return False def is_service(self, name): """Checks if the given service name exists on the system at all, this does not check for the service status :param name: The name of the service :type name: ``str`` :returns: ``True`` if the service exists, else ``False`` :rtype: ``bool`` """ return name in self.services def is_running(self, name, default=True): """Checks if the given service name is in a running state. This should be overridden by initsystems that subclass InitSystem :param name: The name of the service :type name: ``str`` :param default: The default response in case the check fails :type default: ``bool` :returns: ``True`` if the service is running, else ``default`` :rtype: ``bool`` """ # This is going to be primarily used in gating if service related # commands are going to be run or not. Default to always returning # True when an actual init system is not specified by policy so that # we don't inadvertantly restrict sosreports on those systems return default def load_all_services(self): """This loads all services known to the init system into a dict. The dict should be keyed by the service name, and contain a dict of the name and service status This must be overridden by anything that subclasses `InitSystem` in order for service methods to function properly """ pass def _query_service(self, name): """Query an individual service""" if self.query_cmd: try: return sos_get_command_output( "%s %s" % (self.query_cmd, name), chroot=self.chroot ) except Exception: return None return None def parse_query(self, output): """Parses the output returned by the query command to make a determination of what the state of the service is This should be overriden by anything that subclasses InitSystem :param output: The raw output from querying the service with the configured `query_cmd` :type output: ``str`` :returns: A state for the service, e.g. 'active', 'disabled', etc... :rtype: ``str`` """ return output def get_service_names(self, regex): """Get a list of all services discovered on the system that match the given regex. :param regex: The service name regex to match against :type regex: ``str`` """ reg = re.compile(regex, re.I) return [s for s in self.services.keys() if reg.match(s)] def get_service_status(self, name): """Get the status for the given service name along with the output of the query command :param name: The name of the service :type name: ``str`` :returns: Service status and query_cmd output from the init system :rtype: ``dict`` with keys `name`, `status`, and `output` """ _default = { 'name': name, 'status': 'missing', 'output': '' } if name not in self.services: return _default if 'status' in self.services[name]: # service status has been queried before, return existing info return self.services[name] svc = self._query_service(name) if svc is not None: self.services[name]['status'] = self.parse_query(svc['output']) self.services[name]['output'] = svc['output'] return self.services[name] return _default # vim: set et ts=4 sw=4 :