File: //opt/cloudlinux/venv/lib/python3.11/site-packages/clcommon/clhook.py
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
import hashlib
import logging
import os
import shutil
import stat
import subprocess
import tempfile
import simplejson as json
import configparser
# Default configuration file used by the hook functions in this module
# when the caller does not pass an explicit config_file.
CL_CONFIG = '/etc/sysconfig/cloudlinux'
# Name of the config section under which hooks are stored.
HOOK_SECTION = 'hooks'
# Separator between hook paths stored in a single option value.
HOOK_SEPARATOR = ';'
# Prefix of the status messages returned by the functions in this module.
DEBUG_STRING = 'python-cllib:clhook:'
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def get_config(file_name):
    """
    Parse file_name as an INI-style config and hand back the parser.

    The parser is created with value interpolation switched off and in
    non-strict mode, and option names keep the exact case they have in
    the file.

    return config (configparser.ConfigParser)
    """
    parser = configparser.ConfigParser(strict=False, interpolation=None)
    # The default optionxform lower-cases option names; plain str keeps
    # them untouched so lookups are case sensitive.
    parser.optionxform = str
    parser.read(file_name)
    return parser
def write_config(config, file_name, debug_local_name):
    """
    Save config to file_name by way of a temporary file.

    The data is written to a temporary file created in the same directory
    as file_name (mode 0644, UTF-8), and that file is then moved over
    file_name. If anything goes wrong the temporary file is removed.

    config: parser object whose write() method produces the file content
    file_name: destination path
    debug_local_name: prefix put in front of the error message

    return [BOOLEAN_STATE, MESSAGE]
    """
    def _discard(raw_fd, path):
        # Best-effort cleanup: close the descriptor if it is still ours,
        # then drop the temporary file; failures here are ignored.
        if raw_fd is not None:
            try:
                os.close(raw_fd)
            except OSError:
                pass
        try:
            os.unlink(path)
        except OSError:
            pass

    target_dir = os.path.dirname(file_name)
    try:
        raw_fd, scratch_path = tempfile.mkstemp(dir=target_dir)
        try:
            os.fchmod(raw_fd, 0o644)
            with os.fdopen(raw_fd, 'w', encoding='utf-8') as stream:
                # The file object owns the descriptor from here on and the
                # with-block closes it, so cleanup must not close it again.
                raw_fd = None
                config.write(stream)
            shutil.move(scratch_path, file_name)
        except BaseException:
            _discard(raw_fd, scratch_path)
            raise
    except IOError as err:
        failure = f'{debug_local_name}{file_name} write error\n{err}'
        return [False, failure]
    return [True, 'OK\n']
def get_hook_list_from_file(hook_prefix, config_file=CL_CONFIG):
    """
    Read the hook paths stored under hook_prefix in config_file.

    hook_prefix: option name inside the hooks section
    config_file: path of the config file to read

    Blank entries (an empty option value, or a leading/trailing/doubled
    separator) are skipped, so the returned list only holds usable paths.

    return [BOOLEAN_STATE, hook_list]
        BOOLEAN_STATE is False when the section or the option is missing,
        in which case hook_list is empty.
    """
    config = get_config(config_file)
    try:
        raw_value = config.get(HOOK_SECTION, hook_prefix)
    except (configparser.NoOptionError, configparser.NoSectionError):
        return [False, []]
    # ''.split(';') yields [''], which is not a hook path; drop such
    # entries instead of handing them to the caller.
    hook_list = [entry for entry in raw_value.split(HOOK_SEPARATOR)
                 if entry.strip()]
    return [True, hook_list]
def register_hook(hook_path, hook_prefix, config_file=CL_CONFIG):
    """
    Add hook_path to the hooks stored under hook_prefix in config_file.

    hook_path: path of the hook to register
    hook_prefix: option name inside the hooks section
    config_file: path of the config file to update

    The hooks section and the option are created when missing. A hook that
    is already listed is left alone and reported as success. Blank entries
    found in the stored value are dropped, so the new value never starts
    with a separator.

    return [BOOLEAN_STATE, MESSAGE]
    """
    debug_local_name = DEBUG_STRING + 'register_hook:'
    caller_uid = os.getuid()
    caller_pid = os.getpid()
    logger.info('register_hook: hook_path=%s, hook_prefix=%s, config_file=%s, '
                'caller_uid=%d, caller_pid=%d',
                hook_path, hook_prefix, config_file, caller_uid, caller_pid)
    config = get_config(config_file)
    try:
        stored_value = config.get(HOOK_SECTION, hook_prefix)
    except configparser.NoSectionError:
        config.add_section(HOOK_SECTION)
        hook_list = []
    except configparser.NoOptionError:
        hook_list = []
    else:
        # An empty stored value splits into [''], which would be written
        # back as a leading separator and later read as an empty hook path.
        hook_list = [entry for entry in stored_value.split(HOOK_SEPARATOR)
                     if entry.strip()]
        if hook_path in hook_list:
            logger.info('register_hook: hook_path=%s already registered for '
                        'hook_prefix=%s', hook_path, hook_prefix)
            return [True, debug_local_name + 'Hook ' + str(hook_path) + ' already registred\n']
    hook_list.append(hook_path)
    config.set(HOOK_SECTION, hook_prefix, HOOK_SEPARATOR.join(hook_list))
    result = write_config(config, config_file, debug_local_name)
    if result[0]:
        logger.info('register_hook: successfully registered hook_path=%s, '
                    'hook_prefix=%s', hook_path, hook_prefix)
    else:
        logger.warning('register_hook: failed to register hook_path=%s, '
                       'hook_prefix=%s, error=%s', hook_path, hook_prefix, result[1])
    return result
def unregister_hook(hook_path, hook_prefix, config_file=CL_CONFIG):
    """
    Remove hook_path from the hooks stored under hook_prefix in config_file.

    hook_path: path of the hook to remove
    hook_prefix: option name inside the hooks section
    config_file: path of the config file to update

    A hook that is not registered (missing section, missing option, or
    path not in the list) is reported as success without touching the
    file. When the last hook is removed the option itself is deleted, so
    no empty value is left behind to be read back as an empty hook path.

    return [BOOLEAN_STATE, MESSAGE]
    """
    debug_local_name = DEBUG_STRING + 'unregister_hook:'
    caller_uid = os.getuid()
    caller_pid = os.getpid()
    logger.info('unregister_hook: hook_path=%s, hook_prefix=%s, config_file=%s, '
                'caller_uid=%d, caller_pid=%d',
                hook_path, hook_prefix, config_file, caller_uid, caller_pid)
    config = get_config(config_file)
    # Only the lookup and the removal belong in the try body, so an error
    # from the write path is never mistaken for "hook not found".
    try:
        hook_list = config.get(HOOK_SECTION, hook_prefix).split(HOOK_SEPARATOR)
        hook_list.remove(hook_path)
    except (configparser.NoSectionError, configparser.NoOptionError, ValueError):
        logger.info('unregister_hook: hook_path=%s not found in hook_prefix=%s',
                    hook_path, hook_prefix)
        return [True, 'OK\n']
    remaining = [entry for entry in hook_list if entry.strip()]
    if remaining:
        config.set(HOOK_SECTION, hook_prefix, HOOK_SEPARATOR.join(remaining))
    else:
        # Writing an empty value would later split into [''].
        config.remove_option(HOOK_SECTION, hook_prefix)
    result = write_config(config, config_file, debug_local_name)
    if result[0]:
        logger.info('unregister_hook: successfully unregistered hook_path=%s, '
                    'hook_prefix=%s', hook_path, hook_prefix)
    else:
        logger.warning('unregister_hook: failed to unregister hook_path=%s, '
                       'hook_prefix=%s, error=%s', hook_path, hook_prefix, result[1])
    return result
def call_hook_list(hook_list, data):
    """
    Run each hook in hook_list, sending data to its stdin as JSON.

    hook_list: iterable of hook paths, executed in order
    data: JSON-serializable object passed to every hook

    Processing stops at the first hook that cannot be opened, fails the
    ownership/mode checks, or cannot be started; the remaining hooks are
    not run. A hook's exit status is logged but does not affect the result.

    return [BOOLEAN_STATUS, message]
    """
    debug_local_name = DEBUG_STRING + 'call_hook:'
    json_encode = json.dumps(data)
    # The Popen pipes below are opened in binary mode, so the payload must
    # be bytes; encode once and reuse it for the hash and for stdin.
    if isinstance(json_encode, bytes):
        payload = json_encode
    else:
        payload = json_encode.encode('utf-8')
    data_hash = hashlib.sha256(payload).hexdigest()
    caller_uid = os.getuid()
    caller_pid = os.getpid()
    for hook in hook_list:
        # TOCTOU-safe hook execution: open the path once, fstat the resulting
        # fd to validate ownership/mode against the bound inode, then exec via
        # /proc/self/fd/N so the exec'd binary is the same inode we just
        # validated. The fd-binding is what closes the TOCTOU window - once
        # open() returns, an attacker swapping the path (or any symlink
        # component) cannot redirect us to a different inode. The fstat
        # checks (S_ISREG, root-owned, not group/world writable, executable)
        # are the load-bearing security properties; O_NOFOLLOW is intentionally
        # NOT used here because legitimate listener installs (lvemanager,
        # lve-stats, etc.) place symlinks at /usr/share/cloudlinux/hooks/
        # listeners/ pointing at root-owned scripts in the consumer RPM's
        # data dir - rejecting symlinks broke those consumers wholesale
        # (see CLOS-4280 follow-up to !16). pass_fds keeps the fd open across
        # execve (Popen defaults to close_fds=True in Python 3).
        try:
            fd = os.open(hook, os.O_RDONLY)
        except OSError:
            logger.warning('call_hook_list: hook=%s open error, '
                           'caller_uid=%d, caller_pid=%d',
                           hook, caller_uid, caller_pid)
            return [False, debug_local_name + str(hook) + ' : isn`t file or isn`t runnable\n']
        try:
            st = os.fstat(fd)
            if not (stat.S_ISREG(st.st_mode) and
                    st.st_uid == 0 and
                    not (st.st_mode & (stat.S_IWGRP | stat.S_IWOTH)) and
                    (st.st_mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))):
                logger.warning('call_hook_list: hook=%s rejected by fstat checks '
                               '(not regular file, not root-owned, group/world '
                               'writable, or not executable), '
                               'caller_uid=%d, caller_pid=%d',
                               hook, caller_uid, caller_pid)
                return [False, debug_local_name + str(hook) + ' : isn`t file or isn`t runnable\n']
            fd_path = f'/proc/self/fd/{fd}'
            logger.info('call_hook_list: executing hook=%s, data_hash=%s, '
                        'caller_uid=%d, caller_pid=%d',
                        hook, data_hash, caller_uid, caller_pid)
            try:
                with subprocess.Popen(
                    fd_path,
                    pass_fds=(fd,),
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                ) as proc:
                    # communicate() feeds stdin while draining stdout and
                    # stderr, so a large payload or a chatty hook cannot
                    # block on a full pipe.
                    proc.communicate(input=payload)
                    logger.info('call_hook_list: hook=%s exited with status=%d',
                                hook, proc.returncode)
            except OSError:
                logger.warning('call_hook_list: hook=%s call error, '
                               'caller_uid=%d, caller_pid=%d',
                               hook, caller_uid, caller_pid)
                return [False, debug_local_name + 'Hook call error\n']
        finally:
            # Runs on every exit path of this iteration, including the
            # early returns above.
            os.close(fd)
    return [True, 'OK\n']