HEX
Server: LiteSpeed
System: Linux srv1.dhviews.com 5.14.0-570.23.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Jun 24 11:27:16 EDT 2025 x86_64
User: bdedition (1723)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //proc/self/root/usr/local/lib/python3.9/site-packages/wordfence/cli/malwarescan/progress.py
import curses
import logging
import signal
import os
from typing import List, Optional, Deque
from logging import Handler
from collections import deque, namedtuple
from time import sleep

from wordfence.scanning.scanner import (ScanProgressUpdate, ScanMetrics,
                                        default_scan_finished_handler)
from ..banner.banner import get_welcome_banner
from ...util import timing
from ...util.unicode import filter_control_characters
from ...util.units import scale_byte_unit


class ProgressException(Exception):
    pass


_displays = []

METRIC_BOX_WIDTH = 39
"""
Hard-coded width of metric boxes

The actual width taken up will be the hard-coded value +2 to account for the
left and right borders. Each box on the same row will be separated by the
padding value as well.
"""


def reset_terminal() -> None:
    for display in _displays:
        display.end()


def resize_terminal(signalnum, frame) -> None:
    for display in _displays:
        display.queue_resize()


signal.signal(signal.SIGWINCH, resize_terminal)


Position = namedtuple('Position', ['y', 'x'])


class LayoutProperties:

    def __init__(
                self,
                lines: int,
                current_line: int,
                max_row_width: int
            ):
        self.lines = lines
        self.current_line = current_line
        self.max_row_width = max_row_width


class Box:

    def __init__(
                self,
                parent: Optional[curses.window] = None,
                border: bool = True,
                title: Optional[str] = None
            ):
        self.parent = parent
        self.border = border
        self.title = title
        self.window = None
        self.position = None
        self.last_size = None

    def _initialize_window(self, y: int = 0, x: int = 0) -> None:
        height, width = self.compute_size()
        if self.parent is None:
            self.window = curses.newwin(height, width, y, x)
        else:
            self.window = self.parent.subwin(height, width, y, x)
        self.position = Position(y, x)

    def set_position(self, y: int, x: int) -> None:
        if self.window is None:
            self._initialize_window(y, x)
        else:
            self.resize(1, 1)
            try:
                self.window.erase()
                self.window.mvderwin(y, x)
                self.window.mvwin(y, x)
                self.position = Position(y, x)
            except Exception as e:
                size = os.get_terminal_size()
                raise ValueError(
                        f"error moving window: y: {y}, x: {x}; "
                        f"height: {self.get_height()}; "
                        f"width: {self.get_width()}; "
                        f"lines: {size.lines}; "
                        f"columns: {size.columns}"
                    ) from e
            self.resize()

    def _require_window(self) -> None:
        if self.window is None:
            self._initialize_window()

    def compute_size(self) -> (int, int):
        height = self.get_height()
        width = self.get_width()
        if self.border:
            width += 2
            height += 2
        self.last_size = (height, width)
        return self.last_size

    def resize(
                self,
                lines: Optional[int] = None,
                cols: Optional[int] = None
            ) -> None:
        if self.window is None:
            return
        height, width = self.compute_size()
        if lines is not None:
            height = lines
        if cols is not None:
            width = cols
        self.window.erase()
        try:
            self.window.resize(height, width)
        except Exception:
            pass  # Ignore temporary errors during resizing
        self.update()

    def set_title(self, title: str) -> None:
        self.title = title

    def render(self) -> None:
        self._require_window()
        height, width = self.compute_size()
        if self.border:
            self.window.border()
        if self.title is not None:
            title_length = len(self.title)
            title_offset = 0
            if title_length < width:
                title_offset = int((width - title_length) / 2)
            try:
                self.window.addstr(0, title_offset, self.title)
            except Exception:
                pass  # Ignore temporary errors during resizing
        try:
            self.draw_content()
        except Exception:
            pass  # Ignore temporary errors during resizing

    def get_border_offset(self) -> int:
        return 1 if self.border else 0

    def draw_content(self) -> None:
        pass

    def update(self) -> None:
        self.render()
        self.window.syncup()
        self.window.noutrefresh()

    def resize_for_layout(self, properties: LayoutProperties) -> False:
        return False


class Metric:

    def __init__(self, label: str, value):
        self.label = label
        self.value = str(value)


class MetricBox(Box):

    def __init__(
                self,
                metrics: List[Metric],
                title: Optional[str] = None,
                parent: Optional[curses.window] = None
            ):
        self.metrics = metrics
        super().__init__(parent, title=title)

    def get_width(self) -> int:
        return METRIC_BOX_WIDTH

    def get_height(self) -> int:
        return len(self.metrics)

    def draw_content(self) -> None:
        offset = self.get_border_offset()
        width = self.get_width()
        for index, metric in enumerate(self.metrics):
            line = index + offset
            label = f'{metric.label}:'
            value_string = metric.value.rjust(width - len(label))
            self.window.addstr(line, offset, label + value_string)


class BannerBox(Box):

    def __init__(
                self,
                banner,
                parent: Optional[curses.window] = None
            ):
        self.banner = banner
        super().__init__(parent, border=False)

    def get_width(self):
        return self.banner.column_count

    def get_height(self):
        return self.banner.row_count

    def draw_content(self):
        offset = self.get_border_offset()
        for index, row in enumerate(self.banner.rows):
            self.window.addstr(index + offset, offset, row)


DEFAULT_MAX_MESSAGES = 512


class LogBox(Box):

    def __init__(
                self,
                columns: int,
                lines: int,
                max_messages: int = 0,
                parent: Optional[curses.window] = None
            ):
        self.columns = columns
        self.lines = lines
        self.messages = deque(
                maxlen=self._determine_max_messages(max_messages)
            )
        self.cursor_position = None
        super().__init__(parent, border=True)

    def _determine_max_messages(self, max_messages: int = 0) -> Optional[int]:
        if max_messages < 0:
            return None
        elif max_messages == 0:
            return max(self.lines, DEFAULT_MAX_MESSAGES)
        else:
            return max_messages

    def get_width(self):
        return self.columns

    def get_height(self):
        return self.lines

    def _map_messages_to_lines(self, offset: int) -> Deque[str]:
        lines = deque(maxlen=self.lines)
        remaining_lines = self.lines
        for message in reversed(self.messages):
            if remaining_lines == 0:
                break
            message_lines = []
            while len(message):
                if remaining_lines == 0:
                    break
                line = message[:self.columns]
                message = message[self.columns:]
                message_lines.append(line)
                remaining_lines -= 1
            for line in reversed(message_lines):
                lines.appendleft(line)
        return lines

    def draw_content(self) -> None:
        offset = self.get_border_offset()
        line_number = offset
        last_line_number = line_number
        last_line_length = 0
        for line in self._map_messages_to_lines(offset):
            last_line_number = line_number
            last_line_length = len(line)
            line = line.ljust(self.columns)
            try:
                self.window.addstr(line_number, offset, line)
            except Exception:
                break
            line_number += 1
        self.cursor_offset = Position(last_line_number, last_line_length)

    def add_message(self, message: str) -> None:
        self.messages.append(filter_control_characters(message))
        self.update()

    def get_cursor_position(self) -> Position:
        y = 0
        x = 0
        if self.position is not None:
            y += self.position.y
            x += self.position.x
        if self.cursor_offset is not None:
            y += self.cursor_offset.y
            x += self.cursor_offset.x
        return Position(y, x)

    def resize_for_layout(self, properties: LayoutProperties) -> bool:
        self.columns = properties.max_row_width - 2
        self.lines = properties.lines - properties.current_line - 2
        self.cursor_position = None
        if self.lines < 3:
            raise ProgressException(
                    'Insufficient space available to display log messages'
                )
        return True


class LogBoxHandler(Handler):

    def __init__(self, log_box: LogBox):
        self.log_box = log_box
        Handler.__init__(self)

    def emit(self, record):
        self.log_box.add_message(record.getMessage())
        pass


class LogBoxStream():

    def __init__(self, log_box: LogBox):
        self.log_box = log_box

    def write(self, line):
        self.log_box.add_message(line)


class BoxLayout:

    def __init__(self, lines: int, cols: int, padding: int = 1):
        self.lines = lines
        self.cols = cols
        self.padding = padding
        self.current_line = 0
        self._content = []
        self._unpositioned = []
        self.max_row_width = 0

    def add_box(self, box: Box) -> None:
        self._content.append(box)
        self._unpositioned.append(box)

    def add_break(self) -> None:
        self._content.append(None)
        self._unpositioned.append(None)

    def get_layout_properties(self) -> LayoutProperties:
        return LayoutProperties(
                    lines=self.lines,
                    current_line=self.current_line,
                    max_row_width=self.max_row_width
                )

    def _position_row(self, row: list) -> list:
        positioned = []
        extra = []
        row_width = 0
        unpadded_row_width = 0
        row_height = 0
        for box in row:
            box.resize_for_layout(self.get_layout_properties())
            height, width = box.compute_size()
            required_width = width + self.padding
            if len(positioned) and (
                        len(extra) or
                        row_width + required_width > self.cols
                    ):
                extra.append(box)
            else:
                row_width += required_width
                if row_width > self.cols:
                    raise ProgressException('Insufficient columns available')
                unpadded_row_width += width
                row_height = max(row_height, height)
                positioned.append((box, height, width))
        if self.current_line + row_height > self.lines:
            raise ProgressException('Insufficient lines available')
        box_count = len(positioned)
        padding = int((self.cols - unpadded_row_width) / (box_count + 1))
        padded_width = unpadded_row_width + padding * (box_count + 1)
        x = padding + int((self.cols - padded_width) / 2)
        final_row_width = 0
        previous_padding = 0
        for (box, height, width) in positioned:
            final_row_width += previous_padding
            y = self.current_line + round((row_height - height) / 2)
            box.set_position(y, x)
            x += width + padding
            final_row_width += width
            previous_padding = padding
        self.current_line += row_height + self.padding
        self.max_row_width = max(self.max_row_width, final_row_width)
        return extra

    def position(self) -> None:
        row = []
        items = self._unpositioned
        for item in items:
            if item is None:
                row = self._position_row(row)
            else:
                row.append(item)
        while len(row):
            row = self._position_row(row)
        self._unpositioned = []

    def update_content(self) -> None:
        for item in self._content:
            if item is not None:
                item.update()

    def reset(self) -> None:
        self.current_line = 0
        self.max_row_width = 0
        self._unpositioned = self._content.copy()

    def resize(self, lines: int, cols: int) -> None:
        self.lines = lines
        self.cols = cols
        self.reset()
        self.position()


class ProgressDisplay:

    METRICS_PADDING = 1
    METRICS_COUNT = 5
    MIN_MESSAGE_BOX_HEIGHT = 4

    def __init__(self, worker_count: int):
        _displays.append(self)
        self.worker_count = worker_count
        self.results_message = None
        self.pending_resize = False
        self._setup_curses()

    def _setup_curses(self) -> None:
        self.stdscr = curses.initscr()
        curses.noecho()
        curses.curs_set(0)
        self.terminal_size = os.get_terminal_size()
        self._initialize_content(self.terminal_size)

    def _initialize_content(self, size: os.terminal_size) -> None:
        self.clear()
        self.banner_box = self._initialize_banner()
        self.metric_boxes = self._initialize_metric_boxes()
        self.log_box = self._initialize_log_box()
        self.layout = self._initialize_layout(size)
        self.refresh()

    def clear(self):
        self.stdscr.clear()

    def refresh(self):
        self.stdscr.noutrefresh()
        curses.doupdate()

    def end_on_input(self):
        curses.flushinp()
        self.stdscr.nodelay(True)
        while True:
            key = self.stdscr.getch()
            if key != -1 and key != curses.KEY_RESIZE:
                break
            if self._resize_if_necessary():
                self._move_cursor_to_log_end()
            sleep(0.1)
        self.end()

    def end(self):
        curses.endwin()
        _displays.remove(self)

    def _initialize_banner(self) -> Optional[BannerBox]:
        banner = get_welcome_banner()
        if banner is None:
            return None
        return BannerBox(banner=banner, parent=self.stdscr)

    def _compute_rate(self, value: int, elapsed_time: float) -> int:
        if elapsed_time > 0:
            return int(value / elapsed_time)
        return 0

    def _get_metrics(
                self,
                update: ScanProgressUpdate,
                worker_index: Optional[int] = None
            ) -> List[Metric]:
        file_count = update.metrics.get_int_metric('counts', worker_index)
        byte_count = update.metrics.get_int_metric('bytes', worker_index)
        byte_value = scale_byte_unit(byte_count)
        match_count = update.metrics.get_int_metric('matches', worker_index)
        file_rate = self._compute_rate(file_count, update.elapsed_time)
        byte_rate = self._compute_rate(byte_count, update.elapsed_time)
        byte_rate = scale_byte_unit(byte_rate)
        metrics = [
                Metric('Files Processed', file_count),
                Metric('Bytes Processed', byte_value),
                Metric('Matches Found', match_count),
                Metric('Files / Second', file_rate),
                Metric('Bytes / Second', byte_rate)
            ]
        if len(metrics) > self.METRICS_COUNT:
            raise ValueError("Metrics count is out of sync")
        return metrics

    def _initialize_metric_boxes(self) -> List[MetricBox]:
        default_metrics = ScanMetrics(self.worker_count)
        default_update = ScanProgressUpdate(
                elapsed_time=0,
                metrics=default_metrics
            )
        boxes = []
        for index in range(0, self.worker_count + 1):
            if index == 0:
                worker_index = None
                title = 'Summary'
            else:
                worker_index = index - 1
                title = f'Worker {index}'
            box = MetricBox(
                    self._get_metrics(default_update, worker_index),
                    title=title,
                    parent=self.stdscr
                )
            boxes.append(box)
        return boxes

    def _initialize_log_box(self) -> LogBox:
        log_box = LogBox(
                    # Lines and columns are dynamic
                    columns=10,
                    lines=5,
                    parent=self.stdscr
                )
        return log_box

    def _initialize_layout(self, size: os.terminal_size) -> BoxLayout:
        layout = BoxLayout(size.lines, size.columns, self.METRICS_PADDING)
        if self.banner_box is not None:
            layout.add_box(self.banner_box)
        for index, box in enumerate(self.metric_boxes):
            layout.add_box(box)
            if index == 0:
                layout.add_break()
        layout.add_break()
        layout.add_box(self.log_box)
        layout.position()
        layout.update_content()
        return layout

    def _display_metrics(self, update: ScanProgressUpdate) -> None:
        for index in range(0, self.worker_count + 1):
            box = self.metric_boxes[index]
            worker_index = None if index == 0 else index - 1
            box.metrics = self._get_metrics(update, worker_index)
            box.update()

    def handle_update(self, update: ScanProgressUpdate) -> None:
        self._resize_if_necessary()
        try:
            self._display_metrics(update)
            self.refresh()
        except Exception as e:
            reset_terminal()
            raise ProgressException('Rendering progress update failed') from e

    def queue_resize(self) -> None:
        self.pending_resize = True

    def resize(self) -> None:
        size = os.get_terminal_size()
        smaller = size.columns < self.terminal_size.columns
        self.terminal_size = size
        if smaller:
            self.layout.resize(size.lines, size.columns)
        curses.resizeterm(size.lines, size.columns)
        self.stdscr.erase()
        self.stdscr.refresh()
        self.stdscr.resize(size.lines, size.columns)
        if not smaller:
            self.layout.resize(size.lines, size.columns)
        self.layout.update_content()
        self.refresh()

    def _resize_if_necessary(self) -> bool:
        if not self.pending_resize:
            return False
        try:
            self.resize()
            self.pending_resize = False
            return True
        except Exception as e:
            reset_terminal()
            raise ProgressException(
                    'Failed to adjust progress output to new terminal size'
                ) from e

    def get_log_handler(self) -> logging.Handler:
        return LogBoxHandler(self.log_box)

    def get_output_stream(self) -> LogBoxStream:
        return LogBoxStream(self.log_box)

    def _move_cursor_to_log_end(self) -> None:
        cursor_position = self.log_box.get_cursor_position()
        if cursor_position is not None:
            try:
                self.stdscr.move(cursor_position.y, cursor_position.x + 1)
            except Exception:
                pass

    def scan_finished_handler(
                self, metrics: ScanMetrics,
                timer: timing.Timer
            ) -> None:
        messages = default_scan_finished_handler(metrics, timer)
        self.results_message = messages.results
        self.log_box.add_message('Scan completed! Press any key to exit.')
        self._move_cursor_to_log_end()
        curses.curs_set(1)