HEX
Server: LiteSpeed
System: Linux srv1.dhviews.com 5.14.0-570.23.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Jun 24 11:27:16 EDT 2025 x86_64
User: bdedition (1723)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //proc/thread-self/root/usr/local/lib/python3.9/site-packages/wordfence/scanning/matching/pcre.py
import signal
from typing import Optional

from ...intel.signatures import CommonString, Signature, SignatureSet
from ...logging import log
from ...util import pcre
from ...util.pcre import PcrePattern, PcreException, PcreJitStack, \
        PcreOptions, PCRE_DEFAULT_OPTIONS

from .matching import Matcher, BaseMatcherContext, TimeoutException, \
        MatchWorkspace, MatchEngineCompilerOptions, MatchEngineOptions, \
        DEFAULT_TIMEOUT


# Refuse to import this engine when the PCRE bindings report themselves as
# unavailable; every class below relies on names from ...util.pcre.
if not pcre.AVAILABLE:
    raise RuntimeError('PCRE is not available')


class PcreCommonString:
    """A common string paired with the PcrePattern built from its text."""

    def __init__(self, common_string: CommonString, pcre_options: PcreOptions):
        """Keep ``common_string`` and build a pattern from its ``string``.

        ``pcre_options`` is handed to ``PcrePattern`` unchanged.
        """
        source_text = common_string.string
        self.common_string = common_string
        self.pattern = PcrePattern(source_text, pcre_options)


class PcreSignature:
    """A signature together with its (possibly lazily built) PCRE pattern.

    Signatures that have no common strings are compiled on construction;
    all others are compiled the first time ``get_pattern()`` is called.
    """

    def __init__(self, signature: Signature, pcre_options: PcreOptions):
        self.signature = signature
        self.anchored_to_start = self._is_anchored_to_start()
        self.pcre_options = pcre_options
        if signature.has_common_strings():
            # Compilation is postponed until get_pattern() first needs it
            return
        self.compile()

    def _is_anchored_to_start(self) -> bool:
        """Return True when the first character of the rule is ``'^'``."""
        try:
            leading = self.signature.rule[0]
        except IndexError:
            # Patterns shouldn't be empty, but if they are, they're not
            # anchored
            return False
        return leading == '^'

    def is_valid(self) -> bool:
        """Return True when a pattern is available for this signature.

        May trigger compilation, since it goes through ``get_pattern()``.
        """
        pattern = self.get_pattern()
        return pattern is not None

    def compile(self) -> None:
        """Build ``self.pattern`` from the signature's rule.

        Any failure is logged as an error and leaves ``self.pattern`` set
        to ``None`` instead of propagating.
        """
        try:
            rule = self.signature.rule
            compiled = PcrePattern(rule, self.pcre_options)
        except BaseException as error:  # noqa: B036
            message = (
                'Regex compilation for signature '
                f'{self.signature.identifier!s}'
                ' failed: '
                f'{error!s}'
                ', pattern: '
                f'{rule!r}'
            )
            log.error(message)
            self.pattern = None
        else:
            self.pattern = compiled

    def get_pattern(self) -> PcrePattern:
        """Return the pattern, compiling it on first use.

        The result is ``None`` when compilation failed.
        """
        # Signature patterns are compiled lazily as they are only needed if
        # common strings are matched and compiling all takes several seconds
        try:
            return self.pattern
        except AttributeError:
            self.compile()
            return self.pattern


class PcreMatchWorkspace(MatchWorkspace):
    """Workspace that owns the PcreJitStack handed to pattern matching."""

    def __init__(self):
        # No stack exists until the workspace is entered
        self.jit_stack = None

    def __enter__(self):
        """Create a new PcreJitStack, store it and return this workspace."""
        stack = PcreJitStack()
        self.jit_stack = stack
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Forward the exit arguments to the stored stack's ``__exit__``."""
        stack = self.jit_stack
        stack.__exit__(exc_type, exc_value, traceback)


class PcreMatcherContext(BaseMatcherContext):
    """Matching state for one use of a :class:`PcreMatcher`.

    The context remembers which common strings have already been seen and
    runs signature patterns against chunks. While entered, it installs a
    ``SIGVTALRM`` handler that raises ``TimeoutException`` so that each
    signature match can be bounded by the matcher's timeout through the
    ``ITIMER_VIRTUAL`` interval timer.
    """

    def __init__(self, matcher: Matcher):
        super().__init__(matcher)
        self.common_string_states = self._initialize_common_string_states()

    def _initialize_common_string_states(self) -> list:
        """Return one ``False`` flag per common string of the matcher."""
        return [False for _ in self.matcher.common_strings]

    def _check_common_strings(self, chunk: bytes) -> list:
        """Return the signatures whose common strings have all been seen.

        Common strings not yet flagged are matched against ``chunk``; a
        flag, once set, stays set for later calls on this context. Matching
        errors are logged at debug level and leave the flag unset.
        Signatures already present in ``self.matches`` are not counted.
        """
        common_string_counts = {}
        for index, common_string in enumerate(self.matcher.common_strings):
            if not self.common_string_states[index]:
                try:
                    if common_string.pattern.match(chunk) is not None:
                        self.common_string_states[index] = True
                except PcreException as e:
                    log.debug(
                            'Common string matching failed for '
                            f'{common_string.common_string.string} with error:'
                            f' {e}'
                        )
            if not self.common_string_states[index]:
                continue
            for identifier in common_string.common_string.signature_ids:
                if identifier in self.matches:
                    continue
                common_string_counts[identifier] = \
                    common_string_counts.get(identifier, 0) + 1
        possible_signatures = []
        for identifier, count in common_string_counts.items():
            signature = self.matcher.signatures[identifier]
            # Only signatures with every one of their common strings seen
            # are worth running the full pattern for
            if count == signature.signature.get_common_string_count():
                possible_signatures.append(signature)
        return possible_signatures

    def _match_signature(
                self,
                signature: PcreSignature,
                chunk: bytes,
                start: bool = False,
                jit_stack: Optional[PcreJitStack] = None
            ) -> bool:
        """Match one signature against ``chunk`` under the matcher timeout.

        Returns True when the signature matched (the match is recorded via
        ``_record_match``). Returns False when the signature has no usable
        pattern, is anchored to the start while ``start`` is false, does
        not match, fails with ``PcreException`` (logged at debug level) or
        times out (its identifier is added to ``self.timeouts``).
        """
        if not signature.is_valid():
            # Reported through the logger rather than stdout, like the
            # other failure paths in this class
            log.debug(
                    'Signature is not valid: '
                    f'{signature.signature.identifier}'
                )
            return False
        if signature.anchored_to_start and not start:
            return False
        try:
            signal.setitimer(signal.ITIMER_VIRTUAL, self.matcher.timeout)
            try:
                match = signature.get_pattern().match(chunk, jit_stack)
            finally:
                # Always clear the timeout, including when match() raises;
                # otherwise the timer could fire later, outside of any
                # handler for TimeoutException
                signal.setitimer(signal.ITIMER_VIRTUAL, 0)
            if match is not None:
                self._record_match(
                        identifier=signature.signature.identifier,
                        matched=match.matched_string
                    )
                return True
        except PcreException as e:
            log.debug(
                    'Signature matching failed for '
                    f'{signature.signature.identifier}, {e}'
                )
        except TimeoutException:
            self.timeouts.add(signature.signature.identifier)
        return False

    def process_chunk(
                self,
                chunk: bytes,
                start: bool = False,
                workspace: Optional[MatchWorkspace] = None
            ) -> bool:
        """Run all applicable signatures against ``chunk``.

        Signatures without common strings are tried first, followed by the
        signatures whose common strings have all been seen. Returns True as
        soon as one signature matches unless the matcher is in
        ``match_all`` mode, in which case every signature is tried and
        False is returned.

        ``workspace`` may be omitted; matching then runs without a JIT
        stack.
        """
        jit_stack = workspace.jit_stack if workspace is not None else None
        # Evaluated before any signature is matched, as the result depends
        # on the matches recorded so far
        possible_signatures = self._check_common_strings(chunk)
        candidate_groups = (
                self.matcher.signatures_without_common_strings,
                possible_signatures
            )
        for group in candidate_groups:
            for signature in group:
                matched = self._match_signature(
                        signature, chunk, start, jit_stack
                    )
                if matched and not self.matcher.match_all:
                    return True
        return False

    def __enter__(self):
        """Install the timeout signal handler and enter the base context."""
        def handle_timeout(signum, frame):
            raise TimeoutException()

        self._previous_alarm_handler = signal.signal(
                signal.SIGVTALRM,
                handle_timeout
            )
        # signal.signal() returns None when the previous handler was not
        # installed from Python; fall back to the default action so that
        # __exit__ has a valid handler to restore
        if self._previous_alarm_handler is None:
            self._previous_alarm_handler = signal.SIG_DFL
        return super().__enter__()

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Restore the ``SIGVTALRM`` handler that was active before entry."""
        # NOTE(review): the base class __exit__ (if any) is not invoked here
        # although __enter__ delegates to the base class - confirm that the
        # base context needs no cleanup
        signal.signal(signal.SIGVTALRM, self._previous_alarm_handler)


class PcreMatcher(Matcher):
    """Matcher that evaluates a signature set with PCRE patterns."""

    def __init__(
                self,
                signature_set: SignatureSet,
                timeout: int = DEFAULT_TIMEOUT,
                match_all: bool = False,
                pcre_options: PcreOptions = PCRE_DEFAULT_OPTIONS,
                lazy: bool = False
            ):
        # Stored before the base constructor runs; presumably the base may
        # call _prepare(), which reads self.pcre_options - verify against
        # Matcher
        self.pcre_options = pcre_options
        super().__init__(signature_set, timeout, match_all, lazy)

    def _extract_signatures_without_common_strings(self) -> list:
        """Return the wrapped signatures that have no common strings."""
        return [
                wrapped
                for wrapped in self.signatures.values()
                if not wrapped.signature.has_common_strings()
            ]

    def _compile_common_strings(self) -> None:
        """Wrap every common string of the set in a PcreCommonString."""
        compiled = []
        for common_string in self.signature_set.common_strings:
            compiled.append(
                    PcreCommonString(common_string, self.pcre_options)
                )
        self.common_strings = compiled

    def _compile_signatures(self) -> None:
        """Wrap every signature of the set in a PcreSignature by identifier."""
        self.signatures = {}
        self.signatures.update(
                (identifier, PcreSignature(signature, self.pcre_options))
                for identifier, signature
                in self.signature_set.signatures.items()
            )

    def _compile_regexes(self) -> None:
        """Build common strings, signatures and the no-common-string list."""
        self._compile_common_strings()
        self._compile_signatures()
        without_common_strings = \
            self._extract_signatures_without_common_strings()
        self.signatures_without_common_strings = without_common_strings

    def _prepare(self) -> None:
        """Prepare the matcher for use by compiling its patterns."""
        self._compile_regexes()

    def create_context(self) -> PcreMatcherContext:
        """Return a new matching context bound to this matcher."""
        context = PcreMatcherContext(self)
        return context

    def create_workspace(self) -> PcreMatchWorkspace:
        """Return a new, not yet entered, match workspace."""
        workspace = PcreMatchWorkspace()
        return workspace


def create_compiler(options: MatchEngineCompilerOptions) -> None:
    """Return ``None``; no compiler object is provided by this engine.

    ``options`` is accepted but not used.
    """
    return None


def validate_pre_compiled_data(source: bytes) -> bool:
    """Report whether ``source`` is valid pre-compiled data.

    The answer is always ``False`` and ``source`` is never inspected.
    """
    # Pre-compilation is not supported
    return False


def create_matcher(options: MatchEngineOptions) -> PcreMatcher:
    """Build a PcreMatcher from the given engine options.

    The signature set, ``match_all``, ``pcre_options`` and ``lazy`` are
    taken from ``options``; no timeout is passed, so the matcher's default
    timeout applies.
    """
    signature_set = options.signature_set
    settings = {
        'match_all': options.match_all,
        'pcre_options': options.pcre_options,
        'lazy': options.lazy,
    }
    return PcreMatcher(signature_set, **settings)