HEX
Server: LiteSpeed
System: Linux srv1.dhviews.com 5.14.0-570.23.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Jun 24 11:27:16 EDT 2025 x86_64
User: bdedition (1723)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //proc/self/root/usr/local/lib/python3.9/site-packages/wordfence/cli/malwarescan/malwarescan.py
import sys
import signal
import logging
import pickle
import tempfile
import os
from multiprocessing import parent_process
from typing import Optional, Tuple

from wordfence import scanning
from wordfence.scanning import filtering
from wordfence.scanning.matching import MatchEngine, MatchEngineOptions, \
    MatchEngineCompilerOptions
from wordfence.util import caching, pcre, vectorscan
from wordfence.util.platform import Platform, UnknownPlatform
from wordfence.intel.signatures import SignatureSet, PrecompiledSignatureSet, \
    deserialize_precompiled_signature_set
from wordfence.util.io import chmod_with_umask
from wordfence.logging import (log, remove_initial_handler,
                               restore_initial_handler)
from ..subcommands import Subcommand
from .reporting import ScanReportManager
from .progress import ProgressDisplay, ProgressException, reset_terminal


screen_handler: Optional[logging.Handler] = None


def revert_progress_changes() -> None:
    global screen_handler
    if screen_handler:
        log.removeHandler(screen_handler)
    restore_initial_handler()
    reset_terminal()


class MalwareScanSubcommand(Subcommand):

    def _filter_signatures(
                self,
                signatures: SignatureSet,
            ) -> None:
        if self.config.include_signatures:
            for identifier in list(signatures.signatures.keys()):
                if identifier not in self.config.include_signatures:
                    signatures.remove_signature(identifier)
            for identifier in self.config.include_signatures:
                if identifier in signatures.signatures:
                    log.debug(f'Including signature: {identifier}')
                else:
                    log.warning(
                                f'Signature {identifier} was not found and '
                                'could not be included'
                            )
        if self.config.exclude_signatures is not None:
            for identifier in self.config.exclude_signatures:
                if signatures.remove_signature(identifier):
                    log.debug(f'Excluded signature {identifier}')
                else:
                    log.warning(
                            f'Signature {identifier} is not in the existing '
                            'set. It will not be used in the scan.'
                        )
        signature_count = len(signatures.signatures)
        log.debug(f'Filtered signature count: {signature_count}')

    def _get_pre_compiled_signatures(
                self,
                match_engine: MatchEngine
            ) -> Optional[PrecompiledSignatureSet]:

        def fetch_pre_compiled() -> Optional[PrecompiledSignatureSet]:
            client = self.context.get_noc1_client()
            try:
                pre_compiled = client.get_precompiled_malware_signatures(
                        Platform.detect(),
                        vectorscan.API_VERSION
                    )
                if pre_compiled is None:
                    log.warning(
                            'No compatible pre-compiled signature set was '
                            'found. Signatures will be compiled locally.'
                        )
                return pre_compiled
            except UnknownPlatform:
                log.warning(
                        'Unable to determine current platform, pre-compiled '
                        'signatures cannot be used'
                    )
                return None

        cacheable = caching.Cacheable(
                f'pre-compiled-signatures-{match_engine.module}',
                fetch_pre_compiled,
                caching.DURATION_ONE_DAY
            )
        return cacheable.get(self.cache)

    def _get_signatures(
                self,
                match_engine: MatchEngine,
                check_precompiled: bool = True
            ) -> SignatureSet:
        supports_pre_compilation = match_engine.supports_pre_compilation()
        if supports_pre_compilation and check_precompiled:
            precompiled = self._get_pre_compiled_signatures(match_engine)
            if precompiled is not None:
                filtered = precompiled.signature_set
                if match_engine.validate_database_source(precompiled.data):
                    self._filter_signatures(filtered)
                    return filtered, precompiled.data
                else:
                    log.warning(
                            'Pre-compiled database source is not valid...'
                            'compiling locally...'
                        )

        def fetch_signatures() -> SignatureSet:
            noc1_client = self.context.get_noc1_client()
            return noc1_client.get_malware_signatures()

        self.cacheable_signatures = caching.Cacheable(
                'signatures',
                fetch_signatures,
                caching.DURATION_ONE_DAY
            )
        signatures = self.cacheable_signatures.get(self.cache)
        self._filter_signatures(signatures)
        return signatures, None

    def _get_file_list_separator(self) -> str:
        if isinstance(self.config.file_list_separator, bytes):
            return self.config.file_list_separator.decode('utf-8')
        return self.config.file_list_separator

    def _initialize_file_filter(self) -> filtering.FileFilter:
        filter = filtering.FileFilter()
        has_include_overrides = False
        if self.config.include_all_files:
            filter.add(filtering.filter_any)
        if self.config.include_files is not None:
            has_include_overrides = True
            for name in self.config.include_files:
                filter.add(filtering.FilenameFilter(os.fsencode(name)))
        if self.config.include_files_pattern is not None:
            has_include_overrides = True
            for pattern in self.config.include_files_pattern:
                try:
                    filter.add(filtering.filter_pattern(os.fsencode(pattern)))
                except filtering.InvalidPatternException:
                    raise Exception(
                            'File inclusion pattern must be a Python regex, '
                            f'received: "{pattern}"'
                        )
        if self.config.exclude_files is not None:
            for name in self.config.exclude_files:
                filter.add(filtering.FilenameFilter(os.fsencode(name)), False)
        if self.config.exclude_files_pattern is not None:
            for pattern in self.config.exclude_files_pattern:
                try:
                    filter.add(
                            filtering.filter_pattern(
                                os.fsencode(pattern)
                            ),
                            False
                        )
                except filtering.InvalidPatternException:
                    raise Exception(
                            'File exclusion pattern must be a Python regex, '
                            f'received: "{pattern}"'
                        )
        if not has_include_overrides:
            filter.add(filtering.filter_php)
            filter.add(filtering.filter_html)
            filter.add(filtering.filter_js)
            if self.config.images:
                filter.add(filtering.filter_images)
        return filter

    def _get_pcre_options(self) -> pcre.PcreOptions:
        return pcre.PcreOptions(
                    caseless=True,
                    match_limit=self.config.pcre_backtrack_limit,
                    match_limit_recursion=self.config.pcre_recursion_limit
                )

    def _get_compiled_signatures_cache_key(
                self,
                match_engine: MatchEngine
            ) -> str:
        return f'compiled-signatures-{match_engine.module}'

    def _get_database_source(
                self,
                match_engine: MatchEngine,
                match_engine_options: MatchEngineOptions,
                force: bool = False,
                generic: bool = True
            ) -> Tuple[Optional[bytes], Optional[caching.Cacheable]]:
        current_hash = match_engine_options.signature_set.get_hash()

        def compile_database():
            compiler_options = MatchEngineCompilerOptions(
                    generic=generic
                )
            compiler = match_engine.get_compiler(compiler_options)
            if compiler is None:
                return None
            compiled = compiler.compile_serializable(
                    match_engine_options.signature_set
                )
            signature_set = PrecompiledSignatureSet(
                    match_engine_options.signature_set,
                    compiled,
                    current_hash,
                    match_engine_options.signature_set.license
                )
            if generic:
                signature_set.clear_license()
            return signature_set

        def is_precompiled_compatible(precompiled):
            return (
                   precompiled is not None and
                   precompiled.signature_hash == current_hash and
                   precompiled.is_supported_version()
               )

        if self.config.pattern_database_path is None:

            def filter_precompiled(precompiled):
                if not is_precompiled_compatible(precompiled):
                    raise caching.InvalidCachedValueException(
                            'Incompatible signature set'
                        )
                return precompiled
            cacheable = caching.Cacheable(
                    self._get_compiled_signatures_cache_key(match_engine),
                    compile_database,
                    filters=[
                        filter_precompiled
                    ]
                )
            if force:
                cacheable.delete(self.cache)
            precompiled = cacheable.get(self.cache)

        else:

            precompiled = None
            database_path = self.config.pattern_database_path
            if not force:
                try:
                    with open(database_path, 'rb') as file:
                        existing = file.read()
                        if len(existing) > 0:
                            existing = \
                                deserialize_precompiled_signature_set(
                                    existing
                                )
                            if is_precompiled_compatible(existing):
                                precompiled = existing
                except FileNotFoundError:
                    pass

            if precompiled is None:
                precompiled = compile_database()
                with tempfile.NamedTemporaryFile(
                            dir=os.path.dirname(database_path),
                            prefix='compiling.',
                            suffix='.db',
                            delete=False
                        ) as file:
                    file.write(pickle.dumps(precompiled))
                    file.flush()
                    file.close()
                    chmod_with_umask(file.name)
                    os.replace(file.name, database_path)

        if precompiled is None:
            return None
        return precompiled.data

    def _initialize_interrupt_handling(self) -> None:

        def handle_interrupt(signal_number: int, stack) -> None:
            revert_progress_changes()
            if parent_process() is None:
                log.info('Scan command interrupted, stopping...')
                self.terminate()
                reset_terminal()
            sys.exit(130)

        signal.signal(signal.SIGINT, handle_interrupt)

    def invoke(self) -> int:
        self._initialize_interrupt_handling()

        match_engine = MatchEngine.for_option(self.config.match_engine)
        pre_compile = (
                self.config.pre_compile
                or self.config.pre_compile_generic
            )
        signatures, database_source = self._get_signatures(
                match_engine,
                check_precompiled=not (
                        pre_compile or self.config.compile_local
                    )
            )
        match_engine_options = MatchEngineOptions(
                signature_set=signatures,
                pcre_options=self._get_pcre_options(),
                match_all=self.config.match_all,
            )
        if database_source is None:
            database_source = self._get_database_source(
                match_engine,
                match_engine_options,
                force=self.config.re_compile,
                generic=self.config.pre_compile_generic
            )
        match_engine_options.database_source = database_source

        if pre_compile:
            if match_engine_options.database_source is None:
                log.error(
                        'Signature set pre-compilation is not supported '
                        'with the current options'
                    )
                return 1
            else:
                log.info('Signature set successfully pre-compiled')
                return 0

        report_manager = ScanReportManager(
                self.context,
                signatures
            )
        io_manager = report_manager.get_io_manager()

        progress = None
        if self.config.progress:
            global screen_handler
            progress = ProgressDisplay(int(self.config.workers))
            screen_handler = progress.get_log_handler()
            if sys.stderr is None or sys.stderr.isatty():
                remove_initial_handler()
            log.addHandler(screen_handler)
            report_manager.set_progress_display(progress)

        paths = set()
        for argument in self.config.trailing_arguments:
            paths.add(argument)

        options = scanning.scanner.Options(
                paths=paths,
                match_engine_options=match_engine_options,
                workers=int(self.config.workers),
                chunk_size=self.config.chunk_size,
                scanned_content_limit=int(self.config.scanned_content_limit),
                file_filter=self._initialize_file_filter(),
                allow_io_errors=self.config.allow_io_errors,
                debug=self.config.debug,
                logging_initializer=self.context.get_log_settings().apply,
                match_engine=match_engine,
                profile=self.config.profile,
                profile_path=self.config.profile_path,
                direct_io=self.config.direct_io
            )
        if io_manager.should_read_stdin():
            options.path_source = io_manager.get_input_reader()

        with report_manager.open_output_file() as output_file:
            report = report_manager.initialize_report(
                    output_file
                )
            self.scanner = scanning.scanner.Scanner(options)
            if progress:
                use_log_events = True
            else:
                use_log_events = False
            metrics, timer = self.scanner.scan(
                    report.add_result,
                    progress.handle_update if progress is not None else None,
                    progress.scan_finished_handler if progress is not None
                    else None,
                    use_log_events
                )

            if progress:
                progress.end_on_input()
                revert_progress_changes()
                if progress.results_message:
                    print(progress.results_message)
            report.metrics = metrics
            report.timer = timer
            report.complete()
        return 0

    def terminate(self) -> None:
        if hasattr(self, 'scanner') and self.scanner is not None:
            self.scanner.terminate()
        reset_terminal()

    def generate_exception_message(
                self,
                exception: BaseException
            ) -> Optional[str]:
        if isinstance(exception, ProgressException):
            return (
                    'The current terminal is too small to '
                    'display progress output with the current scan '
                    'options'
                )
        return super().generate_exception_message(exception)


factory = MalwareScanSubcommand