HEX
Server: LiteSpeed
System: Linux srv1.dhviews.com 5.14.0-570.23.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Jun 24 11:27:16 EDT 2025 x86_64
User: bdedition (1723)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //proc/self/root/usr/local/lib/python3.9/site-packages/wordfence/cli/config/config_items.py
import abc
import base64
import json
from dataclasses import dataclass, fields
from enum import Enum
from functools import lru_cache
from typing import Optional, Any, Dict, Set, Tuple, Type, Callable, Union
from .typing import ConfigDefinitions

# see wordfence/config/__init__.py for special handling of reserved names
invalid_config_item_names: Set[str] = {
    'subcommand',
    'trailing_arguments'
}


class Context(Enum):
    ALL = 1
    """a config item that is available in both the CLI and INI contexts"""
    CLI = 2
    """a config item that is only available in the CLI context (not INI)"""
    CONFIG = 3
    """a config item that is only available in the INI context (not CLI)"""


class ArgumentType(Enum):
    ARGUMENT = 1
    """No names, just ordered CLI values"""
    FLAG = 2
    """boolean values set by name (no value) -- inverts the default value when
    provided"""
    OPTIONAL_FLAG = 3
    """boolean values set by name with an optional third state (None)
    representing the absence of a value"""
    OPTION = 4
    """required the option name plus a value"""
    OPTION_REPEATABLE = 5
    """an option that can be repeated multiple times with different values"""


@dataclass(frozen=True)
class ReferenceToken:
    """Instantiate a new instance to use the `x is y` language construct to
    determine if other instances point to the same token"""
    pass


not_set_token = ReferenceToken()


@dataclass(frozen=True)
class ConfigItemMeta:
    valid_options: Optional[Tuple[str]] = None
    multiple: Optional[bool] = None
    separator: Optional[str] = None
    value_type: Union[Type, Callable] = str
    accepts_file: bool = False
    accepts_directory: bool = False

    def accepts_paths(self) -> bool:
        return self.accepts_file or self.accepts_directory


@dataclass(frozen=True)
class ConfigItemDefinition:
    name: str
    property_name: str
    description: str
    context: Context
    argument_type: ArgumentType
    default: Any
    hidden: bool = False
    short_name: Optional[str] = None
    meta: Optional[ConfigItemMeta] = None
    category: str = 'General Options'

    @staticmethod
    def clean_argument_dict(source: Dict[str, Any]) -> Dict[str, Any]:
        return {key: value for key, value in source.items() if
                key in get_data_item_fields()}

    def has_options_list(self) -> bool:
        return True if self.meta and self.meta.valid_options else False

    def has_separator(self) -> bool:
        return True if self.meta and self.meta.separator else False

    def is_flag(self) -> bool:
        return self.argument_type == ArgumentType.FLAG \
            or self.argument_type == ArgumentType.OPTIONAL_FLAG

    def accepts_value(self) -> bool:
        return not self.is_flag()

    def get_value_type(self):
        if self.is_flag():
            return bool
        if not self.meta:
            return str
        return_type = self.meta.value_type
        if not return_type:
            if self.meta.accepts_paths():
                return bytes
            raise ValueError(
                f"Specified type not in the allow list: {self.meta.value_type}"
                )
        return return_type

    def accepts_paths(self) -> bool:
        return self.meta and self.meta.accepts_paths()

    @classmethod
    def from_dict(cls, source: dict):
        # The property name is always derived from the configuration's "name"
        # value. Any "property_name" value specified in the configuration is
        # ignored.
        source['property_name'] = source['name'].replace('-', '_')

        is_optional_flag = \
            ArgumentType.OPTIONAL_FLAG == source['argument_type']
        is_flag = (
                is_optional_flag or
                ArgumentType.FLAG == source['argument_type']
            )

        if source.get('default_type', None) == 'base64':
            if 'default' not in source:
                raise ValueError(
                    "When base64 default type is specified, a default value "
                    "must be present")
            source['default'] = base64.b64decode(source['default'])

        if 'default' not in source:
            source['default'] = not_set_token
        # convert enums
        source['context'] = source['context'] if isinstance(source['context'],
                                                            Context) else \
            Context[source['context']]
        source['argument_type'] = source['argument_type'] if isinstance(
            source['argument_type'], ArgumentType) else \
            ArgumentType[
                source['argument_type']]

        # convert the meta dict to an object to make it hashable
        if source.get('meta', False):
            # convert lists to tuples to make them hashable
            if source['meta'].get('valid_options', False):
                source['meta']['valid_options'] = tuple(
                    source['meta']['valid_options'])
            # set flags to booleans types
            # if another type is not already defined
            if not_set_token is source['meta'].get('value_type',
                                                   not_set_token) and is_flag:
                source['meta']['value_type'] = 'bool'
            source['meta'] = ConfigItemMeta(**source['meta'])
        else:
            source['meta'] = ConfigItemMeta()

        # sanity check
        if is_flag and not (
                    isinstance(source['default'], bool) or
                    (is_optional_flag and source['default'] is None)
                ):
            raise ValueError(
                f"Flag {source['name']} has a non-boolean value type defined: "
                f"{type(source['default'])}")
        return cls(**ConfigItemDefinition.clean_argument_dict(source))

    @classmethod
    def from_json(cls, source: str):
        return cls.from_dict(json.loads(source))


@dataclass(frozen=True)
class ConfigValue:
    definition: ConfigItemDefinition
    value: Any


class CanonicalValueExtractorInterface(metaclass=abc.ABCMeta):
    @classmethod
    def __subclasshook__(cls, subclass):
        return (callable(subclass.get_canonical_value),
                callable(subclass.is_valid_source),
                callable(subclass.assert_is_valid_source))

    @abc.abstractmethod
    def is_valid_source(self, source: Any) -> bool:
        """Validate the source is supported"""
        raise NotImplementedError

    def assert_is_valid_source(self, source: Any) -> None:
        if not self.is_valid_source(source):
            raise ValueError(f"Invalid configuration source: {type(source)}")

    @abc.abstractmethod
    def get_canonical_value(self, definition: ConfigItemDefinition,
                            source: Any) -> Any:
        """Return the canonical configuration value as stored in the
        configuration source"""
        raise NotImplementedError

    @abc.abstractmethod
    def get_context(self) -> Context:
        raise NotImplementedError


@lru_cache(maxsize=1)
def get_data_item_fields() -> Set[str]:
    return set([x.name for x in fields(ConfigItemDefinition)])


def config_definitions_to_config_map(
            config_definitions: ConfigDefinitions
        ) -> Dict[str, ConfigItemDefinition]:
    result: Dict[str, ConfigItemDefinition] = {}
    used_short_names: Set[str] = set()
    implied_names: Set[str] = set()
    for name, value in config_definitions.items():
        value['name'] = name
        config_item = ConfigItemDefinition.from_dict(value)
        if config_item.name in result:
            raise KeyError(
                f"The name {json.dumps(config_item.name)} has already been "
                f"loaded")
        if config_item.name in implied_names:
            raise KeyError(
                f"A configured flag has already claimed "
                f"{json.dumps(config_item.name)} as an implied name")
        if config_item.short_name:
            if config_item.short_name in used_short_names:
                raise KeyError(
                    f"The short name {json.dumps(config_item.short_name)} "
                    f"has already been loaded")
            else:
                used_short_names.add(config_item.short_name)
        if config_item.is_flag():
            implied_name = f'no-{config_item.name}'
            if implied_name in implied_names:
                raise KeyError(
                    f"Another option has already taken the implied name "
                    f"{json.dumps(implied_name)}")
            implied_names.add(implied_name)

        if config_item.name in invalid_config_item_names:
            raise KeyError(
                f"The name {json.dumps(config_item.name)} is reserved")
        result[config_item.name] = config_item
    return result


def merge_config_maps(
            a: Dict[str, ConfigItemDefinition],
            b: Dict[str, ConfigItemDefinition]
        ) -> Dict[str, ConfigItemDefinition]:
    return {**a, **b}