File: //proc/thread-self/root/opt/cloudlinux/venv/lib64/python3.11/site-packages/wmt/common/utils.py
#!/opt/cloudlinux/venv/bin/python3 -bb
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#
import contextlib
import logging
import fcntl
import os
import time
from urllib.parse import urlparse
import requests
import subprocess
from clcommon import cpapi
from clcommon.utils import get_rhn_systemid_value
from wmt.common.url_parser import parse
from wmt.common.const import (
CLICKHOUSE_WMT_ENDPOINT,
JWT_TOKEN,
UNKNOWN_RHN_ID,
SERVICE_BIN,
CHKCONFIG_BIN,
SYSTEMCTL_BIN
)
# logic copied from get_domains script, that is used by go
def get_domains():
domains = set()
if cpapi.CP_NAME == cpapi.PLESK_NAME:
users = [_cpinfo[0] for _cpinfo in cpapi.cpinfo(keyls=('cplogin',))]
else:
users = cpapi.cpusers()
if not users:
return domains
try:
suspended_users = cpapi.suspended_users_list()
except Exception:
logging.exception('Cannot obtain list of suspended users')
suspended_users = []
# dirty hack to reload domains in runtime
# TODO: replace with more suitable solution
if cpapi.CP_NAME == 'cPanel':
cpapi.plugins.cpanel._user_to_domains_map_cpanel = dict()
for user in users:
if user in suspended_users:
logging.warning('User: %s is will not be pinged, because account is suspended', user)
continue
for domain, _ in cpapi.userdomains(user):
domains.add(parse(domain))
return domains
def setup_logger(logger_name):
app_logger = logging.getLogger(logger_name)
app_logger.setLevel(logging.DEBUG)
try:
old_umask = os.umask(0o137)
try:
fh = logging.FileHandler('/var/log/cl_wmt.log')
finally:
os.umask(old_umask)
except IOError:
pass
else:
# Use FileHandler.baseFilename so test fixtures that redirect the
# handler at a tempfile chmod the same path the FileHandler opened,
# and so future relocation of the log path doesn't drift the chmod.
try:
os.chmod(fh.baseFilename, 0o600)
except OSError:
pass
fh.formatter = logging.Formatter('[%(levelname)s | %(asctime)s]: %(message)s')
app_logger.addHandler(fh)
return app_logger
@contextlib.contextmanager
def save_pid_and_lock(file: str, pid: str):
# 'a+' creates the file if missing but does NOT truncate. Truncating
# before flock would destroy a running daemon's PID if a second startup
# raced in and lost the lock acquisition below.
f = open(file, 'a+')
try:
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
except OSError:
f.close()
raise OSError('Process %s already running!' % pid)
try:
f.seek(0)
f.truncate()
f.write(pid)
f.flush()
os.fsync(f)
yield
finally:
fcntl.flock(f.fileno(), fcntl.LOCK_UN)
f.close()
os.remove(file)
def intersect(d1, d2):
result = {}
for key in d1:
if key in d2:
result[key] = d1[key], d2[key]
return result
def send_report(url, report, headers=None):
error = requests.RequestException('Error while sending report to ClickHouse')
for i in range(5):
try:
response = requests.post(url, json=report, headers=headers, timeout=60)
response.raise_for_status()
except requests.RequestException as err:
error = err
time.sleep(min(4 ** i, 30))
else:
break
else:
raise error
def _strip_url_to_origin(url):
parsed = urlparse(url)
if parsed.scheme and parsed.netloc:
return '{}://{}'.format(parsed.scheme, parsed.netloc)
return url
def _read_jwt_token():
# UnicodeDecodeError (subclass of ValueError, not OSError) covers a
# corrupt or binary-overwritten token file; without it the daemon
# would propagate an unhandled exception through send_report_to_clickhouse,
# violating the "graceful fallback to unauthenticated" contract.
try:
with open(JWT_TOKEN, 'r') as f:
return f.read().strip()
except (IOError, OSError, UnicodeDecodeError):
logging.warning('Cannot read JWT token from %s', JWT_TOKEN)
return None
def send_report_to_clickhouse(report):
systemd_id = get_rhn_systemid_value('system_id') or UNKNOWN_RHN_ID
# ID-XXXXXXXX -> XXXXXXXX
report['server_id'] = systemd_id.replace('ID-', '')
summary = report.pop('summary_report')
# !17 — minimize disclosure: send scheme + hostname only
for entry in report.get('error_report', []):
entry['url'] = _strip_url_to_origin(entry['url'])
for entry in report.get('duration_report', []):
entry['url'] = _strip_url_to_origin(entry['url'])
# !15 — attach JWT bearer if available; falls through with empty
# headers when the token file is missing (defense-in-depth, not
# enforced server-side).
headers = {}
token = _read_jwt_token()
if token:
headers['Authorization'] = 'Bearer ' + token
send_report(CLICKHOUSE_WMT_ENDPOINT, {**report, **summary}, headers=headers)
def enable_wmt_daemon(daemon_name, is_cl6):
"""
Enable cl_wmt_scanner service
"""
if is_cl6:
# CL6
# Start wmt service
subprocess.run([SERVICE_BIN, daemon_name, 'start'],
capture_output=True)
subprocess.run([CHKCONFIG_BIN, '--add', daemon_name],
capture_output=True)
else:
# CL7, CL8
subprocess.run([SYSTEMCTL_BIN, 'daemon-reload'],
capture_output=True)
# Start wmt daemon
subprocess.run([SYSTEMCTL_BIN, 'enable', daemon_name],
capture_output=True)
subprocess.run([SYSTEMCTL_BIN, 'start', daemon_name],
capture_output=True)
def disable_wmt_daemon(daemon_name, is_cl6):
"""
Disable WMT daemon
:return:
"""
if is_cl6:
# CL6
subprocess.run([SERVICE_BIN, daemon_name, 'stop'],
capture_output=True)
subprocess.run([CHKCONFIG_BIN, '--del', daemon_name],
capture_output=True)
else:
# CL7, CL8
subprocess.run([SYSTEMCTL_BIN, 'kill', daemon_name],
capture_output=True)
subprocess.run([SYSTEMCTL_BIN, 'disable', daemon_name],
capture_output=True)
def manage_crons(status=True):
cron_tool = '/usr/share/web-monitoring-tool/cron_control.py'
if status:
command = [cron_tool, '-i']
else:
command = [cron_tool, '-d']
subprocess.run(command, capture_output=True, text=True)