HEX
Server: LiteSpeed
System: Linux srv1.dhviews.com 5.14.0-570.23.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Tue Jun 24 11:27:16 EDT 2025 x86_64
User: bdedition (1723)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //proc/thread-self/root/usr/local/lib/python3.9/site-packages/wordfence/util/direct_io.py
import os
import mmap
import math

from typing import Optional


class DirectIoBuffer:
    """Anonymous memory-mapped buffer sized to a whole number of pages.

    The requested ``max_chunk_size`` is rounded up to the next multiple of
    ``mmap.PAGESIZE`` and an anonymous mapping of that size is created.
    The mapping is exposed as ``buffer``, as a ``memoryview``
    (``buffer_view``) and as a single-element list (``buffers``) in the
    shape expected by vectored reads such as ``os.preadv``.
    """

    def __init__(self, max_chunk_size: int = mmap.PAGESIZE):
        """Allocate the mapping.

        Args:
            max_chunk_size: Requested capacity in bytes; the actual
                ``buffer_size`` is this value rounded up to a multiple of
                ``mmap.PAGESIZE``.
        """
        self.max_chunk_size = max_chunk_size
        # Round up so the mapping always covers whole pages.
        page_count = math.ceil(max_chunk_size / mmap.PAGESIZE)
        self.buffer_size = page_count * mmap.PAGESIZE
        # fileno -1 requests an anonymous (not file-backed) mapping.
        self.buffer = mmap.mmap(-1, self.buffer_size)
        self.buffer_view = memoryview(self.buffer)
        self.buffers = [self.buffer]

    def seek(self, position: int = 0) -> None:
        """Move the buffer's read position to ``position`` (default 0)."""
        self.buffer.seek(position)

    def read(self, length: int) -> bytes:
        """Return ``length`` bytes starting at the current read position."""
        return self.buffer.read(length)

    def close(self) -> None:
        """Release the memoryview and the underlying mapping.

        Safe to call more than once. The view is released first because a
        mapping with live buffer exports cannot be closed.
        """
        self.buffer_view.release()
        self.buffer.close()


class DirectIoReader:
    """Sequential reader that opens a file with ``O_DIRECT``.

    Every read is issued at a page-aligned file offset into the buffers
    supplied by a ``DirectIoBuffer``; bytes that precede the logical
    ``offset`` within that page are skipped before data is returned.
    Usable as a context manager, which closes the descriptor on exit.
    """

    def __init__(self, path: str, buffer: "DirectIoBuffer"):
        """Open ``path`` read-only with ``O_DIRECT``.

        Args:
            path: File to read.
            buffer: Object providing ``buffers`` (passed to ``os.preadv``),
                ``seek(position)`` and ``read(length)``.

        Raises:
            OSError: If the file cannot be opened.
        """
        self.fd = os.open(path, os.O_RDONLY | os.O_DIRECT)
        self.buffer = buffer
        self.offset = 0

    def read(self, limit: Optional[int] = None) -> bytes:
        """Read the next chunk of the file, advancing ``offset``.

        Args:
            limit: Maximum number of bytes to return. ``None`` (the
                default) returns everything the buffer received for this
                read. Negative values are treated as 0.

        Returns:
            The bytes read; ``b""`` once the end of the file is reached.
        """
        page_size = mmap.PAGESIZE
        # Integer arithmetic keeps the alignment exact for any offset
        # (float division loses precision for very large values).
        skip = self.offset % page_size
        aligned_offset = self.offset - skip
        received = os.preadv(self.fd, self.buffer.buffers, aligned_offset)
        # A short read may deliver fewer bytes than `skip`; never let the
        # usable length go negative, which would move the offset backwards
        # and make the buffer return its whole remaining content.
        available = max(received - skip, 0)
        if limit is not None:
            available = min(available, max(limit, 0))
        self.offset += available
        self.buffer.seek(skip)
        return self.buffer.read(available)

    def close(self) -> None:
        """Close the file descriptor; subsequent calls are no-ops."""
        # Swap in -1 first so the descriptor number is never closed twice
        # (it could have been reused for another file in the meantime).
        fd, self.fd = self.fd, -1
        if fd >= 0:
            os.close(fd)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()