HEX
Server: LiteSpeed
System: Linux srv1.dhviews.com 5.14.0-570.23.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Jun 24 11:27:16 EDT 2025 x86_64
User: bdedition (1723)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //usr/local/lib/python3.9/site-packages/wordfence/cli/context.py
import sys
from typing import Optional, Any, Callable, Set, Union

from ..version import __version__, __version_name__
from ..util import pcre, vectorscan
from ..util.text import yes_no
from ..api import noc1, intelligence
from ..util.caching import Cache, CacheDirectory, RuntimeCache, \
        InvalidCachedValueException, CacheException
from ..util.input import has_terminal_input, has_terminal_output
from ..util.io import resolve_path
from ..api.licensing import License, LicenseRequiredException, \
        LicenseSpecific, to_license
from ..logging import log, LogLevel, LogSettings
from .config.config import Config
from .email import Mailer


class CliContext:
    """Shared state and lazily created collaborators for a CLI run.

    Holds the parsed configuration together with the cache and, created
    on first use, the license, the NOC1 client, the WFI client
    (``intelligence.Client``), the mailer and the log settings.

    Instances are context managers: leaving the ``with`` block calls
    :meth:`clean_up`.
    """

    def __init__(
                self,
                config: Config,
                cacheable_types: Set[str],
                helper,
                allows_color: bool
            ):
        """Store the configuration and set up the cache.

        :param config: configuration object read by the accessors below
        :param cacheable_types: handed to ``CacheDirectory`` when a
            directory cache is created
        :param helper: stored unchanged as ``self.helper``; not used
            inside this class
        :param allows_color: whether log output may be colored; also the
            basis for the default of level prefixes
            (see :meth:`get_log_settings`)
        """
        self.config = config
        self.cacheable_types = cacheable_types
        self.helper = helper
        self.allows_color = allows_color
        self._license = None
        self._noc1_client = None
        self._terms_update_hooks = []
        self._license_update_hooks = []
        self._wfi_client = None
        self._mailer = None
        self.configurer = None
        self._log_settings = None
        # The cache is set up last: the filter installed on it consults
        # the license, and overrides of set_up_cache may touch other
        # attributes, so every attribute must already exist.
        self.set_up_cache(self.config.cache_directory)

    def get_log_level(self) -> LogLevel:
        """Determine the log level from the configuration.

        Precedence, highest first: an explicit ``log_level`` name (looked
        up in ``LogLevel``), ``quiet`` (CRITICAL), ``debug`` (DEBUG),
        ``verbose`` (VERBOSE), otherwise INFO. When ``verbose`` is unset
        (``None``) and standard output is a TTY, VERBOSE is used as well.
        """
        if self.config.log_level is not None:
            return LogLevel[self.config.log_level]
        elif self.config.quiet:
            return LogLevel.CRITICAL
        elif self.config.debug:
            return LogLevel.DEBUG
        elif self.config.verbose or (
                    self.config.verbose is None
                    # sys.stdout can be None, so guard before isatty()
                    and sys.stdout is not None and sys.stdout.isatty()
                ):
            return LogLevel.VERBOSE
        else:
            return LogLevel.INFO

    def get_log_settings(self) -> LogSettings:
        """Return the log settings, building them on the first call.

        Level prefixes follow ``config.prefix_log_levels``; when that is
        ``None`` they are enabled exactly when color is not allowed.
        """
        if self._log_settings is None:
            prefixed = self.config.prefix_log_levels
            if prefixed is None:
                prefixed = not self.allows_color
            self._log_settings = LogSettings(
                    level=self.get_log_level(),
                    colored=self.allows_color,
                    prefixed=prefixed
                )
        return self._log_settings

    def initialize_logging(self) -> None:
        """Apply the settings returned by :meth:`get_log_settings`."""
        self.get_log_settings().apply()

    def set_up_cache(self, directory: bytes) -> None:
        """Create the cache for ``directory`` and store it as ``self.cache``.

        :meth:`filter_cache_entry` is registered as a filter on the new
        cache before it is stored.
        """
        cache = self._initialize_cache(directory)
        cache.add_filter(self.filter_cache_entry)
        self.cache = cache

    def _initialize_cache(self, directory: bytes) -> Cache:
        """Build a directory cache, falling back to a runtime cache.

        A ``RuntimeCache`` is returned when caching is disabled in the
        configuration or when creating the ``CacheDirectory`` raises a
        ``CacheException`` (which is logged as a warning).
        """
        if self.config.cache:
            try:
                return CacheDirectory(
                        resolve_path(directory),
                        self.cacheable_types
                    )
            except CacheException as exception:
                log.warning(
                        'Failed to initialize directory cache: '
                        + str(exception)
                    )
        return RuntimeCache()

    def register_terms_update_hook(
                self,
                callable: Callable[[bool, License], None]
            ) -> None:
        """Remember a terms-update hook for NOC1 clients created later.

        Hooks are attached in :meth:`create_noc1_client`; a client that
        already exists is not updated by this call.
        """
        self._terms_update_hooks.append(callable)

    def register_license_update_hook(
                self,
                callable: Callable[[License], None]
            ) -> None:
        """Remember a license-update hook for NOC1 clients created later.

        Hooks are attached in :meth:`create_noc1_client`; a client that
        already exists is not updated by this call.
        """
        self._license_update_hooks.append(callable)

    def get_license(self) -> Optional[License]:
        """Return the configured license, or ``None`` if none is set.

        The ``License`` is built from ``config.license`` on first use and
        then reused.
        """
        if self._license is None and self.config.license is not None:
            self._license = License(self.config.license)
        return self._license

    def require_license(self) -> License:
        """Return the configured license.

        :raises LicenseRequiredException: when no license is configured
        """
        license = self.get_license()
        if license is None:
            raise LicenseRequiredException()
        return license

    def filter_cache_entry(self, value: Any) -> Any:
        """Cache filter rejecting entries tied to a different license.

        Values that are not ``LicenseSpecific`` pass through unchanged.

        :raises InvalidCachedValueException: when a ``LicenseSpecific``
            value is not compatible with the current license
        :raises LicenseRequiredException: when a ``LicenseSpecific`` value
            is encountered and no license is configured
        """
        if isinstance(value, LicenseSpecific):
            if not value.is_compatible_with_license(self.require_license()):
                raise InvalidCachedValueException(
                        'Incompatible license'
                    )
        return value

    def create_noc1_client(
                self,
                license: Optional[Union[License, str]] = None,
                use_hooks: bool = True
            ) -> noc1.Client:
        """Create a new NOC1 client against ``config.noc1_url``.

        :param license: license for the client; passed through
            ``to_license`` before use
        :param use_hooks: when true, every registered terms-update and
            license-update hook is attached to the new client
        """
        license = to_license(license)
        client = noc1.Client(
                license,
                self.config.noc1_url
            )
        if use_hooks:
            for hook in self._terms_update_hooks:
                client.register_terms_update_hook(hook)
            for hook in self._license_update_hooks:
                client.register_license_update_hook(hook)
        return client

    def get_noc1_client(self) -> noc1.Client:
        """Return the shared NOC1 client, creating it on first use.

        :raises LicenseRequiredException: when the client has to be
            created and no license is configured
        """
        if self._noc1_client is None:
            self._noc1_client = self.create_noc1_client(
                    self.require_license(),
                    use_hooks=True
                )
        return self._noc1_client

    def get_wfi_client(self) -> intelligence.Client:
        """Return the shared ``intelligence.Client`` for ``config.wfi_url``,
        creating it on first use."""
        if self._wfi_client is None:
            self._wfi_client = intelligence.Client(
                    self.config.wfi_url
                )
        return self._wfi_client

    def get_mailer(self) -> Mailer:
        """Return the shared mailer, creating it from the configuration
        on first use (or on first use after :meth:`clean_up`)."""
        if self._mailer is None:
            self._mailer = Mailer(self.config)
        return self._mailer

    def has_pcre(self) -> bool:
        """Report the value of ``pcre.AVAILABLE``."""
        return pcre.AVAILABLE

    def has_vectorscan(self) -> bool:
        """Report the value of ``vectorscan.AVAILABLE``."""
        return vectorscan.AVAILABLE

    def display_version(self) -> None:
        """Print the CLI version plus PCRE and Vectorscan support details
        to standard output."""
        if __version_name__ is None:
            name_suffix = ''
        else:
            name_suffix = f' "{__version_name__}"'
        print(f"Wordfence CLI {__version__}{name_suffix}")
        has_pcre = self.has_pcre()
        pcre_support_text = yes_no(has_pcre)
        if has_pcre:
            # Version and JIT details are only read when PCRE is available
            jit_support_text = yes_no(pcre.HAS_JIT_SUPPORT)
            pcre_support_text += (
                    f" - PCRE Version: {pcre.VERSION}"
                    f" (JIT Supported: {jit_support_text})"
                )
        print(f'PCRE Supported: {pcre_support_text}')
        has_vectorscan = self.has_vectorscan()
        vectorscan_support_text = yes_no(has_vectorscan)
        if has_vectorscan:
            vectorscan_support_text += (
                    f' - Version: {vectorscan.VERSION} (API Version: '
                    f'{vectorscan.API_VERSION})'
                )
        print(f'Vectorscan Supported: {vectorscan_support_text}')

    def has_terminal_output(self) -> bool:
        """Delegate to ``util.input.has_terminal_output``."""
        return has_terminal_output()

    def has_terminal_input(self) -> bool:
        """Delegate to ``util.input.has_terminal_input``."""
        return has_terminal_input()

    def requires_input(self, option: Optional[bool]) -> bool:
        """Decide whether input should be read for a tri-state option.

        :param option: ``True`` forces input, ``False`` disables it and
            ``None`` defers to whether terminal input is available
        """
        if option is None:
            return self.has_terminal_input()
        return option is True

    def clean_up(self) -> None:
        """Close the mailer if one was created.

        The mailer is detached before it is closed, so repeated calls do
        not close it twice and a later :meth:`get_mailer` builds a fresh
        instance instead of handing out the closed one.
        """
        mailer, self._mailer = self._mailer, None
        if mailer is not None:
            mailer.close()

    def __enter__(self):
        """Enter the context; returns this instance."""
        return self

    def __exit__(self, exc_type, exc_value, exc_tb) -> None:
        """Run :meth:`clean_up`; returns ``None`` so exceptions raised in
        the ``with`` body propagate."""
        self.clean_up()