HEX
Server: LiteSpeed
System: Linux srv1.dhviews.com 5.14.0-570.23.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Jun 24 11:27:16 EDT 2025 x86_64
User: bdedition (1723)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //proc/self/root/usr/local/lib/python3.9/site-packages/wordfence/cli/malwarescan/reporting.py
import os

from typing import List, IO, Optional, Dict
from email.message import EmailMessage
from email.headerregistry import Address

from wordfence.scanning.scanner import ScanResult
from wordfence.intel.signatures import SignatureSet, Signature
from wordfence.util.html import Tag
from wordfence.util.units import scale_byte_unit
from wordfence.util.encoding import bytes_to_str

from ..reporting import Report, ReportFormat, get_config_options, \
        ReportFormatEnum, ReportColumnEnum, ReportRecord, ReportWriter, \
        ReportManager, ReportColumn, ReportEmail, generate_report_email_html, \
        generate_html_table, \
        REPORT_FORMAT_CSV, REPORT_FORMAT_TSV, REPORT_FORMAT_NULL_DELIMITED, \
        REPORT_FORMAT_LINE_DELIMITED
from ..context import CliContext
from ..email import Mailer
from .progress import ProgressDisplay


class ScanReportColumn(ReportColumnEnum):
    """Columns available in malware scan reports.

    Every member is declared as a ``(name, extractor)`` pair, where the
    extractor is a callable that receives one report record and returns
    the value to show in that column.
    """

    FILENAME = (
        'filename',
        lambda record: os.fsdecode(record.result.path),
    )
    SIGNATURE_ID = (
        'signature_id',
        lambda record: record.signature.identifier,
    )
    SIGNATURE_NAME = (
        'signature_name',
        lambda record: record.signature.name,
    )
    SIGNATURE_DESCRIPTION = (
        'signature_description',
        lambda record: record.signature.description,
    )
    # A record may carry no matched bytes; report an empty string then.
    MATCHED_TEXT = (
        'matched_text',
        lambda record: (
            bytes_to_str(record.match) if record.match is not None else ''
        ),
    )


class HumanReadableWriter(ReportWriter):
    """Report writer that prints one plain-English line per matched row."""

    def __init__(self, target: IO, columns: List[ScanReportColumn]):
        """Remember the columns that describe each row.

        :param target: output stream, handed to the parent writer
        :param columns: columns in the same order as the values that will
            be passed to ``write_row``
        """
        super().__init__(target)
        self._columns = columns

    def _get_value(self, data: List[str], column: str) -> Optional[str]:
        """Return the value from ``data`` for the column headed ``column``.

        :param data: row values, ordered like the configured columns
        :param column: header of the column to look up
        :return: the matching value, or None if no configured column has
            that header
        """
        return self._map_data_to_dict(data).get(column)

    def _map_data_to_dict(self, data: List[str]) -> dict:
        """Map each configured column's header to its value in ``data``.

        :raises IndexError: if ``data`` has fewer entries than there are
            configured columns
        """
        return {
            column.header: data[index]
            for index, column in enumerate(self._columns)
        }

    def write_row(self, data: List[str]) -> None:
        """Write a single sentence describing one match, then a newline.

        The wording depends on which of the ``filename`` and
        ``signature_id`` columns are present in the row.
        """
        values = self._map_data_to_dict(data)
        file = values.get('filename')
        signature_id = values.get('signature_id')
        # TODO: Add more custom messages if desired
        if file is None:
            # Without a filename, fall back to dumping every column.
            message = "Match found: " + str(values)
        elif signature_id is None:
            message = f"File {file} matched a signature"
        else:
            message = f"File at {file} matched signature {signature_id}"
        # self._target is presumably set by ReportWriter.__init__ from the
        # ``target`` argument -- confirm against the base class.
        self._target.write(message)
        self._target.write("\n")

    def allows_headers(self) -> bool:
        """Return False: this writer never emits a header row."""
        return False


# Report format registered under the name 'human'. Its factory builds a
# HumanReadableWriter from the output stream and the selected columns, and
# header rows are turned off for it.
REPORT_FORMAT_HUMAN = ReportFormat(
        'human',
        lambda stream, columns: HumanReadableWriter(stream, columns),
        allows_headers=False
    )


class ScanReportFormat(ReportFormatEnum):
    """Output formats selectable for malware scan reports."""

    # Delimited formats imported from the shared reporting module.
    CSV = REPORT_FORMAT_CSV
    TSV = REPORT_FORMAT_TSV
    NULL_DELIMITED = REPORT_FORMAT_NULL_DELIMITED
    LINE_DELIMITED = REPORT_FORMAT_LINE_DELIMITED
    # Sentence-per-match format defined in this module.
    HUMAN = REPORT_FORMAT_HUMAN


class ScanReportRecord(ReportRecord):
    """One report row: a scan result paired with one matched signature."""

    def __init__(
                self,
                result: ScanResult,
                signature: Signature,
                match: Optional[bytes]
            ) -> None:
        """Store the pieces the report columns read from.

        :param result: scan result for the file that matched
        :param signature: signature that matched the file
        :param match: matched bytes; the ``matched_text`` column treats
            None as "no text" and renders an empty string
        """
        self.result = result
        self.signature = signature
        self.match = match


class ScanReport(Report):
    """Malware scan report that resolves matches against a signature set."""

    def __init__(
                self,
                format: ScanReportFormat,
                columns: List[ScanReportColumn],
                signature_set: SignatureSet,
                email_addresses: List[str],
                mailer: Optional[Mailer],
                write_headers: bool = False
            ):
        """Configure the base report and keep the signature set.

        :param format: output format for the report
        :param columns: columns to include in each row
        :param signature_set: used to look up signatures by identifier
        :param email_addresses: forwarded to the base report
        :param mailer: forwarded to the base report; may be None
        :param write_headers: forwarded to the base report
        """
        super().__init__(
                format=format,
                columns=columns,
                email_addresses=email_addresses,
                mailer=mailer,
                write_headers=write_headers
            )
        self.signature_set = signature_set
        # Not populated here. generate_email reads it, so it has to be
        # assigned before an email is generated.
        self.metrics = None

    def add_result(self, result: ScanResult) -> None:
        """Write one record for every signature matched by ``result``."""
        records = [
            ScanReportRecord(
                    result=result,
                    signature=self.signature_set.get_signature(signature_id),
                    match=match
                )
            for signature_id, match in result.matches.items()
        ]
        self.write_records(records)

    def generate_email(
                self,
                recipient: Address,
                attachments: Dict[str, str],
                hostname: str
            ) -> EmailMessage:
        """Build the summary email for a scan that found suspicious files.

        :param recipient: address the email is sent to
        :param attachments: not used by this implementation
        :param hostname: host name shown in the subject and HTML body
        :return: the ReportEmail holding plain-text and HTML content
        """
        # NOTE(review): the annotation says EmailMessage but a ReportEmail
        # is returned -- confirm the two are compatible.
        metrics = self.metrics
        results = {
                'Suspicious Files': metrics.get_total_matches(),
                'Scanned Files': metrics.get_total_count(),
                'Bytes Processed': scale_byte_unit(metrics.get_total_bytes()),
                'Elapsed Time':
                    f'{round(self.timer.get_elapsed())} second(s)'
            }

        intro = ('Suspicious files were found by Wordfence CLI during a '
                 'malware scan.\n\n')

        # The HTML body opens with the intro paragraph only; the figures
        # are rendered as a table below it.
        body = Tag('div')
        body.append(Tag('p').append(intro))

        # The plain-text body is the intro followed by one line per figure.
        plain = intro + ''.join(
                f'{label}: {value}\n' for label, value in results.items()
            )

        body.append(generate_html_table(results))

        document = generate_report_email_html(
                body,
                'Malware Scan Results',
                hostname
            )

        return ReportEmail(
                recipient=recipient,
                subject=f'Malware Scan Results for {hostname}',
                plain_content=plain,
                html_content=document.to_html()
            )


# Configuration options produced by get_config_options for the scan
# report's format and column enums.
SCAN_REPORT_CONFIG_OPTIONS = get_config_options(
        ScanReportFormat,
        ScanReportColumn
    )


class ScanReportManager(ReportManager):
    """Report manager that creates ScanReport instances for malware scans."""

    def __init__(
                self,
                context: CliContext,
                signature_set: SignatureSet
            ):
        """Set up the manager from the CLI context.

        :param context: CLI context whose config supplies the stdin and
            input delimiter settings
        :param signature_set: passed to every report this manager creates
        """
        config = context.config
        super().__init__(
                formats=ScanReportFormat,
                columns=ScanReportColumn,
                context=context,
                read_stdin=config.read_stdin,
                input_delimiter=config.file_list_separator,
                binary_input=True
            )
        self.signature_set = signature_set
        # No progress display until set_progress_display is called.
        self.progress = None

    def set_progress_display(self, progress: ProgressDisplay) -> None:
        """Attach the progress display whose stream should carry stdout."""
        self.progress = progress

    def _instantiate_report(
                self,
                format: ReportFormat,
                columns: List[ReportColumn],
                email_addresses: List[str],
                mailer: Optional[Mailer],
                write_headers: bool
            ) -> ScanReport:
        """Create a ScanReport bound to this manager's signature set."""
        return ScanReport(
                format=format,
                columns=columns,
                signature_set=self.signature_set,
                email_addresses=email_addresses,
                mailer=mailer,
                write_headers=write_headers
            )

    def _get_stdout_target(self) -> IO:
        """Return the stream that stdout report output is written to.

        Uses the progress display's output stream when one is attached,
        and the parent class's target otherwise.
        """
        if self.progress is None:
            return super()._get_stdout_target()
        return self.progress.get_output_stream()