File: //proc/self/root/usr/local/lib/python3.9/site-packages/wordfence/cli/malwarescan/malwarescan.py
import sys
import signal
import logging
import pickle
import tempfile
import os
from multiprocessing import parent_process
from typing import Optional, Tuple
from wordfence import scanning
from wordfence.scanning import filtering
from wordfence.scanning.matching import MatchEngine, MatchEngineOptions, \
MatchEngineCompilerOptions
from wordfence.util import caching, pcre, vectorscan
from wordfence.util.platform import Platform, UnknownPlatform
from wordfence.intel.signatures import SignatureSet, PrecompiledSignatureSet, \
deserialize_precompiled_signature_set
from wordfence.util.io import chmod_with_umask
from wordfence.logging import (log, remove_initial_handler,
restore_initial_handler)
from ..subcommands import Subcommand
from .reporting import ScanReportManager
from .progress import ProgressDisplay, ProgressException, reset_terminal
# Log handler obtained from the active ProgressDisplay. It stays None until
# MalwareScanSubcommand.invoke() sets it up for progress output, and it is
# detached from the logger again by revert_progress_changes().
screen_handler: Optional[logging.Handler] = None
def revert_progress_changes() -> None:
    """Undo the logging and terminal changes made for progress output.

    If a progress log handler has been registered in ``screen_handler`` it
    is removed from the logger and the initial handler is restored. The
    terminal is reset in every case.
    """
    # Only read here, so the module-level name needs no ``global`` statement.
    active_handler = screen_handler
    if active_handler:
        log.removeHandler(active_handler)
        restore_initial_handler()
    reset_terminal()
class MalwareScanSubcommand(Subcommand):
    """Subcommand that scans the given paths for malware signatures.

    ``invoke`` drives the whole run: it obtains the signature set (using a
    pre-compiled database when one is available and valid), compiles or
    loads a database for the match engine when needed, builds the file
    filter and scanner options, runs the scanner and completes the report.
    """

    def _filter_signatures(
        self,
        signatures: SignatureSet,
    ) -> None:
        """Apply the include/exclude signature options in place.

        When ``config.include_signatures`` is non-empty, every signature
        whose identifier is not listed is removed, and a warning is logged
        for each listed identifier that is missing from the set.
        Identifiers in ``config.exclude_signatures`` are then removed, with
        a warning for those that were not present.

        :param signatures: signature set to prune; it is mutated.
        """
        include = self.config.include_signatures
        if include:
            # Identifiers are used as dict keys below, so they are hashable;
            # a set avoids rescanning the option once per signature.
            wanted = set(include)
            for identifier in list(signatures.signatures.keys()):
                if identifier not in wanted:
                    signatures.remove_signature(identifier)
            for identifier in include:
                if identifier in signatures.signatures:
                    log.debug(f'Including signature: {identifier}')
                else:
                    log.warning(
                        f'Signature {identifier} was not found and '
                        'could not be included'
                    )
        if self.config.exclude_signatures is not None:
            for identifier in self.config.exclude_signatures:
                if signatures.remove_signature(identifier):
                    log.debug(f'Excluded signature {identifier}')
                else:
                    log.warning(
                        f'Signature {identifier} is not in the existing '
                        'set. It will not be used in the scan.'
                    )
        signature_count = len(signatures.signatures)
        log.debug(f'Filtered signature count: {signature_count}')

    def _get_pre_compiled_signatures(
        self,
        match_engine: MatchEngine
    ) -> Optional[PrecompiledSignatureSet]:
        """Return the pre-compiled signature set for the current platform.

        The value is obtained through a ``caching.Cacheable`` keyed on the
        match engine module (``caching.DURATION_ONE_DAY``); on a cache miss
        it is requested from the NOC1 client.

        :param match_engine: engine whose module name is part of the cache
            key.
        :returns: the pre-compiled set, or ``None`` when the platform is
            unknown or no compatible set was returned.
        """
        def fetch_pre_compiled() -> Optional[PrecompiledSignatureSet]:
            client = self.context.get_noc1_client()
            try:
                pre_compiled = client.get_precompiled_malware_signatures(
                    Platform.detect(),
                    vectorscan.API_VERSION
                )
            except UnknownPlatform:
                log.warning(
                    'Unable to determine current platform, pre-compiled '
                    'signatures cannot be used'
                )
                return None
            if pre_compiled is None:
                log.warning(
                    'No compatible pre-compiled signature set was '
                    'found. Signatures will be compiled locally.'
                )
            return pre_compiled

        cacheable = caching.Cacheable(
            f'pre-compiled-signatures-{match_engine.module}',
            fetch_pre_compiled,
            caching.DURATION_ONE_DAY
        )
        return cacheable.get(self.cache)

    def _get_signatures(
        self,
        match_engine: MatchEngine,
        check_precompiled: bool = True
    ) -> Tuple[SignatureSet, Optional[bytes]]:
        """Return the filtered signature set and, if available, its database.

        A pre-compiled set is used when the engine supports pre-compilation,
        ``check_precompiled`` is true and the engine validates the
        pre-compiled data. Otherwise the plain signature set is fetched
        through the cache.

        :param match_engine: engine used to check pre-compilation support
            and to validate pre-compiled data.
        :param check_precompiled: set to ``False`` to skip the pre-compiled
            lookup entirely.
        :returns: ``(signatures, database_source)``; ``database_source`` is
            ``None`` when no pre-compiled database is being used.
        """
        supports_pre_compilation = match_engine.supports_pre_compilation()
        if supports_pre_compilation and check_precompiled:
            precompiled = self._get_pre_compiled_signatures(match_engine)
            if precompiled is not None:
                filtered = precompiled.signature_set
                if match_engine.validate_database_source(precompiled.data):
                    self._filter_signatures(filtered)
                    return filtered, precompiled.data
                log.warning(
                    'Pre-compiled database source is not valid...'
                    'compiling locally...'
                )

        def fetch_signatures() -> SignatureSet:
            noc1_client = self.context.get_noc1_client()
            return noc1_client.get_malware_signatures()

        self.cacheable_signatures = caching.Cacheable(
            'signatures',
            fetch_signatures,
            caching.DURATION_ONE_DAY
        )
        signatures = self.cacheable_signatures.get(self.cache)
        self._filter_signatures(signatures)
        return signatures, None

    def _get_file_list_separator(self) -> str:
        """Return ``config.file_list_separator`` as text.

        A ``bytes`` value is decoded as UTF-8; anything else is returned
        unchanged.
        """
        separator = self.config.file_list_separator
        if isinstance(separator, bytes):
            return separator.decode('utf-8')
        return separator

    def _initialize_file_filter(self) -> filtering.FileFilter:
        """Build the file filter from the include/exclude options.

        Include names and patterns are added first, then exclude names and
        patterns (passed to ``FileFilter.add`` with ``False`` as the second
        argument). When no include option was supplied, the default PHP,
        HTML and JS conditions are added, plus images when
        ``config.images`` is set.

        :raises Exception: when an include or exclude pattern is rejected
            with ``filtering.InvalidPatternException``.
        """
        # Named file_filter rather than filter to avoid shadowing the builtin
        file_filter = filtering.FileFilter()
        has_include_overrides = False
        if self.config.include_all_files:
            file_filter.add(filtering.filter_any)
        if self.config.include_files is not None:
            has_include_overrides = True
            for name in self.config.include_files:
                file_filter.add(filtering.FilenameFilter(os.fsencode(name)))
        if self.config.include_files_pattern is not None:
            has_include_overrides = True
            for pattern in self.config.include_files_pattern:
                try:
                    file_filter.add(
                        filtering.filter_pattern(os.fsencode(pattern))
                    )
                except filtering.InvalidPatternException as error:
                    raise Exception(
                        'File inclusion pattern must be a Python regex, '
                        f'received: "{pattern}"'
                    ) from error
        if self.config.exclude_files is not None:
            for name in self.config.exclude_files:
                file_filter.add(
                    filtering.FilenameFilter(os.fsencode(name)),
                    False
                )
        if self.config.exclude_files_pattern is not None:
            for pattern in self.config.exclude_files_pattern:
                try:
                    file_filter.add(
                        filtering.filter_pattern(os.fsencode(pattern)),
                        False
                    )
                except filtering.InvalidPatternException as error:
                    raise Exception(
                        'File exclusion pattern must be a Python regex, '
                        f'received: "{pattern}"'
                    ) from error
        if not has_include_overrides:
            file_filter.add(filtering.filter_php)
            file_filter.add(filtering.filter_html)
            file_filter.add(filtering.filter_js)
            if self.config.images:
                file_filter.add(filtering.filter_images)
        return file_filter

    def _get_pcre_options(self) -> pcre.PcreOptions:
        """Return caseless PCRE options using the configured match limits."""
        return pcre.PcreOptions(
            caseless=True,
            match_limit=self.config.pcre_backtrack_limit,
            match_limit_recursion=self.config.pcre_recursion_limit
        )

    def _get_compiled_signatures_cache_key(
        self,
        match_engine: MatchEngine
    ) -> str:
        """Return the cache key for locally compiled signatures."""
        return f'compiled-signatures-{match_engine.module}'

    def _write_database_file(
        self,
        database_path: str,
        precompiled: PrecompiledSignatureSet
    ) -> None:
        """Pickle ``precompiled`` and move it into place at ``database_path``.

        The data is written to a temporary file in the destination
        directory, which is then renamed over the target with
        ``os.replace``. The temporary file is removed if any step fails.
        """
        temp_path = None
        try:
            with tempfile.NamedTemporaryFile(
                dir=os.path.dirname(database_path),
                prefix='compiling.',
                suffix='.db',
                delete=False
            ) as file:
                temp_path = file.name
                file.write(pickle.dumps(precompiled))
                file.flush()
            # The file is closed before it is chmod-ed and renamed
            chmod_with_umask(temp_path)
            os.replace(temp_path, database_path)
        except BaseException:
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError:
                    # Best-effort cleanup; the original error is re-raised
                    pass
            raise

    def _get_database_source(
        self,
        match_engine: MatchEngine,
        match_engine_options: MatchEngineOptions,
        force: bool = False,
        generic: bool = True
    ) -> Optional[bytes]:
        """Return compiled database data for the current signature set.

        Without ``config.pattern_database_path`` the compiled set is kept in
        the cache and recompiled when the cached value does not match the
        current signature hash or version. With a database path, the file
        at that path is reused when compatible; otherwise the signatures
        are compiled and the file is rewritten.

        :param match_engine: engine providing the compiler.
        :param match_engine_options: options carrying the signature set.
        :param force: discard any cached or existing database and recompile.
        :param generic: passed to the compiler options; when true the
            license is cleared from the compiled set.
        :returns: the compiled set's ``data``, or ``None`` when the engine
            provides no compiler.
        """
        signature_set = match_engine_options.signature_set
        current_hash = signature_set.get_hash()

        def compile_database() -> Optional[PrecompiledSignatureSet]:
            compiler_options = MatchEngineCompilerOptions(
                generic=generic
            )
            compiler = match_engine.get_compiler(compiler_options)
            if compiler is None:
                return None
            compiled = compiler.compile_serializable(signature_set)
            compiled_set = PrecompiledSignatureSet(
                signature_set,
                compiled,
                current_hash,
                signature_set.license
            )
            if generic:
                compiled_set.clear_license()
            return compiled_set

        def is_precompiled_compatible(candidate) -> bool:
            return (
                candidate is not None and
                candidate.signature_hash == current_hash and
                candidate.is_supported_version()
            )

        database_path = self.config.pattern_database_path
        if database_path is None:
            def filter_precompiled(candidate):
                if not is_precompiled_compatible(candidate):
                    raise caching.InvalidCachedValueException(
                        'Incompatible signature set'
                    )
                return candidate

            cacheable = caching.Cacheable(
                self._get_compiled_signatures_cache_key(match_engine),
                compile_database,
                filters=[
                    filter_precompiled
                ]
            )
            if force:
                cacheable.delete(self.cache)
            precompiled = cacheable.get(self.cache)
        else:
            precompiled = None
            if not force:
                try:
                    with open(database_path, 'rb') as file:
                        existing = file.read()
                except FileNotFoundError:
                    existing = b''
                if len(existing) > 0:
                    # NOTE(review): this parses a file from a user-supplied
                    # path; confirm the deserializer is safe for untrusted
                    # content.
                    existing = deserialize_precompiled_signature_set(
                        existing
                    )
                    if is_precompiled_compatible(existing):
                        precompiled = existing
            if precompiled is None:
                precompiled = compile_database()
                # Skip the write when nothing was compiled, so an existing
                # database file is not replaced by a pickled None.
                if precompiled is not None:
                    self._write_database_file(database_path, precompiled)
        if precompiled is None:
            return None
        return precompiled.data

    def _initialize_interrupt_handling(self) -> None:
        """Install a SIGINT handler that cleans up and exits with 130."""
        def handle_interrupt(signal_number: int, stack) -> None:
            revert_progress_changes()
            if parent_process() is None:
                # Only the main process logs and stops the scanner
                log.info('Scan command interrupted, stopping...')
                self.terminate()
            reset_terminal()
            sys.exit(130)

        signal.signal(signal.SIGINT, handle_interrupt)

    def invoke(self) -> int:
        """Run the malware scan, or only pre-compile the signature set.

        :returns: ``0`` when the scan completes or pre-compilation succeeds,
            ``1`` when pre-compilation was requested but no database could
            be produced.
        """
        global screen_handler
        self._initialize_interrupt_handling()
        match_engine = MatchEngine.for_option(self.config.match_engine)
        pre_compile = (
            self.config.pre_compile
            or self.config.pre_compile_generic
        )
        signatures, database_source = self._get_signatures(
            match_engine,
            check_precompiled=not (
                pre_compile or self.config.compile_local
            )
        )
        match_engine_options = MatchEngineOptions(
            signature_set=signatures,
            pcre_options=self._get_pcre_options(),
            match_all=self.config.match_all,
        )
        if database_source is None:
            database_source = self._get_database_source(
                match_engine,
                match_engine_options,
                force=self.config.re_compile,
                generic=self.config.pre_compile_generic
            )
        match_engine_options.database_source = database_source
        if pre_compile:
            # Pre-compile modes stop here without scanning anything
            if match_engine_options.database_source is None:
                log.error(
                    'Signature set pre-compilation is not supported '
                    'with the current options'
                )
                return 1
            log.info('Signature set successfully pre-compiled')
            return 0
        report_manager = ScanReportManager(
            self.context,
            signatures
        )
        io_manager = report_manager.get_io_manager()
        progress = None
        if self.config.progress:
            progress = ProgressDisplay(int(self.config.workers))
            screen_handler = progress.get_log_handler()
            if sys.stderr is None or sys.stderr.isatty():
                remove_initial_handler()
            log.addHandler(screen_handler)
            report_manager.set_progress_display(progress)
        paths = set(self.config.trailing_arguments)
        options = scanning.scanner.Options(
            paths=paths,
            match_engine_options=match_engine_options,
            workers=int(self.config.workers),
            chunk_size=self.config.chunk_size,
            scanned_content_limit=int(self.config.scanned_content_limit),
            file_filter=self._initialize_file_filter(),
            allow_io_errors=self.config.allow_io_errors,
            debug=self.config.debug,
            logging_initializer=self.context.get_log_settings().apply,
            match_engine=match_engine,
            profile=self.config.profile,
            profile_path=self.config.profile_path,
            direct_io=self.config.direct_io
        )
        if io_manager.should_read_stdin():
            options.path_source = io_manager.get_input_reader()
        with report_manager.open_output_file() as output_file:
            report = report_manager.initialize_report(
                output_file
            )
            self.scanner = scanning.scanner.Scanner(options)
            use_log_events = bool(progress)
            if progress is not None:
                update_handler = progress.handle_update
                finished_handler = progress.scan_finished_handler
            else:
                update_handler = None
                finished_handler = None
            metrics, timer = self.scanner.scan(
                report.add_result,
                update_handler,
                finished_handler,
                use_log_events
            )
            if progress:
                progress.end_on_input()
                revert_progress_changes()
                if progress.results_message:
                    print(progress.results_message)
            report.metrics = metrics
            report.timer = timer
            report.complete()
        return 0

    def terminate(self) -> None:
        """Terminate the scanner, if one was created, and reset the terminal."""
        scanner = getattr(self, 'scanner', None)
        if scanner is not None:
            scanner.terminate()
        reset_terminal()

    def generate_exception_message(
        self,
        exception: BaseException
    ) -> Optional[str]:
        """Return a user-facing message for ``exception``.

        ``ProgressException`` gets a message about the terminal being too
        small; everything else is delegated to the parent class.
        """
        if isinstance(exception, ProgressException):
            return (
                'The current terminal is too small to '
                'display progress output with the current scan '
                'options'
            )
        return super().generate_exception_message(exception)
# Module-level alias for the subcommand class; presumably the name the CLI's
# subcommand loader looks up — verify against the loader.
factory = MalwareScanSubcommand