HEX
Server: LiteSpeed
System: Linux srv1.dhviews.com 5.14.0-570.23.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Jun 24 11:27:16 EDT 2025 x86_64
User: bdedition (1723)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //proc/self/root/usr/local/lib/python3.9/site-packages/wordfence/cli/vulnscan/reporting.py
import os

from typing import List, Dict, Callable, Any, Optional
from email.message import EmailMessage
from email.headerregistry import Address

from ...intel.vulnerabilities import ScannableSoftware, Vulnerability, \
        Software, ProductionVulnerability
from ...api.intelligence import VulnerabilityFeedVariant
from ...util.terminal import Color, escape, RESET
from ...util.html import Tag
from ...util.versioning import version_to_str
from ..reporting import Report, ReportColumnEnum, ReportFormatEnum, \
        ReportRecord, ReportManager, ReportFormat, ReportColumn, \
        BaseHumanReadableWriter, ReportEmail, get_config_options, \
        generate_report_email_html, generate_html_table, \
        REPORT_FORMAT_CSV, REPORT_FORMAT_TSV, REPORT_FORMAT_NULL_DELIMITED, \
        REPORT_FORMAT_LINE_DELIMITED
from ..context import CliContext
from ..email import Mailer


class VulnScanReportColumn(ReportColumnEnum):
    SOFTWARE_TYPE = 'software_type', lambda record: record.software.type.value
    SLUG = 'slug', lambda record: record.software.slug
    VERSION = 'version', lambda record: version_to_str(record.software.version)
    ID = 'id', \
        lambda record: record.vulnerability.identifier
    TITLE = 'title', lambda record: record.vulnerability.title
    LINK = 'link', lambda record: record.vulnerability.get_wordfence_link()
    DESCRIPTION = 'description', \
        lambda record: record.vulnerability.description, \
        VulnerabilityFeedVariant.PRODUCTION
    CVE = 'cve', lambda record: record.vulnerability.cve, \
        VulnerabilityFeedVariant.PRODUCTION
    CVSS_VECTOR = 'cvss_vector', \
        lambda record: record.vulnerability.cvss.vector, \
        VulnerabilityFeedVariant.PRODUCTION
    CVSS_SCORE = 'cvss_score', \
        lambda record: record.vulnerability.cvss.score, \
        VulnerabilityFeedVariant.PRODUCTION
    CVSS_RATING = 'cvss_rating', \
        lambda record: record.vulnerability.cvss.rating, \
        VulnerabilityFeedVariant.PRODUCTION
    CWE_ID = 'cwe_id', \
        lambda record: record.vulnerability.cwe.identifier, \
        VulnerabilityFeedVariant.PRODUCTION
    CWE_NAME = 'cwe_name', \
        lambda record: record.vulnerability.cwe.name, \
        VulnerabilityFeedVariant.PRODUCTION
    CWE_DESCRIPTION = 'cwe_description', \
        lambda record: record.vulnerability.cwe.description, \
        VulnerabilityFeedVariant.PRODUCTION
    PATCHED = 'patched', \
        lambda record: record.get_matched_software().patched
    REMEDIATION = 'remediation', \
        lambda record: record.get_matched_software().remediation, \
        VulnerabilityFeedVariant.PRODUCTION,
    PUBLISHED = 'published', lambda record: record.vulnerability.published
    UPDATED = 'updated', \
        lambda record: record.vulnerability.updated, \
        VulnerabilityFeedVariant.PRODUCTION
    SCANNED_PATH = 'scanned_path', lambda record: os.fsdecode(
            record.software.scan_path
        )

    def __init__(
                self,
                header: str,
                extractor: Callable[[Any], str],
                feed_variant: Optional[VulnerabilityFeedVariant] = None
            ):
        super().__init__(header, extractor)
        self.feed_variant = feed_variant

    def is_compatible(
                self,
                variant: VulnerabilityFeedVariant
            ) -> bool:
        return self.feed_variant is None or \
                variant == self.feed_variant


class HumanReadableWriter(BaseHumanReadableWriter):

    def get_severity_color(self, severity: str) -> str:
        if severity == 'none' or severity == 'low':
            return escape(color=Color.WHITE, bold=True)
        if severity == 'high' or severity == 'critical':
            return escape(color=Color.RED, bold=True)
        return escape(color=Color.YELLOW, bold=True)

    def format_record(self, record) -> str:
        vuln = record.vulnerability
        sw = record.software
        sw_version = version_to_str(sw.version)
        yellow = escape(color=Color.YELLOW)
        link = vuln.get_wordfence_link()
        blue = escape(color=Color.BLUE)
        white = escape(color=Color.WHITE)
        severity = None
        if isinstance(record.vulnerability, ProductionVulnerability):
            if record.vulnerability.cvss is not None:
                severity = record.vulnerability.cvss.rating
        if severity is None:
            severity_message = ''
        else:
            severity = severity.lower()
            severity_color = self.get_severity_color(severity)
            severity_message = f'{severity_color}{severity}{yellow} severity '
        if vuln.informational:
            bold_white = escape(color=Color.WHITE, bold=True)
            info_message = f'{bold_white}informational{yellow} '
            if len(severity_message) == 0:
                info_message = ' ' + info_message
        else:
            info_message = ''
        return (
            f'{yellow}Found {severity_message}{info_message}vulnerability '
            f'{vuln.title} in {sw.slug}({sw_version})\n'
            f'{white}Details: {blue}{link}{RESET}'
            )


REPORT_FORMAT_HUMAN = ReportFormat(
        'human',
        lambda stream, columns: HumanReadableWriter(stream),
        allows_headers=False,
        allows_column_customization=False
    )


class VulnScanReportFormat(ReportFormatEnum):
    CSV = REPORT_FORMAT_CSV
    TSV = REPORT_FORMAT_TSV
    NULL_DELIMITED = REPORT_FORMAT_NULL_DELIMITED
    LINE_DELIMITED = REPORT_FORMAT_LINE_DELIMITED
    HUMAN = REPORT_FORMAT_HUMAN


class VulnScanReportRecord(ReportRecord):

    def __init__(
                self,
                software: ScannableSoftware,
                vulnerability: Vulnerability
            ):
        self.software = software
        self.vulnerability = vulnerability
        self.matched_software = None

    def get_matched_software(self) -> Software:
        if self.matched_software is None:
            self.matched_software = \
                    self.vulnerability.get_matched_software(self.software)
        return self.matched_software


class VulnScanReport(Report):

    def __init__(
                self,
                format: VulnScanReportFormat,
                columns: List[VulnScanReportColumn],
                email_addresses: List[str],
                mailer: Optional[Mailer],
                write_headers: bool = False
            ):
        super().__init__(
                format=format,
                columns=columns,
                email_addresses=email_addresses,
                mailer=mailer,
                write_headers=write_headers
            )
        self.scanner = None

    def add_result(
                self,
                software: ScannableSoftware,
                vulnerabilities: Dict[str, Vulnerability]
            ) -> None:
        records = []
        for vulnerability in vulnerabilities.values():
            record = VulnScanReportRecord(
                    software,
                    vulnerability
                )
            records.append(record)
        self.write_records(records)

    def generate_email(
                self,
                recipient: Address,
                attachments: Dict[str, str],
                hostname: str
            ) -> EmailMessage:

        unique_count = self.scanner.get_vulnerability_count()
        total_count = self.scanner.get_total_count()

        base_message = 'Vulnerabilities were found by Wordfence CLI during ' \
                       'a scan.'

        plain = f'{base_message}\n\n' \
                f'Unique Vulnerabilities: {unique_count}\n' \
                f'Total Vulnerabilities: {total_count}\n'

        content = Tag('div')

        content.append(Tag('p').append(base_message))

        results = {
                'Unique Vulnerabilities': unique_count,
                'Total Vulnerabilities': total_count
            }
        table = generate_html_table(results)
        content.append(table)

        document = generate_report_email_html(
                content,
                'Vulnerability Scan Results',
                hostname
            )

        return ReportEmail(
                recipient=recipient,
                subject=f'Vulnerability Scan Results for {hostname}',
                plain_content=plain,
                html_content=document.to_html()
            )


VULN_SCAN_REPORT_CONFIG_OPTIONS = get_config_options(
        VulnScanReportFormat,
        VulnScanReportColumn,
        default_format='human'
    )


class VulnScanReportManager(ReportManager):

    def __init__(
                self,
                context: CliContext,
                feed_variant: VulnerabilityFeedVariant
            ):
        super().__init__(
                formats=VulnScanReportFormat,
                columns=VulnScanReportColumn,
                context=context,
                read_stdin=context.config.read_stdin,
                input_delimiter=context.config.path_separator,
                binary_input=True
            )
        self.feed_variant = feed_variant

    def _instantiate_report(
                self,
                format: ReportFormat,
                columns: List[ReportColumn],
                email_addresses: List[str],
                mailer: Optional[Mailer],
                write_headers: bool
            ) -> VulnScanReport:
        for column in columns:
            if not column.is_compatible(self.feed_variant):
                raise ValueError(
                        f'Column {column.header} is not compatible '
                        'with the current feed'
                    )
        return VulnScanReport(
                format,
                columns,
                email_addresses,
                mailer,
                write_headers
            )