Python multiprocessing support

Multiprocessing timeouts and memory limits

The GlasswallProcessManager class manages multiprocessing with a configurable timeout and memory limit for each file processed by the Glasswall engine.

GlasswallProcessManager consumes Task objects, which must be created and added to a queue. A Task object consists of a function to call, together with the arguments and keyword arguments to pass to that function.
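
To make the shape of a Task concrete, the sketch below models one with a plain dataclass. SketchTask, run and describe are hypothetical names written for this illustration and are not the glasswall classes; the only thing carried over from the description above is that a task bundles a function with its positional and keyword arguments.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Tuple


@dataclass
class SketchTask:
    """Hypothetical stand-in for a Task: a callable plus its arguments."""
    func: Callable[..., Any]
    args: Tuple[Any, ...] = ()
    kwargs: Dict[str, Any] = field(default_factory=dict)

    def run(self) -> Any:
        # Running the task forwards the stored arguments to func.
        return self.func(*self.args, **self.kwargs)


def describe(input_file: str, output_file: str = "") -> str:
    # Placeholder worker: a real worker would process the file instead.
    return f"{input_file} -> {output_file or '<memory>'}"


task = SketchTask(func=describe, kwargs=dict(input_file="a.doc", output_file="a.doc.zip"))
print(task.run())
```

Keeping the function and its arguments together in one object is what allows the work to be queued first and executed later in another process.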

GlasswallProcessManager either produces a list of TaskResult objects once processing has finished, or yields individual TaskResult objects as each one completes. A TaskResult object holds attributes relating to the processing of a file.

TaskResult attributes

task: Task
success: bool # True if function did not raise an exception
result: Any # function return value
exception: Union[Exception, None] # the exception raised by the function
exit_code: Union[int, None] # multiprocessing.Process.exitcode, 0 = success
timeout_seconds: Optional[float] # time limit for each process
memory_limit_in_gib: Optional[float] # memory limit for each process, 1 gibibyte = 1024 ** 3 bytes
start_time: float # uses time.time(), current time in seconds since the Epoch
end_time: float # uses time.time(), current time in seconds since the Epoch
elapsed_time: float # end_time - start_time
timed_out: bool # terminated for exceeding the time limit: 'timeout_seconds'
max_memory_used_in_gib: float # the highest recorded memory usage of the process
out_of_memory: bool # terminated for exceeding the memory limit: 'memory_limit_in_gib'
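
As an illustration of how these attributes might be read together, the sketch below turns one result into a one-line summary. describe_outcome is a hypothetical helper, and the SimpleNamespace object only imitates a TaskResult using the attribute names listed above (with values taken from the sample output in this document). Checking timed_out and out_of_memory before success is a choice made for this sketch, so that it does not depend on how success is reported for a terminated process.

```python
from types import SimpleNamespace


def describe_outcome(task_result) -> str:
    """Summarise a result using the attribute names listed above."""
    if task_result.timed_out:
        return f"timed out after {task_result.timeout_seconds}s"
    if task_result.out_of_memory:
        return f"exceeded {task_result.memory_limit_in_gib} GiB"
    if not task_result.success:
        return f"failed: {task_result.exception!r} (exit code {task_result.exit_code})"
    peak_mib = task_result.max_memory_used_in_gib * 1024  # 1 GiB = 1024 MiB
    return f"ok in {task_result.elapsed_time:.2f}s, peak {peak_mib:.1f} MiB"


# Stand-in object for illustration only; a real TaskResult comes from the process manager.
example = SimpleNamespace(
    success=True,
    exception=None,
    exit_code=0,
    timeout_seconds=5,
    memory_limit_in_gib=4,
    elapsed_time=1.168273687362671,
    timed_out=False,
    max_memory_used_in_gib=0.06385421752929688,
    out_of_memory=False,
)
print(describe_outcome(example))
```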

Producing a list of TaskResult objects once processing is complete

In this example, tasks are queued and processed in parallel up to the maximum number of workers, which defaults to the number of logical CPUs in the system.

Once all tasks are queued, processing starts automatically on exiting the GlasswallProcessManager context. Once all tasks have completed, the process_manager.task_results list attribute is populated with TaskResult objects describing the outcome of processing.

After all tasks have completed, this example iterates over process_manager.task_results in a for loop and prints each TaskResult object.

import os
import time

import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task


INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"

glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")


def worker_function(*args, **kwargs):
    EDITOR.export_file(*args, **kwargs)


def main():
    start_time = time.time()
    input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
    with GlasswallProcessManager(max_workers=None, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
        for input_file in input_files:
            relative_path = os.path.relpath(input_file, INPUT_DIRECTORY)
            output_file = os.path.join(OUTPUT_DIRECTORY, relative_path) + ".zip"

            task = Task(
                func=worker_function,
                args=tuple(),
                kwargs=dict(
                    input_file=input_file,
                    output_file=output_file,
                    content_management_policy=gw_policy,
                ),
            )
            process_manager.queue_task(task)

    # Processing ran when the with block exited; task_results is now populated
    for task_result in process_manager.task_results:
        print(task_result)

    print(f"Elapsed: {time.time() - start_time} seconds")


if __name__ == "__main__":
    main()

Output:
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', outp..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507465.3883162, end_time=1710507466.5565898, elapsed_time=1.168273687362671, timed_out=False, max_memory_used_in_gib=0.06385421752929688, out_of_memory=False)
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', outpu..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507466.299694, end_time=1710507467.366209, elapsed_time=1.0665149688720703, timed_out=False, max_memory_used_in_gib=0.06365966796875, out_of_memory=False)
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507465.3763025, end_time=1710507467.7441902, elapsed_time=2.3678877353668213, timed_out=False, max_memory_used_in_gib=0.1662139892578125, out_of_memory=False)
Elapsed: 6.226853370666504 seconds
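
The queue-then-run-on-exit behaviour described above can be modelled with a small context manager. This is only a rough sketch: DeferredRunner and its queue method are hypothetical, it uses threads from the standard library rather than separate processes, and it enforces no timeout or memory limit. It is meant to show why results are only available after the with block ends, not how GlasswallProcessManager is implemented.

```python
from concurrent.futures import ThreadPoolExecutor


class DeferredRunner:
    """Hypothetical model: collect callables, run them all when the with block exits."""

    def __init__(self, max_workers=None):
        self.max_workers = max_workers
        self._queued = []
        self.results = []  # stays empty until the context exits

    def queue(self, func, *args, **kwargs):
        self._queued.append((func, args, kwargs))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is not None:
            # A choice made for this sketch: do not start work if queueing failed.
            return False
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            futures = [pool.submit(f, *a, **kw) for f, a, kw in self._queued]
            # Results are collected in queue order once every job has finished.
            self.results = [future.result() for future in futures]
        return False


with DeferredRunner(max_workers=2) as runner:
    for n in range(3):
        runner.queue(pow, n, 2)
    assert runner.results == []  # nothing has run yet inside the block

print(runner.results)
```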

Yielding individual TaskResult objects as each completes

This example uses the external tqdm library to visualise progress during processing.

Tasks are queued and processed in parallel up to the maximum number of workers, which defaults to the number of logical CPUs in the system.

Once all tasks are queued, processing starts inside the GlasswallProcessManager context when the process_manager.as_completed() generator method is called. As soon as any task completes, the corresponding TaskResult object is yielded. This makes results available as they arrive instead of waiting for every task to finish. The process_manager.task_results list attribute is not populated.

As each task completes, this example prints the yielded TaskResult object.

import os
import time

from tqdm import tqdm

import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task


INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"

glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")


def worker_function(*args, **kwargs):
    EDITOR.export_file(*args, **kwargs)


def main():
    start_time = time.time()
    input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
    with GlasswallProcessManager(max_workers=None, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
        for input_file in tqdm(input_files, desc="Queueing files", miniters=len(input_files) // 10):
            relative_path = os.path.relpath(input_file, INPUT_DIRECTORY)
            output_file = os.path.join(OUTPUT_DIRECTORY, relative_path) + ".zip"

            task = Task(
                func=worker_function,
                args=tuple(),
                kwargs=dict(
                    input_file=input_file,
                    output_file=output_file,
                    content_management_policy=gw_policy,
                ),
            )
            process_manager.queue_task(task)

        # Processing starts here, inside the context, and yields each result as it completes
        for task_result in tqdm(process_manager.as_completed(), total=len(input_files), desc="Processing tasks", miniters=len(input_files) // 100):
            print(task_result)

    print(f"Elapsed: {time.time() - start_time} seconds")


if __name__ == "__main__":
    main()

Output:
Queueing files: 100%|███████████████████████████████████████████████████████████| 3/3 [00:00<00:00, 2993.79it/s]
Processing tasks: 0%| | 0/3 [00:00<?, ?it/s]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', outp..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507493.8832896, end_time=1710507496.1666203, elapsed_time=2.2833306789398193, timed_out=False, max_memory_used_in_gib=0.072662353515625, out_of_memory=False)
Processing tasks: 33%|███████████████████▋ | 1/3 [00:04<00:08, 4.08s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', outpu..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507495.4604173, end_time=1710507497.5040953, elapsed_time=2.043678045272827, timed_out=False, max_memory_used_in_gib=0.06534576416015625, out_of_memory=False)
Processing tasks: 67%|███████████████████████████████████████▎ | 2/3 [00:05<00:02, 2.46s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507495.3951488, end_time=1710507498.389851, elapsed_time=2.9947023391723633, timed_out=False, max_memory_used_in_gib=0.1673431396484375, out_of_memory=False)
Processing tasks: 100%|███████████████████████████████████████████████████████████| 3/3 [00:06<00:00, 2.11s/it]
Elapsed: 6.356592416763306 seconds
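
The difference between waiting for a full list and receiving results as they complete can be seen with the standard library alone. The sketch below is an analogy built on concurrent.futures with threads, not the glasswall implementation; slow_square and yield_as_completed are hypothetical names, and the sleeps stand in for files that take different amounts of time to process.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(n: int, delay: float) -> int:
    time.sleep(delay)  # simulate a file that takes `delay` seconds to process
    return n * n


def yield_as_completed(jobs, max_workers=None):
    """Hand each result to the caller as soon as its job finishes."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(slow_square, n, delay) for n, delay in jobs]
        for future in as_completed(futures):
            # Results are yielded one at a time rather than collected into a list.
            yield future.result()


jobs = [(1, 0.3), (2, 0.0), (3, 0.15)]
for value in yield_as_completed(jobs, max_workers=3):
    print(value)
```

With these delays the job queued second is expected to be reported first, which mirrors how the TaskResult objects above arrive in completion order rather than queue order.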

Note that although GlasswallProcessManager can handle large return data from worker_function, holding that data in memory can quickly exhaust the available RAM. Where possible, avoid returning anything from worker_function and rely on file to file processing instead. If writing files to disk is undesirable, or returning file bytes from the worker function is required, we recommend the following steps:

  • Limit max_workers so that at least 4 GiB of memory is available to each process.
  • Use the as_completed generator.
  • Make sure file bytes are not retained after being yielded from as_completed, so that Python's garbage collector frees the memory once the file bytes are no longer referenced.
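
The last recommendation is easy to get wrong by appending every result to a list. The sketch below shows one way to consume results while keeping only a small summary per file. summarise and fake_results are hypothetical; fake_results merely imitates a generator of objects with a result attribute holding bytes or None, in place of process_manager.as_completed().

```python
import hashlib
from types import SimpleNamespace


def summarise(results):
    """Consume results one at a time, keeping only small per-file summaries."""
    summaries = []
    for task_result in results:
        data = task_result.result
        if data:
            # Keep the size and a digest, not the bytes themselves.
            summaries.append((len(data), hashlib.sha256(data).hexdigest()))
        # `data` and `task_result` are rebound on the next iteration, so the
        # previous bytes become unreferenced and can be reclaimed.
    return summaries


def fake_results():
    # Hypothetical stand-in for a generator of completed task results.
    for payload in (b"PK\x03\x04 first", None, b"PK\x03\x04 second export"):
        yield SimpleNamespace(result=payload)


summaries = summarise(fake_results())
print([size for size, _ in summaries])
```

Storing the TaskResult objects themselves, or the raw bytes, in a list would keep every export in memory until the list is discarded.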

Yielding file bytes in file to memory mode and limiting max_workers

This example uses the external tqdm library to visualise progress during processing.

worker_function has been modified to return the result of EDITOR.export_file, which is either the bytes of the export zip file, or None.

max_workers is limited based on the number of logical CPUs and the available RAM.

Tasks are queued and processed in parallel up to the specified number of workers.

Once all tasks are queued, processing starts inside the GlasswallProcessManager context when the process_manager.as_completed() generator method is called. As soon as any task completes, the corresponding TaskResult object is yielded. This makes results available as they arrive instead of waiting for every task to finish. The process_manager.task_results list attribute is not populated.

As each task completes, this example prints the yielded TaskResult object and, if the task_result.result attribute is populated, also prints the size of the export zip file.

import os
import time

from tqdm import tqdm

import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task
from glasswall.multiprocessing.memory_usage import get_available_memory_gib


INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"

glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")


def worker_function(*args, **kwargs):
    return EDITOR.export_file(*args, **kwargs)


def main():
    start_time = time.time()
    available_memory_gib = get_available_memory_gib()
    print(f"Available memory: {available_memory_gib} GiB")
    # Set max_workers to the lower of cpu_count and available memory // 4 (4 GiB per process),
    # but never below 1 so that a machine with under 4 GiB free still gets one worker
    cpu_count = os.cpu_count() or 1
    max_workers = max(1, int(min(cpu_count, available_memory_gib // 4)))
    print(f"Max workers: {max_workers}")

    input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
    with GlasswallProcessManager(max_workers=max_workers, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
        for input_file in tqdm(input_files, desc="Queueing files", miniters=len(input_files) // 10):
            # No output_file specified, export_file will run in file to memory mode
            task = Task(
                func=worker_function,
                args=tuple(),
                kwargs=dict(
                    input_file=input_file,
                    content_management_policy=gw_policy,
                ),
            )
            process_manager.queue_task(task)

        for task_result in tqdm(process_manager.as_completed(), total=len(input_files), desc="Processing tasks", miniters=len(input_files) // 100):
            print(task_result)
            # Do something with export zip file bytes in memory
            if task_result.result:
                print(f"Export zip file size is: {len(task_result.result)} bytes for input_file: '{task_result.task.kwargs['input_file']}'")
            # task_result is rebound on the next iteration, so the previous bytes can be garbage collected

    print(f"Elapsed: {time.time() - start_time} seconds")


if __name__ == "__main__":
    main()

Output:
Available memory: 13.020416259765625 GiB
Max workers: 3
Queueing files: 100%|█████████████████████████████████████████████████████████████████████| 3/3 [00:00<?, ?it/s]
Processing tasks: 0%| | 0/3 [00:00<?, ?it/s]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', cont..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xceloX\xc7\x95\x89\xba\xce\x07\x00\x00-\x19\x00\x00\x19\..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509905.3188772, end_time=1710509908.5976403, elapsed_time=3.2787630558013916, timed_out=False, max_memory_used_in_gib=0.06348419189453125, out_of_memory=False)
Export zip file size is: 97553 bytes for input_file: 'C:\gwpw\input\TestFile_11.doc'
Processing tasks: 33%|███████████████████▋ | 1/3 [00:04<00:09, 4.99s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xcdloX\xca@\xc8\x8a\xf0\x1d\x00\x00k\xc6\x00\x00\x19\x00..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509905.1324441, end_time=1710509908.958884, elapsed_time=3.82643985748291, timed_out=False, max_memory_used_in_gib=0.166259765625, out_of_memory=False)
Export zip file size is: 664734 bytes for input_file: 'C:\gwpw\input\PDFWithGifAndJpeg.pdf'
Processing tasks: 67%|███████████████████████████████████████▎ | 2/3 [00:05<00:02, 2.25s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', conte..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xcfloX\xc7\x95\x89\xba\xce\x07\x00\x00-\x19\x00\x00\x19\..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509907.833435, end_time=1710509910.3073368, elapsed_time=2.4739017486572266, timed_out=False, max_memory_used_in_gib=0.0728759765625, out_of_memory=False)
Export zip file size is: 139697 bytes for input_file: 'C:\gwpw\input\TestFile_9.doc'
Processing tasks: 100%|███████████████████████████████████████████████████████████| 3/3 [00:06<00:00, 2.23s/it]
Elapsed: 6.706916809082031 seconds
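
The worker-count arithmetic used in this example can be isolated into a small pure function, which makes it easy to check against known numbers. pick_max_workers is a hypothetical helper, the floor of one worker is an addition made for this sketch, and the CPU count of 8 is an assumed value, since the output above does not state how many logical CPUs the machine had.

```python
import os


def pick_max_workers(available_gib: float, gib_per_worker: float = 4, cpu_count=None) -> int:
    """Lower of the CPU count and the number of memory slices that fit, floored at 1."""
    cpus = cpu_count or os.cpu_count() or 1
    by_memory = int(available_gib // gib_per_worker)  # whole 4 GiB slices that fit in RAM
    return max(1, min(cpus, by_memory))


# 13.02 GiB holds three whole 4 GiB slices, matching "Max workers: 3" above.
print(pick_max_workers(13.020416259765625, cpu_count=8))
# With plenty of memory, the CPU count becomes the limit.
print(pick_max_workers(64.0, cpu_count=8))
# With less than one slice available, the floor keeps a single worker.
print(pick_max_workers(2.5, cpu_count=8))
```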