File: //opt/cloudlinux/venv/lib64/python3.11/site-packages/clcommon/public_hooks/bundle/cpanel/lib.py
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
import argparse
import hashlib
import json
import os
import re
import subprocess
import logging
import urllib.parse
import raven
from clcommon.public_hooks import (
POST_MODIFY_DOMAIN,
PRE_MODIFY_USER,
POST_MODIFY_USER
)
from clcommon.public_hooks.lib.helpers import valid_name
from clcommon.utils import run_command, ExternalProgramFailed, is_user_present
from clcommon import cpapi
from secureio import makedirs_secure, write_file_secure, read_file_secure
# Exit code returned by the hooks below when the payload carries no username.
EXIT_NO_USER_FOUND = 1
# Warning text logged together with EXIT_NO_USER_FOUND.
ERR_NO_USER_FOUND = 'Unable to find username in hook cmdline'
# whmapi1 executable; used below to look up an account's domain (listaccts).
WHMAPI1 = '/usr/sbin/whmapi1'
# Parent directory of TMP_DIR (passed as parent_path to makedirs_secure).
LVE_DIR = '/var/lve'
# Scratch directory where pre-hooks store state for the matching post-hooks.
TMP_DIR = os.path.join(LVE_DIR, 'tmp')
# Module-level logger shared by every hook in this file.
logger = logging.getLogger(__name__)
# Domain validation: RFC-1035 hostname-style (letters, digits, dot, hyphen).
# Use re.fullmatch at call-sites — the prior `^...$` form would have allowed
# a trailing newline (Python `$` matches end-of-string OR just before a final
# `\n`). bugbot 0bcbadf9 on !27 flagged this for the username regex; we apply
# the same fullmatch discipline here to domains. Username validation routes
# through valid_name (which already uses fullmatch).
#
# RFC 1035 hostname shape: each label starts with an alphanumeric, may
# contain interior `-`, ends with an alphanumeric, ≤63 chars per label,
# labels joined by literal `.`. Rejects bare `.`, `..`, `---`, leading/
# trailing dots and hyphens (bugbot 5a4f0354 on d1cc213). Mirrors the
# regex already in post_modify_domain.py — keeping the cpanel-bundle
# copy avoids a public-hooks-bin import from the cpanel bundle.
# Deliberately unanchored: call-sites must use .fullmatch() on this pattern.
_VALID_DOMAIN_RE = re.compile(
    # First label: alphanumeric, optionally up to 61 alphanumerics/hyphens
    # followed by a closing alphanumeric (1..63 characters in total).
    r'[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?'
    # Any number of further labels of the same shape, each preceded by a dot.
    r'(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*'
)
def _is_valid_username(value):
    """Tell whether ``value`` is accepted by the shared ``valid_name`` helper.

    ``valid_name`` reports bad input by raising argparse.ArgumentTypeError.
    cPanel hooks must return 0 instead of exiting so the cPanel hook chain
    keeps running for unrelated downstream listeners, hence the exception is
    translated into a plain boolean here.

    :param value: candidate username
    :return: True when ``valid_name`` accepts the value, False otherwise
    """
    try:
        valid_name(value)
    except argparse.ArgumentTypeError:
        return False
    else:
        return True
def _is_valid_domain(value):
    """Check ``value`` against the module-level domain pattern.

    The whole string has to match (``fullmatch``), so a trailing newline
    cannot slip through the way it could with a ``$`` anchor.

    :param value: candidate domain name
    :return: True only for a ``str`` that fully matches the pattern
    """
    if not isinstance(value, str):
        return False
    return _VALID_DOMAIN_RE.fullmatch(value) is not None
def print_response(hook_name, success=True):
    """
    Print the two-value response cPanel expects at the end of a custom hook.

    cPanel wants every hook to finish by printing:
     - a status, where 1 means success
     - a message explaining any non-1 status
    Nothing really breaks without it, but the logs fill up with
    "script returned invalid response" messages.

    :param hook_name: name, path or anything else that helps to identify
                      which hook failed; only used in the failure message
    :param success: whether the hook finished successfully
    :return: Nothing
    """
    if success:
        print(1, "Ok")
        return
    failure_message = (
        f"Failed to execute hook {hook_name}; you can find logs in "
        "/var/log/cloudlinux/hooks/info.log "
        "and contact CloudLinux Support if you need "
        "help with the issue."
    )
    print(0, failure_message)
# ATTENTION: we use this function in
# processpaneluserspackages and lvemanager
def call_sync_map():
    """
    Run ``lvectl sync-map`` and log its output when it exits non-zero.

    Both output streams are captured as text. A failure is only logged,
    never raised.

    :return: None
    """
    result = subprocess.run(
        ['lvectl', 'sync-map'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        check=False,
    )
    if result.returncode != 0:
        # Argument order must follow the placeholders: code, stderr, stdout.
        logger.error('Error during "lvectl sync-map", code: %s, '
                     'stderr: `%s`, stdout: `%s`. Reseller limits '
                     'kernel mapping might be not synchronized. '
                     'Contact CloudLinux Support for help.',
                     result.returncode, result.stderr, result.stdout)
def cpanel_postwwwacct_main(data):
    """
    Post create account hook of cPanel.

    Runs ``POST_MODIFY_USER create`` for the new account. The child process
    is started with an environment that contains only ``CPANEL_RESTORE``
    (the stringified ``is_restore`` value, ``0`` when absent).

    :param data: hook payload; reads ``user``, ``owner`` and ``is_restore``
    :return: exit code of the hook script, or EXIT_NO_USER_FOUND when the
             payload has no user
    """
    user = data.get('user', None)
    owner = data.get('owner', None)

    if not user:
        logger.warning(ERR_NO_USER_FOUND)
        return EXIT_NO_USER_FOUND

    command = [POST_MODIFY_USER, 'create', '--username', user, '--owner', owner]
    child_env = {'CPANEL_RESTORE': str(data.get('is_restore', 0))}
    return subprocess.call(command, env=child_env)
def cpanel_prekillacct_main(data):
    """
    Pre kill account hook of cPanel.

    The LVE has to be destroyed before the user's home directory is removed,
    otherwise removal fails because the cagefs mount points are busy.

    :param data: hook payload; reads ``user``
    :return: exit code of ``PRE_MODIFY_USER delete``, EXIT_NO_USER_FOUND
             when the payload has no user, or 0 when the user is not present
             in the system
    """
    user = data.get('user', None)
    if not user:
        logger.warning(ERR_NO_USER_FOUND)
        return EXIT_NO_USER_FOUND

    if is_user_present(user):
        command = [PRE_MODIFY_USER, 'delete', '--username', user]
        return subprocess.call(command)

    logger.warning('User %s does not present in the system, skip hook', user)
    return 0
def cpanel_postkillacct_main(data):
    """
    Post kill account hook of cPanel.

    :param data: hook payload; reads ``user``
    :return: exit code of ``POST_MODIFY_USER delete``, or EXIT_NO_USER_FOUND
             when the payload has no user
    """
    user = data.get('user', None)
    if user:
        command = [POST_MODIFY_USER, 'delete', '--username', user]
        return subprocess.call(command)

    logger.warning(ERR_NO_USER_FOUND)
    return EXIT_NO_USER_FOUND
def cpanel_postunsuspendacct_main(data):
    """
    Post unsuspend account hook of cPanel.

    Example payload:
    data: {'result': 1, 'args': {'user': 'susp2'}, 'user': 'root'}

    :param data: hook payload; the account name is read from ``args.user``
    :return: exit code of ``POST_MODIFY_USER unsuspend``, EXIT_NO_USER_FOUND
             when no user is given, or 0 when the user is not present in the
             system
    """
    hook_args = data.get('args', {})
    user = hook_args.get('user', None)
    if not user:
        logger.warning(ERR_NO_USER_FOUND)
        return EXIT_NO_USER_FOUND

    if is_user_present(user):
        command = [POST_MODIFY_USER, 'unsuspend', '--username', user]
        return subprocess.call(command)

    logger.warning('User %s does not present in the system, skip hook', user)
    return 0
def cpanel_postsuspendacct_main(data):
    """
    Post suspend account hook of cPanel.

    Example payload:
    data: {'result': 1, 'args': {'user': 'susp2'}, 'user': 'root'}

    :param data: hook payload; the account name is read from ``args.user``
    :return: exit code of ``POST_MODIFY_USER suspend``, EXIT_NO_USER_FOUND
             when no user is given, or 0 when the user is not present in the
             system
    """
    hook_args = data.get('args', {})
    user = hook_args.get('user', None)
    if not user:
        logger.warning(ERR_NO_USER_FOUND)
        return EXIT_NO_USER_FOUND

    if is_user_present(user):
        command = [POST_MODIFY_USER, 'suspend', '--username', user]
        return subprocess.call(command)

    logger.warning('User %s does not present in the system, skip hook', user)
    return 0
def _safe_tmp_path(user):
    """
    Return the per-user file path under TMP_DIR, refusing unsafe names.

    A name is refused when it is empty, contains a path separator or
    contains ``..``, and also when the resolved path does not sit below the
    resolved TMP_DIR.

    :param user: name of user, used as the file name
    :return: the (unresolved) path ``TMP_DIR/<user>``
    :raises ValueError: if the name is unsafe or the path escapes TMP_DIR
    """
    if not user:
        raise ValueError(f"Invalid user value: {user!r}")

    banned_fragments = [os.sep, '..']
    if os.altsep:
        banned_fragments.append(os.altsep)
    if any(fragment in user for fragment in banned_fragments):
        raise ValueError(f"Invalid user value: {user!r}")

    candidate = os.path.join(TMP_DIR, user)
    allowed_prefix = os.path.realpath(TMP_DIR) + os.sep
    resolved = os.path.realpath(candidate)
    if resolved.startswith(allowed_prefix):
        return candidate
    raise ValueError(f"Path traversal detected for user: {user!r}")
def _read_old_domain(user):
    """
    Load the domain that was stored for ``user`` in its TMP_DIR file.

    :param user: name of user
    :return: first line of the stored file, or None when the user name is
             rejected or the file cannot be read
    :rtype: str
    """
    try:
        path = _safe_tmp_path(user)
    except ValueError:
        logger.error("Invalid user for tmp file: %s", user)
        return None

    try:
        # write_log is disabled: that log is intended to be used only by
        # the cagefs update command
        lines = read_file_secure(path, uid=0, gid=0, exit_on_error=False,
                                 write_log=False)
        return lines[0]
    except (IndexError, OSError):
        # use Raven carefully and only in places where
        # you sure that sentry is already initialized
        raven.base.Raven.captureException(
            message='failed to read old domain for user (pre hook no called?)')
    return None
def cpanel_postmodifyacct_main(data):
    """
    Post modify account hook of cPanel.

    Always runs ``POST_MODIFY_USER modify`` for the account. When the payload
    carries a domain that differs from the one stored for the user,
    ``POST_MODIFY_DOMAIN modify`` is run as well.

    :param data: hook payload; reads ``user``, ``newuser``, ``domain`` and
                 ``owner`` / ``OWNER``
    :return: sum of the exit codes of the executed hook scripts, or
             EXIT_NO_USER_FOUND when the payload has no user
    """
    user = data.get('user', None)
    if not user:
        # Same guard as the other hooks: without it None ends up in argv
        # and subprocess.call raises TypeError.
        logger.warning(ERR_NO_USER_FOUND)
        return EXIT_NO_USER_FOUND

    new_user = data.get('newuser', None)
    domain = data.get('domain', None)

    # changing owner of user
    # FIXME: this check does not work because cpanel sends `owner` always
    new_owner = data.get('owner', data.get('OWNER'))

    args = [POST_MODIFY_USER, 'modify', '-u', user]
    if new_owner:
        args += ['--new-owner', new_owner]
    if new_user and user != new_user:
        args += ['--new-username', new_user]
    exit_code = subprocess.call(args)

    old_domain = _read_old_domain(user)
    if domain is None:
        return exit_code
    if old_domain is None:
        # The old domain is unknown, so a rename cannot be expressed;
        # passing None as --domain would crash subprocess.call.
        logger.warning('Old domain of user %s is unknown, '
                       'skip domain modify hook', user)
        return exit_code

    if domain != old_domain:
        # looks like domain is renamed
        exit_code += subprocess.call([
            POST_MODIFY_DOMAIN,
            'modify', '--username', new_user or user,
            '--domain', old_domain, '--new-domain', domain,
            '--include-subdomains'])
    return exit_code
def cpanel_postrestoreacct_main(data):
    """
    Post restore account hook of cPanel.

    :param data: hook payload; reads ``user``
    :return: exit code of ``POST_MODIFY_USER restore``, or
             EXIT_NO_USER_FOUND when the payload has no user
    """
    user = data.get('user', None)
    if user:
        command = [POST_MODIFY_USER, 'restore', '--username', user]
        return subprocess.call(command)

    logger.warning(ERR_NO_USER_FOUND)
    return EXIT_NO_USER_FOUND
def _get_old_domain(user):
    """
    Ask whmapi1 (``listaccts``) for the current domain of an account.

    The lookup is best-effort: a failing command, output that is not valid
    JSON, or a response without the expected ``data.acct[0].domain`` entry
    is reported through Raven and results in None.

    :param user: name of user
    :return: the account's domain, or None when it could not be determined
    :rtype: str
    """
    domain = None
    cmd = [
        WHMAPI1,
        'listaccts',
        f'search={user}',
        'searchtype=user',
        'searchmethod=exact',
        'want=domain',
        '--output=json'
    ]
    try:
        std_out = run_command(cmd, return_full_output=True)[1]  # take only std_out, ignore std_err
        data = json.loads(std_out)
        domain = data['data']['acct'][0]['domain']
    except (ExternalProgramFailed, IndexError, KeyError, TypeError, ValueError):
        # ValueError covers json.JSONDecodeError; TypeError covers a payload
        # whose nested values are not subscriptable (e.g. null).
        # use Raven carefully and only in places where
        # you sure that sentry is already initialized
        raven.base.Raven.captureException(message='failed to get old domain for user from cpanel')
    return domain
def cpanel_premodifyacct_main(data):
    """
    Pre modify account hook of cPanel.

    Looks up the account's current domain and stores it in the per-user file
    under TMP_DIR, where the post modify hook reads it back.

    :param data: hook payload; reads ``user``
    :return: always 0
    """
    user = data.get('user')

    # getting old domain
    # TODO: why not cpapi?
    current_domain = _get_old_domain(user)
    if current_domain is None:
        return 0

    # save old domain to file
    try:
        target_path = _safe_tmp_path(user)
    except ValueError:
        logger.error("Invalid user for tmp file: %s", user)
        return 0

    makedirs_secure(TMP_DIR, perm=0o750, uid=0, gid=0, parent_path=LVE_DIR)
    write_file_secure([current_domain], target_path, uid=0, gid=0, perm=0o700)
    return 0
def cpanel_postcreateaddondom_main(data):
    """
    Post create addon domain hook of cPanel.

    The domain is taken from ``args.newdomain`` and falls back to
    ``args.domain``. Missing or invalid values are logged and the hook
    returns 0 without running anything.

    :param data: hook payload; reads ``user`` and ``args``
    :return: exit code of ``POST_MODIFY_DOMAIN create``, or 0 when the
             arguments are missing or fail validation
    """
    hook_args = data.get('args', {})
    user = data.get('user', None)
    newdomain = hook_args.get('newdomain', None) or hook_args.get('domain', None)

    if not (user and newdomain):
        logger.warning("Missing required arguments: user=%s, newdomain=%s",
                       str(user),
                       str(newdomain))
        return 0

    if not _is_valid_username(user):
        logger.warning("Invalid username value: %s", str(user))
        return 0

    if not _is_valid_domain(newdomain):
        logger.warning("Invalid domain value: %s", str(newdomain))
        return 0

    command = [POST_MODIFY_DOMAIN, 'create', '--username', user, '--domain', newdomain]
    return subprocess.call(command)
def cpanel_postkilladdondom_main(data):
    """
    Post kill addon domain hook of cPanel.

    Missing or invalid values are logged and the hook returns 0 without
    running anything.

    :param data: hook payload; reads ``user`` and ``args.domain``
    :return: exit code of ``POST_MODIFY_DOMAIN delete``, or 0 when the
             arguments are missing or fail validation
    """
    hook_args = data.get('args', {})
    user = data.get('user', None)
    domain = hook_args.get('domain', None)

    if not (user and domain):
        logger.warning("Missing required arguments: user=%s, domain=%s",
                       str(user),
                       str(domain))
        return 0

    if not _is_valid_username(user):
        logger.warning("Invalid username value: %s", str(user))
        return 0

    if not _is_valid_domain(domain):
        logger.warning("Invalid domain value: %s", str(domain))
        return 0

    command = [POST_MODIFY_DOMAIN, 'delete', '--username', user, '--domain', domain]
    return subprocess.call(command)
def cpanel_prechangesubdomaindocroot_main(data):
    """
    Pre change subdomain document root hook of cPanel.

    Called when subdomain document root is being changed via
    Api2::SubDomain::changedocroot.
    Saves the old docroot to a file for the post hook to use. The file is
    named ``<subdomain>.<rootdomain>.<suffix>`` inside
    ``TMP_DIR/prechangesubdomaindoc``, where the suffix is the first 8 hex
    digits of md5("<user>:<dir>").

    :param data: hook data from cPanel; reads ``user`` and
                 ``args.subdomain`` / ``args.rootdomain`` / ``args.dir``
    :return: exit code, always 0
    """
    args = data.get('args', {})
    user = data.get('user', None)
    subdomain = args.get('subdomain', None)
    rootdomain = args.get('rootdomain', None)
    new_dir = args.get('dir', None)

    if not all([user, subdomain, rootdomain, new_dir]):
        logger.error("Missing required arguments: user=%s, subdomain=%s, rootdomain=%s, new_dir=%s",
                     str(user),
                     str(subdomain),
                     str(rootdomain),
                     str(new_dir))
        return 0

    # Construct full domain name (subdomain.rootdomain)
    domain = f"{subdomain}.{rootdomain}"

    # Get old docroot using cpapi
    try:
        old_docroot, _ = cpapi.docroot(domain)
    except Exception:
        raven.base.Raven.captureException(
            message=f'failed to get old docroot for subdomain {domain}')
        logger.warning("Could not get old docroot for domain %s", domain)
        return 0

    # Save old docroot to file for post hook.
    # The hash suffix keeps concurrent changes from sharing one file; the
    # post hook derives the same suffix from the same user and dir values.
    docroot_tmp_dir = os.path.join(TMP_DIR, 'prechangesubdomaindoc')
    suffix = hashlib.md5(f"{user}:{new_dir}".encode()).hexdigest()[:8]
    filename = os.path.join(docroot_tmp_dir, f"{domain}.{suffix}")

    # The file name is built from hook input: refuse anything that resolves
    # outside the docroot tmp directory.
    allowed_prefix = os.path.realpath(docroot_tmp_dir) + os.sep
    if not os.path.realpath(filename).startswith(allowed_prefix):
        logger.error("Invalid domain name %s: path traversal detected", domain)
        return 0

    makedirs_secure(TMP_DIR, perm=0o750, uid=0, gid=0, parent_path=LVE_DIR)
    makedirs_secure(docroot_tmp_dir, perm=0o750, uid=0, gid=0, parent_path=TMP_DIR)
    write_file_secure([old_docroot], filename, uid=0, gid=0, perm=0o700)
    return 0
def _read_old_docroot(domain, suffix):
    """
    Read and consume the saved docroot data for a subdomain.

    The file ``<domain>.<suffix>`` inside ``TMP_DIR/prechangesubdomaindoc``
    is removed after the read attempt, whether or not the read succeeded.

    :param domain: full domain name used in the file name
    :param suffix: hash suffix used in the file name
    :return: first line of the saved file, or None when the path is
             rejected or the file cannot be read
    """
    docroot_tmp_dir = os.path.join(TMP_DIR, 'prechangesubdomaindoc')
    filename = os.path.join(docroot_tmp_dir, f"{domain}.{suffix}")

    # The file name is built from hook input: refuse anything that resolves
    # outside the docroot tmp directory.
    allowed_prefix = os.path.realpath(docroot_tmp_dir) + os.sep
    if not os.path.realpath(filename).startswith(allowed_prefix):
        logger.error("Invalid domain name %s: path traversal detected", domain)
        return None

    try:
        content = read_file_secure(filename, uid=0, gid=0, exit_on_error=False,
                                   write_log=False)
        return content[0]
    except (IndexError, OSError):
        raven.base.Raven.captureException(
            message='failed to read old docroot for subdomain (pre hook not called?)')
        return None
    finally:
        # Best-effort cleanup: the file may be absent already.
        try:
            os.unlink(filename)
        except OSError:
            pass
def cpanel_postchangesubdomaindocroot_main(data):
    """
    Post change subdomain document root hook of cPanel.

    Called after subdomain document root is changed via
    Api2::SubDomain::changedocroot.
    Reads the docroot saved for this change and calls post_modify_domain
    with the old and the new docroot.

    :param data: hook data from cPanel; reads ``user`` and
                 ``args.subdomain`` / ``args.rootdomain`` / ``args.dir``
    :return: exit code of ``POST_MODIFY_DOMAIN modify``, or 0 when
             arguments are missing, no saved docroot is found, or the new
             docroot is rejected
    """
    hook_args = data.get('args', {})
    user = data.get('user', None)
    subdomain = hook_args.get('subdomain', None)
    rootdomain = hook_args.get('rootdomain', None)
    new_dir = hook_args.get('dir', None)

    # new_dir is required as well: the file suffix is derived from
    # "<user>:<new_dir>", so a missing value would be hashed as the literal
    # "None" and could never match a saved file.
    required = (user, subdomain, rootdomain, new_dir)
    if not all(required):
        logger.warning("Missing required arguments: user=%s, subdomain=%s, rootdomain=%s, new_dir=%s",
                       str(user),
                       str(subdomain),
                       str(rootdomain),
                       str(new_dir))
        return 0

    # Construct full domain name (subdomain.rootdomain)
    domain = f"{subdomain}.{rootdomain}"

    digest = hashlib.md5(f"{user}:{new_dir}".encode()).hexdigest()
    suffix = digest[:8]
    old_docroot = _read_old_docroot(domain, suffix)
    if old_docroot is None:
        logger.warning("Could not read old docroot for domain %s", domain)
        return 0

    # new_dir is non-empty here thanks to the guard above.
    new_docroot = urllib.parse.unquote(new_dir)

    # Reject NUL bytes, and paths that still contain a ".." component after
    # normalisation.
    if '\0' in new_docroot:
        rejected = True
    else:
        normalized_parts = os.path.normpath(new_docroot).split(os.sep)
        rejected = '..' in normalized_parts
    if rejected:
        logger.warning("Invalid new_docroot path for domain %s: %s",
                       domain, new_docroot)
        return 0

    return subprocess.call([
        POST_MODIFY_DOMAIN,
        'modify', '--username', user,
        '--domain', domain,
        '--old-docroot', old_docroot,
        '--new-docroot', new_docroot,
    ])