HEX
Server: LiteSpeed
System: Linux srv1.dhviews.com 5.14.0-570.23.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Jun 24 11:27:16 EDT 2025 x86_64
User: bdedition (1723)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //proc/thread-self/root/usr/local/lib/python3.9/site-packages/wordfence/wordpress/remediator.py
import os

from typing import Optional

from ..util.io import iterate_files, resolve_path
from ..logging import log
from ..api import noc1
from ..api.exceptions import ApiException
from .identifier import FileIdentifier, FileType, FileIdentity, \
    KnownFileIdentity, GroupIdentity


class RemediationSource:

    def get_correct_content(
                self,
                identity: KnownFileIdentity
            ) -> Optional[bytes]:
        pass


class Noc1RemediationSource(RemediationSource):

    def __init__(self, client: noc1.Client):
        self.client = client

    def get_correct_content(
                self,
                identity: KnownFileIdentity
            ) -> Optional[bytes]:
        try:
            type = identity.type.value
            path = os.fsdecode(identity.local_path)
            if identity.extension is None:
                return self.client.get_wp_file_content(
                        type,
                        path,
                        identity.site.get_version(),
                    )
            else:
                return self.client.get_wp_file_content(
                        type,
                        path,
                        identity.site.get_version(),
                        identity.extension.get_name(),
                        identity.extension.version
                    )
        except ApiException as e:
            log.warning(
                    f'Unable to fetch correct content for {identity} - {e}'
                )
            return None


class RemediationResult:

    def __init__(
                self,
                path: bytes,
                identity: FileIdentity,
                known: bool = False,
                remediated: bool = False,
                target_path: Optional[bytes] = None
            ):
        self.path = path
        self.identity = identity
        self.known = known
        self.remediated = remediated
        self.target_path = target_path if target_path is not None else path

    def __bool__(self) -> bool:
        return self.remediated


class Remediator:

    def __init__(self, source: RemediationSource):
        self.identifier = FileIdentifier()
        self.source = source
        self.input_count = 0

    def get_correct_content(self, identity: KnownFileIdentity) -> bytes:
        return self.source.get_correct_content(identity)

    def remediate_file(
                self,
                path: bytes,
                target_path: Optional[bytes] = None
            ) -> RemediationResult:
        identity = self.identifier.identify(path)
        result = RemediationResult(path, identity, target_path=target_path)
        if identity.type is FileType.UNKNOWN:
            log.warning(f'Unable to identify {path}')
            return result
        log.debug('Identified ' + os.fsdecode(path) + f' as {identity}')
        correct_content = self.get_correct_content(identity)
        if correct_content is None:
            log.warning(
                    f'Unable to determine correct content for {identity}, '
                    'skipping remediation...'
                )
            return result
        result.known = True
        try:
            log.debug('Overwriting ' + os.fsdecode(path) + '...')
            with open(path, 'wb') as file:
                file.write(correct_content)
            result.remediated = True
        except OSError as error:
            log.error(
                    f'An error occurred while attempting to remediate {path}: '
                    + str(error)
                )
        return result

    def handle_symlink_loop(self, path: str) -> None:
        log.warning(f'Symlink loop detected at {path}')

    def remediate(self, path: bytes) -> RemediationResult:
        self.input_count += 1
        path = resolve_path(path)
        if os.path.isdir(path):
            file_found = False
            for file in iterate_files(
                        path,
                        loop_callback=self.handle_symlink_loop
                    ):
                yield self.remediate_file(resolve_path(file), path)
                file_found = True
            if not file_found:
                yield RemediationResult(
                        path,
                        GroupIdentity(FileType.UNKNOWN, path)
                    )
        else:
            yield self.remediate_file(path)