File: //proc/self/root/usr/local/lib/python3.9/site-packages/wordfence/cli/malwarescan/reporting.py
import os
from typing import List, IO, Optional, Dict
from email.message import EmailMessage
from email.headerregistry import Address
from wordfence.scanning.scanner import ScanResult
from wordfence.intel.signatures import SignatureSet, Signature
from wordfence.util.html import Tag
from wordfence.util.units import scale_byte_unit
from wordfence.util.encoding import bytes_to_str
from ..reporting import Report, ReportFormat, get_config_options, \
ReportFormatEnum, ReportColumnEnum, ReportRecord, ReportWriter, \
ReportManager, ReportColumn, ReportEmail, generate_report_email_html, \
generate_html_table, \
REPORT_FORMAT_CSV, REPORT_FORMAT_TSV, REPORT_FORMAT_NULL_DELIMITED, \
REPORT_FORMAT_LINE_DELIMITED
from ..context import CliContext
from ..email import Mailer
from .progress import ProgressDisplay
class ScanReportColumn(ReportColumnEnum):
FILENAME = 'filename', lambda record: os.fsdecode(record.result.path)
SIGNATURE_ID = 'signature_id', lambda record: record.signature.identifier
SIGNATURE_NAME = 'signature_name', lambda record: record.signature.name
SIGNATURE_DESCRIPTION = 'signature_description', \
lambda record: record.signature.description
MATCHED_TEXT = 'matched_text', \
lambda record: (
'' if record.match is None
else bytes_to_str(record.match)
)
class HumanReadableWriter(ReportWriter):
def __init__(self, target: IO, columns: List[ScanReportColumn]):
super().__init__(target)
self._columns = columns
def _get_value(data: List[str], column: str) -> str:
return
def _map_data_to_dict(self, data: List[str]) -> dict:
return {
column.header: data[index] for index, column
in enumerate(self._columns)
}
def write_row(self, data: List[str]) -> None:
values = self._map_data_to_dict(data)
file = None
signature_id = None
if 'filename' in values:
file = values['filename']
if 'signature_id' in values:
signature_id = values['signature_id']
# TODO: Add more custom messages if desired
if file is not None:
if signature_id is not None:
self._target.write(
f"File at {file} matched signature {signature_id}"
)
else:
self._target.write(
f"File {file} matched a signature"
)
else:
self._target.write(
"Match found: " + str(values)
)
self._target.write("\n")
def allows_headers(self) -> bool:
return False
REPORT_FORMAT_HUMAN = ReportFormat(
'human',
lambda stream, columns: HumanReadableWriter(stream, columns),
allows_headers=False
)
class ScanReportFormat(ReportFormatEnum):
CSV = REPORT_FORMAT_CSV
TSV = REPORT_FORMAT_TSV
NULL_DELIMITED = REPORT_FORMAT_NULL_DELIMITED
LINE_DELIMITED = REPORT_FORMAT_LINE_DELIMITED
HUMAN = REPORT_FORMAT_HUMAN
class ScanReportRecord(ReportRecord):
def __init__(
self,
result: ScanResult,
signature: Signature,
match: bytes
):
self.result = result
self.signature = signature
self.match = match
class ScanReport(Report):
def __init__(
self,
format: ScanReportFormat,
columns: List[ScanReportColumn],
signature_set: SignatureSet,
email_addresses: List[str],
mailer: Optional[Mailer],
write_headers: bool = False
):
super().__init__(
format=format,
columns=columns,
email_addresses=email_addresses,
mailer=mailer,
write_headers=write_headers
)
self.signature_set = signature_set
self.metrics = None
def add_result(self, result: ScanResult) -> None:
records = []
for signature_id, match in result.matches.items():
signature = self.signature_set.get_signature(signature_id)
record = ScanReportRecord(
result=result,
signature=signature,
match=match
)
records.append(record)
self.write_records(records)
def generate_email(
self,
recipient: Address,
attachments: Dict[str, str],
hostname: str
) -> EmailMessage:
suspicious_count = self.metrics.get_total_matches()
total_count = self.metrics.get_total_count()
byte_unit = scale_byte_unit(self.metrics.get_total_bytes())
elapsed = round(self.timer.get_elapsed())
results = {
'Suspicious Files': suspicious_count,
'Scanned Files': total_count,
'Bytes Processed': byte_unit,
'Elapsed Time': f'{elapsed} second(s)'
}
content = Tag('div')
plain = ('Suspicious files were found by Wordfence CLI during a '
'malware scan.\n\n')
content.append(Tag('p').append(plain))
for label, value in results.items():
plain += f'{label}: {value}\n'
table = generate_html_table(results)
content.append(table)
document = generate_report_email_html(
content,
'Malware Scan Results',
hostname
)
return ReportEmail(
recipient=recipient,
subject=f'Malware Scan Results for {hostname}',
plain_content=plain,
html_content=document.to_html()
)
SCAN_REPORT_CONFIG_OPTIONS = get_config_options(
ScanReportFormat,
ScanReportColumn
)
class ScanReportManager(ReportManager):
def __init__(
self,
context: CliContext,
signature_set: SignatureSet
):
super().__init__(
formats=ScanReportFormat,
columns=ScanReportColumn,
context=context,
read_stdin=context.config.read_stdin,
input_delimiter=context.config.file_list_separator,
binary_input=True
)
self.signature_set = signature_set
self.progress = None
def set_progress_display(self, progress: ProgressDisplay) -> None:
self.progress = progress
def _instantiate_report(
self,
format: ReportFormat,
columns: List[ReportColumn],
email_addresses: List[str],
mailer: Optional[Mailer],
write_headers: bool
) -> ScanReport:
return ScanReport(
format,
columns,
self.signature_set,
email_addresses,
mailer,
write_headers
)
def _get_stdout_target(self) -> IO:
if self.progress is not None:
return self.progress.get_output_stream()
return super()._get_stdout_target()