Langkau ke kandungan utama

Sokongan multiprocessing Python

Had masa tamat multiprocessing dan had memori

Kelas GlasswallProcessManager direka untuk mengurus multiprocessing dengan had masa tamat dan had memori yang ditetapkan bagi setiap fail yang diproses oleh enjin Glasswall.

GlasswallProcessManager menggunakan objek Task yang mesti dicipta dan ditambah ke dalam baris gilir. Objek Task terdiri daripada fungsi yang akan dipanggil, serta argumen dan argumen kata kunci yang akan dihantar kepada fungsi tersebut.

GlasswallProcessManager menghasilkan sama ada senarai objek TaskResult setelah pemprosesan selesai, atau mengembalikan objek TaskResult secara individu apabila ia selesai. Objek TaskResult mengandungi atribut yang berkaitan dengan pemprosesan fail tersebut.

Atribut TaskResult

task: Task
success: bool # True if function did not raise an exception
result: Any # function return value
exception: Union[Exception, None], # the exception raised by the function
exit_code: Union[int, None], # multiprocessing.Process.exitcode, 0 = success
timeout_seconds: Optional[float] # time limit for each process
memory_limit_in_gib: Optional[float] # memory limit for each process, 1 gibibyte = 1024 ** 3 bytes
start_time: float # uses time.time(), current time in seconds since the Epoch
end_time: float # uses time.time(), current time in seconds since the Epoch
elapsed_time: float # end_time - start_time
timed_out: bool # terminated for exceeding the time limit: 'timeout_seconds'
max_memory_used_in_gib: float # the highest recorded memory usage of the process
out_of_memory: bool # terminated for exceeding the memory limit: 'memory_limit_in_gib'

Menghasilkan senarai objek TaskResult setelah pemprosesan selesai

Dalam contoh ini tugasan dibariskan dan diproses secara selari sehingga bilangan maksimum pekerja, yang secara lalai adalah sama dengan bilangan CPU logik dalam sistem.

Selepas semua tugasan dibariskan, pemprosesan bermula secara automatik apabila keluar daripada konteks GlasswallProcessManager. Setelah semua tugasan selesai, atribut senarai process_manager.task_results diisi dengan objek TaskResult yang menunjukkan hasil pemprosesan.

Setelah semua tugasan selesai, contoh ini mengiterasi process_manager.task_results dalam gelung for dan mencetak setiap objek TaskResult.

import os
import time

import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task


INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"

glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")


def worker_function(*args, **kwargs):
EDITOR.export_file(*args, **kwargs)


def main():
start_time = time.time()
input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
with GlasswallProcessManager(max_workers=None, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
for input_file in input_files:
relative_path = os.path.relpath(input_file, INPUT_DIRECTORY)
output_file = os.path.join(OUTPUT_DIRECTORY, relative_path) + ".zip"

task = Task(
func=worker_function,
args=tuple(),
kwargs=dict(
input_file=input_file,
output_file=output_file,
content_management_policy=gw_policy,
),
)
process_manager.queue_task(task)

for task_result in process_manager.task_results:
print(task_result)

print(f"Elapsed: {time.time() - start_time} seconds")


if __name__ == "__main__":
main()
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', outp..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507465.3883162, end_time=1710507466.5565898, elapsed_time=1.168273687362671, timed_out=False, max_memory_used_in_gib=0.06385421752929688, out_of_memory=False)
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', outpu..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507466.299694, end_time=1710507467.366209, elapsed_time=1.0665149688720703, timed_out=False, max_memory_used_in_gib=0.06365966796875, out_of_memory=False)
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507465.3763025, end_time=1710507467.7441902, elapsed_time=2.3678877353668213, timed_out=False, max_memory_used_in_gib=0.1662139892578125, out_of_memory=False)
Elapsed: 6.226853370666504 seconds

Menghasilkan objek TaskResult individu apabila ia selesai

Contoh ini menggunakan pustaka luaran tqdm untuk memvisualisasikan kemajuan semasa pemprosesan.

Tugas dimasukkan ke dalam baris gilir dan diproses secara selari sehingga bilangan maksimum pekerja, yang secara lalai adalah sama dengan bilangan CPU logik dalam sistem.

Selepas semua tugas dimasukkan ke dalam baris gilir, pemprosesan bermula dalam konteks GlasswallProcessManager dengan memanggil kaedah penjana process_manager.as_completed(). Sebaik sahaja mana-mana tugas selesai, objek TaskResult yang sepadan akan dihasilkan. Ini membolehkan hasil diakses apabila ia tersedia, dan bukannya menunggu semua tugas selesai. Atribut senarai process_manager.task_results tidak akan diisi.

Apabila setiap tugas selesai, contoh ini mencetak objek TaskResult yang dihasilkan.

import os
import time

from tqdm import tqdm

import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task


INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"

glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")


def worker_function(*args, **kwargs):
EDITOR.export_file(*args, **kwargs)


def main():
start_time = time.time()
input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
with GlasswallProcessManager(max_workers=None, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
for input_file in tqdm(input_files, desc="Queueing files", miniters=len(input_files) // 10):
relative_path = os.path.relpath(input_file, INPUT_DIRECTORY)
output_file = os.path.join(OUTPUT_DIRECTORY, relative_path) + ".zip"

task = Task(
func=worker_function,
args=tuple(),
kwargs=dict(
input_file=input_file,
output_file=output_file,
content_management_policy=gw_policy,
),
)
process_manager.queue_task(task)

for task_result in tqdm(process_manager.as_completed(), total=len(input_files), desc="Processing tasks", miniters=len(input_files) // 100):
print(task_result)

print(f"Elapsed: {time.time() - start_time} seconds")


if __name__ == "__main__":
main()
Queueing files: 100%|███████████████████████████████████████████████████████████| 3/3 [00:00<00:00, 2993.79it/s]
Processing tasks: 0%| | 0/3 [00:00<?, ?it/s]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', outp..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507493.8832896, end_time=1710507496.1666203, elapsed_time=2.2833306789398193, timed_out=False, max_memory_used_in_gib=0.072662353515625, out_of_memory=False)
Processing tasks: 33%|███████████████████▋ | 1/3 [00:04<00:08, 4.08s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', outpu..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507495.4604173, end_time=1710507497.5040953, elapsed_time=2.043678045272827, timed_out=False, max_memory_used_in_gib=0.06534576416015625, out_of_memory=False)
Processing tasks: 67%|███████████████████████████████████████▎ | 2/3 [00:05<00:02, 2.46s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507495.3951488, end_time=1710507498.389851, elapsed_time=2.9947023391723633, timed_out=False, max_memory_used_in_gib=0.1673431396484375, out_of_memory=False)
Processing tasks: 100%|███████████████████████████████████████████████████████████| 3/3 [00:06<00:00, 2.11s/it]
Elapsed: 6.356592416763306 seconds

Perhatikan bahawa walaupun GlasswallProcessManager boleh mengendalikan pemulangan data yang besar daripada worker_function, menyimpan data ini dalam memori boleh dengan cepat memenuhi RAM yang tersedia. Jika boleh, adalah disarankan untuk tidak memulangkan data daripada worker_function, dan sebaliknya bergantung pada pemprosesan fail ke fail. Jika memproses fail ke cakera tidak diingini atau memulangkan bait fail daripada fungsi pekerja diperlukan, kami mengesyorkan langkah-langkah berikut:

  • Hadkan max_workers untuk membolehkan sekurang-kurangnya 4 GiB memori tersedia bagi setiap proses.
  • Gunakan penjana as_completed.
  • Pastikan bait fail tidak dikekalkan selepas dihasilkan daripada as_completed supaya pengutip sampah Python akan membebaskan memori selepas bait fail tidak lagi dirujuk.

Menghasilkan bait fail dalam mod fail ke memori dan mengehadkan max_workers

Contoh ini menggunakan pustaka luaran tqdm untuk memvisualisasikan kemajuan semasa pemprosesan.

worker_function telah diubah suai untuk mengembalikan hasil EDITOR.export_file, yang sama ada bait fail zip eksport, atau None.

max_workers dihadkan berdasarkan CPU logik dan RAM yang tersedia.

Tugas dibariskan dan diproses secara selari sehingga bilangan worker yang ditentukan.

Selepas semua tugas dimasukkan ke dalam baris gilir, pemprosesan bermula dalam konteks GlasswallProcessManager dengan memanggil kaedah penjana process_manager.as_completed(). Sebaik sahaja mana-mana tugas selesai, objek TaskResult yang sepadan akan dihasilkan. Ini membolehkan hasil diakses apabila ia tersedia, dan bukannya menunggu semua tugas selesai. Atribut senarai process_manager.task_results tidak akan diisi.

Apabila setiap tugas selesai, contoh ini mencetak objek TaskResult yang dihasilkan, dan jika atribut task_result.result diisi, ia juga mencetak maklumat tentang saiz fail bagi fail zip eksport.

import os
import time

from tqdm import tqdm

import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task
from glasswall.multiprocessing.memory_usage import get_available_memory_gib


INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"

glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")


def worker_function(*args, **kwargs):
return EDITOR.export_file(*args, **kwargs)


def main():
start_time = time.time()
available_memory_gib = get_available_memory_gib()
print(f"Available memory: {available_memory_gib} GiB")
# Set max_workers to lowest between cpu_count or available memory // 4 (4gib per process)
cpu_count = os.cpu_count() or 1
max_workers = int(min(cpu_count, available_memory_gib // 4))
print(f"Max workers: {max_workers}")

input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
with GlasswallProcessManager(max_workers=max_workers, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
for input_file in tqdm(input_files, desc="Queueing files", miniters=len(input_files) // 10):
# No output_file specified, export_file will run in file to memory mode
task = Task(
func=worker_function,
args=tuple(),
kwargs=dict(
input_file=input_file,
content_management_policy=gw_policy,
),
)
process_manager.queue_task(task)

for task_result in tqdm(process_manager.as_completed(), total=len(input_files), desc="Processing tasks", miniters=len(input_files) // 100):
print(task_result)
# Do something with export zip file bytes in memory
if task_result.result:
print(f"Export zip file size is: {len(task_result.result)} bytes for input_file: '{task_result.task.kwargs['input_file']}'")
# task_result no longer referenced and is garbage collected here, freeing up memory

print(f"Elapsed: {time.time() - start_time} seconds")


if __name__ == "__main__":
main()
Available memory: 13.020416259765625 GiB
Max workers: 3
Queueing files: 100%|█████████████████████████████████████████████████████████████████████| 3/3 [00:00<?, ?it/s]
Processing tasks: 0%| | 0/3 [00:00<?, ?it/s]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', cont..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xceloX\xc7\x95\x89\xba\xce\x07\x00\x00-\x19\x00\x00\x19\..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509905.3188772, end_time=1710509908.5976403, elapsed_time=3.2787630558013916, timed_out=False, max_memory_used_in_gib=0.06348419189453125, out_of_memory=False)
Export zip file size is: 97553 bytes for input_file: 'C:\gwpw\input\TestFile_11.doc'
Processing tasks: 33%|███████████████████▋ | 1/3 [00:04<00:09, 4.99s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xcdloX\xca@\xc8\x8a\xf0\x1d\x00\x00k\xc6\x00\x00\x19\x00..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509905.1324441, end_time=1710509908.958884, elapsed_time=3.82643985748291, timed_out=False, max_memory_used_in_gib=0.166259765625, out_of_memory=False)
Export zip file size is: 664734 bytes for input_file: 'C:\gwpw\input\PDFWithGifAndJpeg.pdf'
Processing tasks: 67%|███████████████████████████████████████▎ | 2/3 [00:05<00:02, 2.25s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', conte..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xcfloX\xc7\x95\x89\xba\xce\x07\x00\x00-\x19\x00\x00\x19\..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509907.833435, end_time=1710509910.3073368, elapsed_time=2.4739017486572266, timed_out=False, max_memory_used_in_gib=0.0728759765625, out_of_memory=False)
Export zip file size is: 139697 bytes for input_file: 'C:\gwpw\input\TestFile_9.doc'
Processing tasks: 100%|███████████████████████████████████████████████████████████| 3/3 [00:06<00:00, 2.23s/it]
Elapsed: 6.706916809082031 seconds