File: //opt/cloudlinux/venv/lib/python3.11/site-packages/clselect/baseclselect/pkgmanager.py
# coding: utf-8
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
import fcntl
import os
import contextlib
import psutil
import subprocess
import simplejson as json # because of unicode handling
from abc import ABCMeta, abstractmethod
from time import time
from . import (
INSTALLING_STATUS,
REMOVING_STATUS,
AcquireInterpreterLockError,
)
from future.utils import with_metaclass
from clcommon.utils import is_testing_enabled_repo
from clcommon.group_info_reader import GroupInfoReader
# Maximum age, in seconds (24 hours), of the yum cache file; an older file is
# treated as outdated when the cache is read.
MAX_CACHE_AGE_SEC = 24 * 3600
class PkgManagerError(Exception):
    """Exception type declared by this module for package manager errors."""
class BasePkgManager(metaclass=ABCMeta):
    """
    Class responsible for all interactions with Yum, interpreter versions
    installation/removal and gathering info about already installed versions
    """

    # Memoized result of is_testing_enabled_repo(); None until first queried.
    _testing_repo_enabled_cache = None
    # Directory that holds the yum cache file and the pid-lock files.
    _config_dir = None
    # Cached mapping returned by _scan_interpreter_versions(); None until
    # the first scan (may also be pre-filled for testing/debugging).
    _versions_info = None
    # Passed to GroupInfoReader.get_group_info() when refreshing the cache.
    _alt_names = None
    # File that receives stdout/stderr of the background yum process.
    _log_file = None

    # Base yum argv shared by all subclasses. Subclasses extend this with
    # the action verb (install/remove/groupinstall/groupremove) and target
    # package name. Kept as an argv list — no shell features needed.
    # NOTE: this list is shared at class level; build a new list from it
    # (e.g. ``self._yum_argv + [...]``) instead of mutating it in place.
    _yum_argv = ['yum', '--disableplugin=fastestmirror']

    @classmethod
    def run_background(cls, argv, logfile):
        """
        Start a command in the background without going through a shell.

        :param argv: command and arguments; must be a ``list``
        :param logfile: path of the file the child's stdout and stderr are
            appended to
        :return: the started ``subprocess.Popen`` object
        :raises TypeError: if ``argv`` is not a list
        """
        # Hardening (CLOS-4372): never invoke via the shell. `argv` MUST be a
        # list[str]; the previous format-string + shell design made regex
        # correctness in _verify_action load-bearing for RCE safety.
        if not isinstance(argv, list):
            raise TypeError('run_background expects argv list, got {}'
                            .format(type(argv).__name__))
        env = dict(os.environ)
        env['LANG'] = 'C'
        # The child receives its own duplicate of the log descriptor, so the
        # parent's handle is closed as soon as the process is spawned (or if
        # spawning fails) instead of being leaked on every call.
        with open(logfile, 'ab') as log_fp:
            return subprocess.Popen(
                argv,
                stdin=subprocess.DEVNULL,
                stdout=log_fp,
                stderr=subprocess.STDOUT,
                shell=False,
                env=env,
            )

    @property
    def _testing_enabled(self):
        """Result of ``is_testing_enabled_repo()``, computed once and cached."""
        if self._testing_repo_enabled_cache is not None:
            return self._testing_repo_enabled_cache
        res = is_testing_enabled_repo()
        self._testing_repo_enabled_cache = res
        return res

    @property
    def _yum_cache_file(self):
        """
        Path of the yum cache file inside ``_config_dir``; a separate file
        name is used while the testing repo is enabled.
        """
        if self._testing_enabled:
            return os.path.join(self._config_dir, 'yum_cache.dat.testing_enabled')
        return os.path.join(self._config_dir, 'yum_cache.dat')

    def update_yum_cache(self):
        """
        Rewrite the yum cache file with one group name per line, using the
        keys returned by ``GroupInfoReader.get_group_info(self._alt_names)``.
        """
        # Group info is fetched before the file is opened, so a failure in
        # the reader does not truncate the existing cache.
        groups = GroupInfoReader.get_group_info(self._alt_names)
        with open(self._yum_cache_file, 'w') as f:
            f.writelines(f'{group}\n' for group in groups.keys())

    def _read_yum_cache(self):
        """Return data from file or None if file is absent or outdated"""
        cache_path = self._yum_cache_file
        try:
            if (time() - os.stat(cache_path).st_mtime) > MAX_CACHE_AGE_SEC:
                return None
            # The file may disappear between stat() and open(); that is
            # reported as "absent" too rather than raising.
            with open(cache_path) as cache_fp:
                return cache_fp.read()
        except OSError:
            return None

    @staticmethod
    def _remove_silent(f):
        """ Silently remove file ignoring all errors """
        try:
            os.remove(f)
        except (OSError, IOError):
            pass

    @property
    def installed_versions(self):
        """
        Returns list of installed interpreter versions by scanning alt_node_dir
        and cache result. Cache also can be pre-filled at init time for
        testing/debugging purposes
        """
        if self._versions_info is None:
            self._versions_info = self._scan_interpreter_versions()
        return list(self._versions_info.keys())

    def get_full_version(self, maj):
        """
        Should return full interpreter version for a particular major version or
        just fallback to given version if info is not available for any reason.
        This information is taken from the hash map populated during
        installed_packages scan.
        :param maj: Major interpreter version
        :return: Full interpreter version or Major if info is not available
        """
        if self._versions_info is None:
            self._versions_info = self._scan_interpreter_versions()
        try:
            return self._versions_info[maj]['full_version']
        except KeyError:
            return maj

    @property
    def _pid_lock_file(self):
        """Path of the file that records the running install/remove yum."""
        return os.path.join(self._config_dir, 'yum.pid.lock')

    @property
    def _cache_lock_file(self):
        """Path of the pid-lock file for the cache update inside ``_config_dir``."""
        return os.path.join(self._config_dir, 'yum_cache.pid.lock')

    def _write_yum_status(self, pid, version=None, status=None):
        """
        :param pid: pid of Yum process
        :param version: interpreter version or None for "cache update" case
        :param status: what yum is currently doing(few predefined statuses)
        :return: None
        """
        if not os.path.exists(self._config_dir):
            self._create_config_dirs()
        payload = {
            'pid': pid,
            'version': str(version),
            'status': status,
            'time': float(time()),
        }
        # Context manager guarantees the status is flushed and the handle
        # closed before the method returns.
        with open(self._pid_lock_file, 'w') as lock_fp:
            json.dump(payload, lock_fp)

    def _check_yum_in_progress(self):
        """
        Return a human-readable message if one of our yum operations is
        still running, otherwise None.
        """
        ongoing_yum = self._read_yum_status()
        if ongoing_yum is None:
            return None
        return "{} of version '{}' is in progress. " \
               "Please, wait till it's done"\
            .format(ongoing_yum['status'], ongoing_yum['version'])

    def _read_yum_status(self):
        """
        Result "None" - means installing/removing of our packages is not
        currently in progress. However, it doesn't mean that any other yum
        instance is not running at the same time, but we ok with this
        because our yum process will start processing automatically once
        standard /var/run/yum.pid lock is removed by other process
        :return: None or dict
        """
        lock_path = self._pid_lock_file
        if lock_path is None:
            raise NotImplementedError()
        try:
            with open(lock_path) as lock_fp:
                data = json.load(lock_fp)
        except (OSError, ValueError):
            # No file or it's broken:
            self._remove_silent(lock_path)
            return None

        # Valid JSON of the wrong shape (not an object, or without an
        # integer pid) is a broken lock file as well, not a reason to crash.
        pid = data.get('pid') if isinstance(data, dict) else None
        if isinstance(pid, bool) or not isinstance(pid, int):
            self._remove_silent(lock_path)
            return None

        if not psutil.pid_exists(pid):  # pylint: disable=E1101
            self._remove_silent(lock_path)
            return None

        # TODO check timeout and stop it or just run with bash "timeout ..."
        try:
            finished_pid, _ = os.waitpid(pid, os.WNOHANG)
        except OSError:
            # Case when we exit before completion and yum process is no
            # longer our child process
            return data  # still working, wait...
        if finished_pid == 0:  # still working, wait...
            return data
        self._remove_silent(lock_path)
        return None  # It was zombie and has already finished

    @abstractmethod
    def format_cmd_string_for_installing(self, version):
        """
        Build the yum install argv for a given interpreter version.
        Returns a list[str] suitable for subprocess.Popen(shell=False).
        The historical name is kept for caller compatibility; the return
        type changed from str to list[str] as part of CLOS-4372 hardening.
        :param version: version of interpreter for installing
        :rtype: list[str]
        """
        raise NotImplementedError()

    @abstractmethod
    def format_cmd_string_for_removing(self, version):
        """
        Build the yum remove argv for a given interpreter version.
        Returns a list[str] suitable for subprocess.Popen(shell=False).
        :param version: version of interpreter for removing
        :rtype: list[str]
        """
        raise NotImplementedError()

    def install_version(self, version):
        """
        Start a background installation of the given interpreter version.

        :param version: interpreter version to install
        :return: None when the installation was started, otherwise an
            error string explaining why it was not
        """
        err = self._verify_action(version)
        if err:
            return err
        if version in self.installed_versions:
            return 'Version "{}" is already installed'.format(version)
        available = self.checkout_available()
        if available is None:
            return ('Updating available versions cache is currently '
                    'in progress. Please, try again in a few minutes')
        if version not in available:
            return ('Version "{}" is not available. '
                    'Please, make sure you typed it correctly'.format(version))
        argv = self.format_cmd_string_for_installing(version)
        p = self.run_background(argv, self._log_file)
        self._write_yum_status(p.pid, version, INSTALLING_STATUS)
        return None

    def remove_version(self, version):
        """
        Start a background removal of the given interpreter version.

        :param version: interpreter version to remove
        :return: None when the removal was started, otherwise an error
            string explaining why it was not
        """
        err = self._verify_action(version)
        if err:
            return err
        if version not in self.installed_versions:
            return 'Version "{}" is not installed'.format(version)
        if self.is_interpreter_locked(version):
            return "This version is currently in use by another operation. " \
                   "Please, wait until it's complete and try again"
        if self._is_version_in_use(version):
            return "It's not possible to uninstall version which is " \
                   "currently in use by applications"
        argv = self.format_cmd_string_for_removing(version)
        p = self.run_background(argv, self._log_file)
        self._write_yum_status(p.pid, version, REMOVING_STATUS)
        return None

    def in_progress(self):
        """
        Should return version and it's status for versions that is
        currently installing|removing
        """
        ongoing_yum = self._read_yum_status()
        if ongoing_yum is None:
            return None
        status = ongoing_yum['status']
        if status not in (INSTALLING_STATUS, REMOVING_STATUS):
            return None
        return {
            ongoing_yum['version']: {
                'status': status,
                'base_dir': '',
            }
        }

    @contextlib.contextmanager
    def acquire_interpreter_lock(self, interpreter_version):
        """
        Context manager that holds the lock file of an interpreter version
        open (and exclusively locked when possible) while the body runs.

        :param interpreter_version: version whose lock file is used
        :raises AcquireInterpreterLockError: if the lock file can't be opened
        """
        lock_name = self._get_lock_file_path(interpreter_version)
        try:
            lf = open(lock_name, 'w')
        except IOError as err:
            raise AcquireInterpreterLockError(interpreter_version) from err
        # "with" closes the lock file on every exit path, including an
        # exception in the "context code" or an unexpected flock() failure.
        with lf:
            try:
                fcntl.flock(lf, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError:
                # TODO: try to use LOCK_SH here
                # It's ok if it's already locked because we allow multiple
                # operations for different applications at the same time
                # with the same "--new-version"
                pass
            yield

    @abstractmethod
    def checkout_available(self):
        """
        Return the versions available for installation (a container that
        supports ``in``), or None while that information is not ready;
        install_version() reports None as "cache update in progress".
        """
        raise NotImplementedError()

    @abstractmethod
    def _scan_interpreter_versions(self):
        """
        Return a mapping keyed by installed interpreter version; each value
        is looked up by the 'full_version' key in get_full_version().
        """
        raise NotImplementedError()

    @abstractmethod
    def _create_config_dirs(self):
        """
        Create the configuration directory; called before the yum status is
        written when ``_config_dir`` does not exist.
        """
        raise NotImplementedError()

    def is_interpreter_locked(self, interpreter_version):
        """
        Tell whether the lock file of the given interpreter version is
        currently locked by someone else.

        :param interpreter_version: version whose lock file is probed
        :return: True if an exclusive non-blocking lock can't be taken,
            False if the lock file is absent or not locked
        """
        lock_name = self._get_lock_file_path(interpreter_version)
        if not os.path.isfile(lock_name):
            return False
        # Closing the probe handle on exit also drops the lock we may have
        # just acquired on it.
        with open(lock_name, 'w') as lf:
            try:
                fcntl.flock(lf, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except IOError:
                return True
        return False

    @abstractmethod
    def _verify_action(self, version):
        """
        Check whether install/remove may proceed for ``version``; a truthy
        result is returned to the caller as the error and aborts the action.
        """
        raise NotImplementedError()

    def _get_lock_file_path(self, version):
        """
        Return the path of the lock file for ``version``. Not implemented in
        the base class; subclasses that use interpreter locks must override.
        """
        raise NotImplementedError()

    @abstractmethod
    def _is_version_in_use(self, version):
        """
        Return a truthy value if ``version`` is used by applications;
        remove_version() refuses to uninstall such a version.
        """
        raise NotImplementedError()