HEX
Server: LiteSpeed
System: Linux srv1.dhviews.com 5.14.0-570.23.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Jun 24 11:27:16 EDT 2025 x86_64
User: bdedition (1723)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //opt/cloudlinux/venv/lib/python3.11/site-packages/clcommon/clhook.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#

import hashlib
import logging
import os
import shutil
import stat
import subprocess
import tempfile
import simplejson as json
import configparser

CL_CONFIG = '/etc/sysconfig/cloudlinux'
HOOK_SECTION = 'hooks'
HOOK_SEPARATOR = ';'

DEBUG_STRING = 'python-cllib:clhook:'

logger = logging.getLogger(__name__)


def get_config(file_name):
    """
    read config from file_name
    return config
    """
    config = configparser.ConfigParser(interpolation=None, strict=False)
    config.optionxform = str  # make config case sensitive
    config.read(file_name)
    return config


def write_config(config, file_name, debug_local_name):
    """
    write config to file_name
    return [BOOLEAN_STATE, MESSAGE]
    """
    try:
        fd, tmp_cfg_path = tempfile.mkstemp(dir=os.path.dirname(file_name))
        try:
            os.fchmod(fd, 0o644)
            with os.fdopen(fd, 'w', encoding='utf-8') as tmpconfig:
                fd = None  # ownership transferred to fdopen; with-block closes it
                config.write(tmpconfig)
            shutil.move(tmp_cfg_path, file_name)
        except BaseException:
            if fd is not None:
                try:
                    os.close(fd)
                except OSError:
                    pass
            try:
                os.unlink(tmp_cfg_path)
            except OSError:
                pass
            raise
    except IOError as e:
        return [False, f'{debug_local_name}{file_name} write error\n{str(e)}']
    return [True, 'OK\n']


def get_hook_list_from_file(hook_prefix, config_file = CL_CONFIG):
    """
    return [BOOLEAN_STATE, hook_list]
    """
    hook_list = []
    config = get_config(config_file)
    try:
        hook_list = config.get(HOOK_SECTION, hook_prefix).split(HOOK_SEPARATOR)
    except (configparser.NoOptionError, configparser.NoSectionError):
        return [False, hook_list]
    return [True, hook_list]


def register_hook(hook_path, hook_prefix, config_file = CL_CONFIG):
    """
    add hook to config file
    return [BOOLEAN_STATE, MESSAGE]
    """
    debug_local_name = DEBUG_STRING + 'register_hook:'
    caller_uid = os.getuid()
    caller_pid = os.getpid()
    logger.info('register_hook: hook_path=%s, hook_prefix=%s, config_file=%s, '
                'caller_uid=%d, caller_pid=%d',
                hook_path, hook_prefix, config_file, caller_uid, caller_pid)
    config = get_config(config_file)
    try:
        hook_list = config.get(HOOK_SECTION, hook_prefix).split(HOOK_SEPARATOR)
        if hook_path in hook_list:
            logger.info('register_hook: hook_path=%s already registered for '
                        'hook_prefix=%s', hook_path, hook_prefix)
            return [True, debug_local_name + 'Hook ' + str(hook_path) + ' already registred\n' ]
        hook_list.append(hook_path)
        config.set(HOOK_SECTION, hook_prefix, HOOK_SEPARATOR.join(hook_list))
    except configparser.NoSectionError:
        config.add_section(HOOK_SECTION)
        config.set(HOOK_SECTION, hook_prefix, hook_path)
    except configparser.NoOptionError:
        config.set(HOOK_SECTION, hook_prefix, hook_path)
    result = write_config(config, config_file, debug_local_name)
    if result[0]:
        logger.info('register_hook: successfully registered hook_path=%s, '
                    'hook_prefix=%s', hook_path, hook_prefix)
    else:
        logger.warning('register_hook: failed to register hook_path=%s, '
                       'hook_prefix=%s, error=%s', hook_path, hook_prefix, result[1])
    return result


def unregister_hook(hook_path, hook_prefix, config_file = CL_CONFIG):
    """
    remove hook from config_file
    return [BOOLEAN_STATE, MESSAGE]
    """
    debug_local_name = DEBUG_STRING + 'unregister_hook:'
    caller_uid = os.getuid()
    caller_pid = os.getpid()
    logger.info('unregister_hook: hook_path=%s, hook_prefix=%s, config_file=%s, '
                'caller_uid=%d, caller_pid=%d',
                hook_path, hook_prefix, config_file, caller_uid, caller_pid)
    config = get_config(config_file)
    try:
        hook_list = config.get(HOOK_SECTION, hook_prefix).split(HOOK_SEPARATOR)
        hook_list.remove(hook_path)
        config.set(HOOK_SECTION, hook_prefix, HOOK_SEPARATOR.join(hook_list))
        result = write_config(config, config_file, debug_local_name)
        if result[0]:
            logger.info('unregister_hook: successfully unregistered hook_path=%s, '
                        'hook_prefix=%s', hook_path, hook_prefix)
        else:
            logger.warning('unregister_hook: failed to unregister hook_path=%s, '
                           'hook_prefix=%s, error=%s', hook_path, hook_prefix, result[1])
        return result
    except (configparser.NoSectionError, configparser.NoOptionError, ValueError):
        logger.info('unregister_hook: hook_path=%s not found in hook_prefix=%s',
                    hook_path, hook_prefix)
        return [True, 'OK\n']


def call_hook_list(hook_list, data):
    """
    call hooks and send data as json to stdin
    return [BOOLEAN_STATUS, message]
    """
    debug_local_name = DEBUG_STRING + 'call_hook:'
    json_encode = json.dumps(data)
    data_hash = hashlib.sha256(json_encode if isinstance(json_encode, bytes)
                               else json_encode.encode('utf-8')).hexdigest()
    caller_uid = os.getuid()
    caller_pid = os.getpid()
    for hook in hook_list:
        # TOCTOU-safe hook execution: open the path once, fstat the resulting
        # fd to validate ownership/mode against the bound inode, then exec via
        # /proc/self/fd/N so the exec'd binary is the same inode we just
        # validated. The fd-binding is what closes the TOCTOU window — once
        # open() returns, an attacker swapping the path (or any symlink
        # component) cannot redirect us to a different inode. The fstat
        # checks (S_ISREG, root-owned, not group/world writable, executable)
        # are the load-bearing security properties; O_NOFOLLOW is intentionally
        # NOT used here because legitimate listener installs (lvemanager,
        # lve-stats, etc.) place symlinks at /usr/share/cloudlinux/hooks/
        # listeners/ pointing at root-owned scripts in the consumer RPM's
        # data dir — rejecting symlinks broke those consumers wholesale
        # (see CLOS-4280 follow-up to !16). pass_fds keeps the fd open across
        # execve (Popen defaults to close_fds=True in Python 3).
        try:
            fd = os.open(hook, os.O_RDONLY)
        except OSError:
            logger.warning('call_hook_list: hook=%s open error, '
                           'caller_uid=%d, caller_pid=%d',
                           hook, caller_uid, caller_pid)
            return [False, debug_local_name + str(hook) + ' : isn`t file or isn`t runnable\n']
        try:
            st = os.fstat(fd)
            if not (stat.S_ISREG(st.st_mode) and
                    st.st_uid == 0 and
                    not (st.st_mode & (stat.S_IWGRP | stat.S_IWOTH)) and
                    (st.st_mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))):
                logger.warning('call_hook_list: hook=%s rejected by fstat checks '
                               '(not regular file, not root-owned, group/world '
                               'writable, or not executable), '
                               'caller_uid=%d, caller_pid=%d',
                               hook, caller_uid, caller_pid)
                return [False, debug_local_name + str(hook) + ' : isn`t file or isn`t runnable\n']
            fd_path = f'/proc/self/fd/{fd}'
            logger.info('call_hook_list: executing hook=%s, data_hash=%s, '
                        'caller_uid=%d, caller_pid=%d',
                        hook, data_hash, caller_uid, caller_pid)
            try:
                with subprocess.Popen(
                    fd_path,
                    pass_fds=(fd,),
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                ) as proc:
                    proc.stdin.write(json_encode)
                    proc.communicate()
                    logger.info('call_hook_list: hook=%s exited with status=%d',
                                hook, proc.returncode)
            except OSError:
                logger.warning('call_hook_list: hook=%s call error, '
                               'caller_uid=%d, caller_pid=%d',
                               hook, caller_uid, caller_pid)
                return [False, debug_local_name + 'Hook call error\n']
        finally:
            os.close(fd)
    return [True, 'OK\n']