Server IP : 15.235.198.142 / Your IP : 216.73.216.14 Web Server : Apache/2.4.58 (Ubuntu) System : Linux ballsack 6.8.0-45-generic #45-Ubuntu SMP PREEMPT_DYNAMIC Fri Aug 30 12:02:04 UTC 2024 x86_64 User : www-data ( 33) PHP Version : 8.3.6 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : OFF Directory : /proc/self/root/lib/python3/dist-packages/uaclient/api/u/unattended_upgrades/status/ |
Upload File : |
import datetime
import os
from typing import Dict, List, Optional, Tuple, Union

from uaclient import apt, exceptions, messages, system
from uaclient.api.api import APIEndpoint
from uaclient.api.data_types import AdditionalInfo
from uaclient.api.exceptions import UnattendedUpgradesError
from uaclient.apt import get_apt_config_keys, get_apt_config_values
from uaclient.config import UAConfig
from uaclient.data_types import (
    BoolDataValue,
    DataObject,
    DatetimeDataValue,
    Field,
    IntDataValue,
    StringDataValue,
    data_list,
)

# APT options that are always looked up, in addition to whatever keys
# get_apt_config_keys("Unattended-Upgrade") reports.
UNATTENDED_UPGRADES_CONFIG_KEYS = [
    "APT::Periodic::Enable",
    "APT::Periodic::Update-Package-Lists",
    "APT::Periodic::Unattended-Upgrade",
    "Unattended-Upgrade::Allowed-Origins",
]

# Stamp file whose timestamp is reported as the last unattended-upgrades run.
UNATTENDED_UPGRADES_STAMP_PATH = "/var/lib/apt/periodic/upgrade-stamp"


class UnattendedUpgradesDisabledReason(DataObject):
    """Message/code pair explaining why unattended-upgrades is not running."""

    fields = [
        Field("msg", StringDataValue),
        Field("code", StringDataValue),
    ]

    def __init__(self, msg: str, code: str):
        self.msg = msg
        self.code = code


class UnattendedUpgradesStatusResult(DataObject, AdditionalInfo):
    """Result object returned by the UnattendedUpgradesStatus endpoint."""

    fields = [
        Field("systemd_apt_timer_enabled", BoolDataValue),
        Field("apt_periodic_job_enabled", BoolDataValue),
        Field("package_lists_refresh_frequency_days", IntDataValue),
        Field("unattended_upgrades_frequency_days", IntDataValue),
        Field(
            "unattended_upgrades_allowed_origins",
            data_list(StringDataValue),
        ),
        Field("unattended_upgrades_running", BoolDataValue),
        Field(
            "unattended_upgrades_disabled_reason",
            UnattendedUpgradesDisabledReason,
            required=False,
        ),
        Field(
            "unattended_upgrades_last_run", DatetimeDataValue, required=False
        ),
    ]

    def __init__(
        self,
        *,
        systemd_apt_timer_enabled: bool,
        apt_periodic_job_enabled: bool,
        package_lists_refresh_frequency_days: int,
        unattended_upgrades_frequency_days: int,
        unattended_upgrades_allowed_origins: List[str],
        unattended_upgrades_running: bool,
        unattended_upgrades_disabled_reason: Optional[
            UnattendedUpgradesDisabledReason
        ],
        unattended_upgrades_last_run: Optional[datetime.datetime]
    ):
        self.systemd_apt_timer_enabled = systemd_apt_timer_enabled
        self.apt_periodic_job_enabled = apt_periodic_job_enabled
        self.package_lists_refresh_frequency_days = (
            package_lists_refresh_frequency_days
        )
        self.unattended_upgrades_frequency_days = (
            unattended_upgrades_frequency_days
        )
        self.unattended_upgrades_allowed_origins = (
            unattended_upgrades_allowed_origins
        )
        self.unattended_upgrades_running = unattended_upgrades_running
        self.unattended_upgrades_disabled_reason = (
            unattended_upgrades_disabled_reason
        )
        self.unattended_upgrades_last_run = unattended_upgrades_last_run


def _get_apt_daily_job_status() -> bool:
    """Return True when both apt systemd timers are active.

    Checks "apt-daily.timer" and "apt-daily-upgrade.timer".

    :raises UnattendedUpgradesError: if querying systemd fails with a
        ProcessExecutionError.
    """
    try:
        apt_daily_job_enabled = system.is_systemd_unit_active(
            "apt-daily.timer"
        )
        apt_daily_upgrade_job_enabled = system.is_systemd_unit_active(
            "apt-daily-upgrade.timer"
        )
    except exceptions.ProcessExecutionError as e:
        # Chain the original error so the failing command stays visible
        # in tracebacks.
        raise UnattendedUpgradesError(error_msg=str(e)) from e

    return apt_daily_job_enabled and apt_daily_upgrade_job_enabled


def _is_unattended_upgrades_running(
    systemd_apt_timer_enabled: bool,
    unattended_upgrades_cfg: Dict[str, Union[str, List[str]]],
) -> Tuple[bool, Optional[messages.NamedMessage]]:
    """Decide whether unattended-upgrades is effectively enabled.

    :param systemd_apt_timer_enabled: result of the systemd timer check.
    :param unattended_upgrades_cfg: APT option name -> configured value.
    :return: ``(True, None)`` when nothing disables unattended-upgrades,
        otherwise ``(False, message)`` for the first disabling condition
        found.
    """
    if not systemd_apt_timer_enabled:
        return (False, messages.UNATTENDED_UPGRADES_SYSTEMD_JOB_DISABLED)

    for key, value in unattended_upgrades_cfg.items():
        # Any falsy value (None, "", empty list) is reported as empty.
        if not value:
            return (
                False,
                messages.UNATTENDED_UPGRADES_CFG_LIST_VALUE_EMPTY.format(
                    cfg_name=key
                ),
            )
        # A literal "0" means the option was explicitly turned off.
        if isinstance(value, str) and value == "0":
            return (
                False,
                messages.UNATTENDED_UPGRADES_CFG_VALUE_TURNED_OFF.format(
                    cfg_name=key
                ),
            )

    return (True, None)


def _get_unattended_upgrades_last_run() -> Optional[datetime.datetime]:
    """Return the timestamp of the upgrade stamp file, if it exists.

    The value is ``os.path.getctime`` of UNATTENDED_UPGRADES_STAMP_PATH
    converted with ``datetime.fromtimestamp`` (no tz argument, so the
    result is a naive local-time datetime).

    :return: the stamp datetime, or None when the stamp file is missing.
    """
    try:
        # NOTE: on Unix, ctime is the last metadata change, not creation.
        stamp_epoch = os.path.getctime(UNATTENDED_UPGRADES_STAMP_PATH)
    except FileNotFoundError:
        return None

    return datetime.datetime.fromtimestamp(stamp_epoch)


def _get_cfg_int(
    unattended_upgrades_cfg: Dict[str, Union[str, List[str]]], key: str
) -> int:
    """Return the integer value of an APT option, or 0 when it is unset.

    ``dict.get(key, 0)`` only helps when the key is absent. Unset options
    can be present with a falsy value (None / ""), which ``int()`` rejects,
    so fall back with ``or`` instead.
    """
    return int(unattended_upgrades_cfg.get(key) or 0)  # type: ignore


def status() -> UnattendedUpgradesStatusResult:
    """Return the unattended-upgrades status using a default UAConfig."""
    return _status(UAConfig())


def _status(cfg: UAConfig) -> UnattendedUpgradesStatusResult:
    """Collect the unattended-upgrades status of this machine.

    :param cfg: client configuration; not read by this function.
    :return: an UnattendedUpgradesStatusResult whose ``meta`` carries the
        raw APT configuration under "raw_config" (except when the
        unattended-upgrades package is not installed).
    :raises UnattendedUpgradesError: if the systemd timer state cannot be
        queried.
    """
    if not apt.is_installed("unattended-upgrades"):
        uninstalled_reason = UnattendedUpgradesDisabledReason(
            msg=messages.UNATTENDED_UPGRADES_UNINSTALLED.msg,
            code=messages.UNATTENDED_UPGRADES_UNINSTALLED.name,
        )
        return UnattendedUpgradesStatusResult(
            systemd_apt_timer_enabled=False,
            apt_periodic_job_enabled=False,
            package_lists_refresh_frequency_days=0,
            unattended_upgrades_frequency_days=0,
            unattended_upgrades_allowed_origins=[],
            unattended_upgrades_disabled_reason=uninstalled_reason,
            unattended_upgrades_running=False,
            unattended_upgrades_last_run=None,
        )

    systemd_apt_timer_enabled = _get_apt_daily_job_status()
    unattended_upgrades_last_run = _get_unattended_upgrades_last_run()

    unattended_upgrades_cfg = get_apt_config_values(
        set(
            UNATTENDED_UPGRADES_CONFIG_KEYS
            + get_apt_config_keys("Unattended-Upgrade")
        )
    )

    # If that key is not present in the APT config, we assume the config
    # is "enabled", as by default this option is not present in APT.
    unattended_upgrades_cfg["APT::Periodic::Enable"] = (
        unattended_upgrades_cfg["APT::Periodic::Enable"] or "1"
    )

    (
        unattended_upgrades_running,
        disabled_reason,
    ) = _is_unattended_upgrades_running(
        systemd_apt_timer_enabled, unattended_upgrades_cfg
    )

    if disabled_reason:
        unattended_upgrades_disabled_reason = UnattendedUpgradesDisabledReason(
            msg=disabled_reason.msg,
            code=disabled_reason.name,
        )
    else:
        unattended_upgrades_disabled_reason = None

    unattended_upgrades_result = UnattendedUpgradesStatusResult(
        systemd_apt_timer_enabled=systemd_apt_timer_enabled,
        apt_periodic_job_enabled=str(
            unattended_upgrades_cfg.get("APT::Periodic::Enable", "")
        )
        == "1",
        package_lists_refresh_frequency_days=_get_cfg_int(
            unattended_upgrades_cfg, "APT::Periodic::Update-Package-Lists"
        ),
        unattended_upgrades_frequency_days=_get_cfg_int(
            unattended_upgrades_cfg, "APT::Periodic::Unattended-Upgrade"
        ),
        unattended_upgrades_allowed_origins=list(
            unattended_upgrades_cfg.get("Unattended-Upgrade::Allowed-Origins")
            or []
        ),
        unattended_upgrades_disabled_reason=(
            unattended_upgrades_disabled_reason
        ),
        unattended_upgrades_running=unattended_upgrades_running,
        unattended_upgrades_last_run=unattended_upgrades_last_run,
    )
    unattended_upgrades_result.meta = {"raw_config": unattended_upgrades_cfg}

    return unattended_upgrades_result


endpoint = APIEndpoint(
    version="v1",
    name="UnattendedUpgradesStatus",
    fn=_status,
    options_cls=None,
)