HEX
Server: LiteSpeed
System: Linux srv1.dhviews.com 5.14.0-570.23.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Jun 24 11:27:16 EDT 2025 x86_64
User: bdedition (1723)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //proc/self/root/usr/local/lib/python3.9/site-packages/wordfence/wordpress/extension.py
import re
import os
from typing import Optional, Dict, List

from ..logging import log
from ..util.io import SYMLINK_IO_ERRORS
from ..util.encoding import str_to_bytes
from .exceptions import ExtensionException


HEADER_READ_SIZE = 8 * 1024
HEADER_CLEANUP_PATTERN = re.compile(r'\s*(?:\*\/|\?>).*')
CARRIAGE_RETURN_PATTERN = re.compile('\r')


class Extension:

    def __init__(
                self,
                slug: str,
                version: Optional[bytes],
                header: Dict[str, str],
                path: bytes
            ):
        self.slug = slug
        self.version = version
        self.header = header
        self.path = path

    def get_name(self) -> str:
        try:
            return self.header['Name']
        except KeyError:
            return self.slug

    def __str__(self) -> str:
        return f'{self.slug}({self.version})'


class ExtensionLoader:

    def __init__(
                self,
                extension_type: str,
                directory: str,
                header_fields: Dict[str, str],
                allow_io_errors: bool = False,
            ):
        self.extension_type = extension_type
        self.directory = directory
        self.header_fields = header_fields
        self.allow_io_errors = allow_io_errors

    def _clean_up_header_value(self, value: str) -> str:
        return HEADER_CLEANUP_PATTERN.sub('', value).strip()

    def _read_header(self, path: bytes) -> str:
        try:
            with open(path, 'r', errors='replace') as stream:
                data = stream.read(HEADER_READ_SIZE)
                return data
        except OSError as error:
            raise ExtensionException(
                    f'Unable to read {self.extension_type} header from '
                    + os.fsdecode(path)
                ) from error

    def _parse_header(
                self,
                data: str,
            ) -> Dict[str, str]:
        data = CARRIAGE_RETURN_PATTERN.sub('\n', data)
        values = {}
        for field, pattern in self.header_fields.items():
            match = re.search(
                    '^[ \t\\/*#@]*' + re.escape(pattern) + r':(.*)$',
                    data,
                    re.MULTILINE | re.IGNORECASE
                )
            if match is not None:
                values[field] = self._clean_up_header_value(match.group(1))
        return values

    def load(
                self,
                slug: str,
                path: bytes,
                base_path: Optional[bytes] = None
            ) -> Optional[Extension]:
        header_data = self._read_header(path)
        header = self._parse_header(header_data)
        if 'Name' not in header:
            return None
        try:
            version = header['Version']
            if isinstance(version, str):
                version = str_to_bytes(version)
        except KeyError:
            version = None
        if base_path is None:
            base_path = path
        return self._initialize_extension(slug, version, header, base_path)

    def _initialize_extension(
                self,
                slug: str,
                version: Optional[str],
                header: Dict[str, str],
                path: bytes
            ):
        return Extension(
                slug=slug,
                version=version,
                header=header,
                path=path
            )

    def _process_entry(entry: os.DirEntry) -> Optional[Extension]:
        return None

    def load_all(self) -> List[Extension]:
        extensions = []
        try:
            for entry in os.scandir(self.directory):
                try:
                    extension = self._process_entry(entry)
                    if extension is not None:
                        extensions.append(extension)
                except OSError as error:
                    if error.errno in SYMLINK_IO_ERRORS:
                        continue
                    if self.allow_io_errors:
                        log.warning(
                                f'Unable to load {self.extension_type} from '
                                + os.fsdecode(entry.path) + f': {error}'
                            )
                    else:
                        raise
        except OSError as error:
            if error.errno not in SYMLINK_IO_ERRORS:
                message = (
                        f'Unable to scan {self.extension_type} directory at '
                        + os.fsdecode(self.directory)
                    )
                if self.allow_io_errors:
                    log.warning(message)
                else:
                    raise ExtensionException(message) from error
        return extensions