Chuyển đến nội dung chính

Hỗ trợ multiprocessing trong Python

Thời gian chờ và giới hạn bộ nhớ của multiprocessing

Lớp GlasswallProcessManager được thiết kế để quản lý multiprocessing với thời gian chờ và giới hạn bộ nhớ được chỉ định cho từng tệp đang được xử lý bởi Glasswall engine.

GlasswallProcessManager sử dụng các đối tượng Task, các đối tượng này phải được tạo và thêm vào hàng đợi. Một đối tượng Task bao gồm một hàm sẽ được gọi, cùng với các đối số và đối số từ khóa sẽ được truyền vào hàm đó.

GlasswallProcessManager tạo ra либо một danh sách các đối tượng TaskResult sau khi quá trình xử lý hoàn tất, hoặc trả về từng đối tượng TaskResult riêng lẻ khi chúng hoàn thành. Một đối tượng TaskResult chứa các thuộc tính liên quan đến việc xử lý tệp.

Các thuộc tính của TaskResult

task: Task
success: bool # True if function did not raise an exception
result: Any # function return value
exception: Union[Exception, None], # the exception raised by the function
exit_code: Union[int, None], # multiprocessing.Process.exitcode, 0 = success
timeout_seconds: Optional[float] # time limit for each process
memory_limit_in_gib: Optional[float] # memory limit for each process, 1 gibibyte = 1024 ** 3 bytes
start_time: float # uses time.time(), current time in seconds since the Epoch
end_time: float # uses time.time(), current time in seconds since the Epoch
elapsed_time: float # end_time - start_time
timed_out: bool # terminated for exceeding the time limit: 'timeout_seconds'
max_memory_used_in_gib: float # the highest recorded memory usage of the process
out_of_memory: bool # terminated for exceeding the memory limit: 'memory_limit_in_gib'

Tạo danh sách các đối tượng TaskResult sau khi quá trình xử lý hoàn tất

Trong ví dụ này, các tác vụ được đưa vào hàng đợi và xử lý song song tối đa đến số lượng worker lớn nhất, mặc định bằng số CPU logic trong hệ thống.

Sau khi tất cả tác vụ được đưa vào hàng đợi, quá trình xử lý sẽ tự động bắt đầu khi thoát khỏi ngữ cảnh GlasswallProcessManager. Khi tất cả tác vụ hoàn tất, thuộc tính danh sách process_manager.task_results sẽ được điền bằng các đối tượng TaskResult hiển thị kết quả xử lý.

Khi tất cả tác vụ hoàn tất, ví dụ này lặp qua process_manager.task_results trong một vòng lặp for và in ra từng đối tượng TaskResult.

import os
import time

import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task


INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"

glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")


def worker_function(*args, **kwargs):
EDITOR.export_file(*args, **kwargs)


def main():
start_time = time.time()
input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
with GlasswallProcessManager(max_workers=None, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
for input_file in input_files:
relative_path = os.path.relpath(input_file, INPUT_DIRECTORY)
output_file = os.path.join(OUTPUT_DIRECTORY, relative_path) + ".zip"

task = Task(
func=worker_function,
args=tuple(),
kwargs=dict(
input_file=input_file,
output_file=output_file,
content_management_policy=gw_policy,
),
)
process_manager.queue_task(task)

for task_result in process_manager.task_results:
print(task_result)

print(f"Elapsed: {time.time() - start_time} seconds")


if __name__ == "__main__":
main()
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', outp..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507465.3883162, end_time=1710507466.5565898, elapsed_time=1.168273687362671, timed_out=False, max_memory_used_in_gib=0.06385421752929688, out_of_memory=False)
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', outpu..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507466.299694, end_time=1710507467.366209, elapsed_time=1.0665149688720703, timed_out=False, max_memory_used_in_gib=0.06365966796875, out_of_memory=False)
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507465.3763025, end_time=1710507467.7441902, elapsed_time=2.3678877353668213, timed_out=False, max_memory_used_in_gib=0.1662139892578125, out_of_memory=False)
Elapsed: 6.226853370666504 seconds

Yield từng đối tượng TaskResult riêng lẻ khi chúng hoàn tất

Ví dụ này sử dụng thư viện bên ngoài tqdm để trực quan hóa tiến độ trong quá trình xử lý.

Các tác vụ được đưa vào hàng đợi và xử lý song song tối đa đến số lượng worker lớn nhất, mặc định bằng số CPU logic trong hệ thống.

Sau khi tất cả tác vụ được đưa vào hàng đợi, quá trình xử lý bắt đầu trong ngữ cảnh GlasswallProcessManager bằng cách gọi phương thức generator process_manager.as_completed(). Khi bất kỳ tác vụ nào hoàn tất, đối tượng TaskResult tương ứng của tác vụ đó sẽ được yield. Điều này cho phép truy cập kết quả ngay khi chúng sẵn sàng, thay vì phải chờ tất cả tác vụ hoàn tất. Thuộc tính danh sách process_manager.task_results sẽ không được điền.

Khi mỗi tác vụ hoàn tất, ví dụ này in ra đối tượng TaskResult được yield.

import os
import time

from tqdm import tqdm

import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task


INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"

glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")


def worker_function(*args, **kwargs):
EDITOR.export_file(*args, **kwargs)


def main():
start_time = time.time()
input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
with GlasswallProcessManager(max_workers=None, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
for input_file in tqdm(input_files, desc="Queueing files", miniters=len(input_files) // 10):
relative_path = os.path.relpath(input_file, INPUT_DIRECTORY)
output_file = os.path.join(OUTPUT_DIRECTORY, relative_path) + ".zip"

task = Task(
func=worker_function,
args=tuple(),
kwargs=dict(
input_file=input_file,
output_file=output_file,
content_management_policy=gw_policy,
),
)
process_manager.queue_task(task)

for task_result in tqdm(process_manager.as_completed(), total=len(input_files), desc="Processing tasks", miniters=len(input_files) // 100):
print(task_result)

print(f"Elapsed: {time.time() - start_time} seconds")


if __name__ == "__main__":
main()
Queueing files: 100%|███████████████████████████████████████████████████████████| 3/3 [00:00<00:00, 2993.79it/s]
Processing tasks: 0%| | 0/3 [00:00<?, ?it/s]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', outp..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507493.8832896, end_time=1710507496.1666203, elapsed_time=2.2833306789398193, timed_out=False, max_memory_used_in_gib=0.072662353515625, out_of_memory=False)
Processing tasks: 33%|███████████████████▋ | 1/3 [00:04<00:08, 4.08s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', outpu..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507495.4604173, end_time=1710507497.5040953, elapsed_time=2.043678045272827, timed_out=False, max_memory_used_in_gib=0.06534576416015625, out_of_memory=False)
Processing tasks: 67%|███████████████████████████████████████▎ | 2/3 [00:05<00:02, 2.46s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507495.3951488, end_time=1710507498.389851, elapsed_time=2.9947023391723633, timed_out=False, max_memory_used_in_gib=0.1673431396484375, out_of_memory=False)
Processing tasks: 100%|███████████████████████████████████████████████████████████| 3/3 [00:06<00:00, 2.11s/it]
Elapsed: 6.356592416763306 seconds

Lưu ý rằng mặc dù GlasswallProcessManager có thể xử lý lượng dữ liệu trả về lớn từ worker_function, việc giữ dữ liệu này trong bộ nhớ có thể nhanh chóng làm đầy RAM khả dụng. Khi có thể, bạn nên không trả về từ worker_function mà thay vào đó dựa vào xử lý file-to-file. Nếu việc xử lý file ra đĩa là không mong muốn hoặc cần trả về byte của file từ worker function, chúng tôi khuyến nghị các bước sau:

  • Giới hạn max_workers để đảm bảo có ít nhất 4 GiB bộ nhớ khả dụng cho mỗi tiến trình.
  • Sử dụng bộ tạo as_completed.
  • Đảm bảo các byte của tệp không được giữ lại sau khi được yield từ as_completed để bộ gom rác Python giải phóng bộ nhớ sau khi các byte của tệp không còn được tham chiếu nữa.

Yield các byte của tệp ở chế độ file to memory và giới hạn max_workers

Ví dụ này sử dụng thư viện bên ngoài tqdm để trực quan hóa tiến độ trong quá trình xử lý.

worker_function đã được sửa đổi để trả về kết quả của EDITOR.export_file, kết quả này sẽ là byte của tệp zip xuất ra hoặc None.

max_workers được giới hạn dựa trên CPU logic và RAM hiện có.

Các tác vụ được đưa vào hàng đợi và xử lý song song tối đa theo số lượng worker đã chỉ định.

Sau khi tất cả tác vụ được đưa vào hàng đợi, quá trình xử lý bắt đầu trong ngữ cảnh GlasswallProcessManager bằng cách gọi phương thức generator process_manager.as_completed(). Khi bất kỳ tác vụ nào hoàn tất, đối tượng TaskResult tương ứng của tác vụ đó sẽ được yield. Điều này cho phép truy cập kết quả ngay khi chúng sẵn sàng, thay vì phải chờ tất cả tác vụ hoàn tất. Thuộc tính danh sách process_manager.task_results sẽ không được điền.

Khi mỗi tác vụ hoàn tất, ví dụ này in đối tượng TaskResult được yield ra, và nếu thuộc tính task_result.result có dữ liệu, ví dụ cũng in thông tin về kích thước tệp zip xuất ra.

import os
import time

from tqdm import tqdm

import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task
from glasswall.multiprocessing.memory_usage import get_available_memory_gib


INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"

glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")


def worker_function(*args, **kwargs):
return EDITOR.export_file(*args, **kwargs)


def main():
start_time = time.time()
available_memory_gib = get_available_memory_gib()
print(f"Available memory: {available_memory_gib} GiB")
# Set max_workers to lowest between cpu_count or available memory // 4 (4gib per process)
cpu_count = os.cpu_count() or 1
max_workers = int(min(cpu_count, available_memory_gib // 4))
print(f"Max workers: {max_workers}")

input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
with GlasswallProcessManager(max_workers=max_workers, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
for input_file in tqdm(input_files, desc="Queueing files", miniters=len(input_files) // 10):
# No output_file specified, export_file will run in file to memory mode
task = Task(
func=worker_function,
args=tuple(),
kwargs=dict(
input_file=input_file,
content_management_policy=gw_policy,
),
)
process_manager.queue_task(task)

for task_result in tqdm(process_manager.as_completed(), total=len(input_files), desc="Processing tasks", miniters=len(input_files) // 100):
print(task_result)
# Do something with export zip file bytes in memory
if task_result.result:
print(f"Export zip file size is: {len(task_result.result)} bytes for input_file: '{task_result.task.kwargs['input_file']}'")
# task_result no longer referenced and is garbage collected here, freeing up memory

print(f"Elapsed: {time.time() - start_time} seconds")


if __name__ == "__main__":
main()
Available memory: 13.020416259765625 GiB
Max workers: 3
Queueing files: 100%|█████████████████████████████████████████████████████████████████████| 3/3 [00:00<?, ?it/s]
Processing tasks: 0%| | 0/3 [00:00<?, ?it/s]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', cont..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xceloX\xc7\x95\x89\xba\xce\x07\x00\x00-\x19\x00\x00\x19\..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509905.3188772, end_time=1710509908.5976403, elapsed_time=3.2787630558013916, timed_out=False, max_memory_used_in_gib=0.06348419189453125, out_of_memory=False)
Export zip file size is: 97553 bytes for input_file: 'C:\gwpw\input\TestFile_11.doc'
Processing tasks: 33%|███████████████████▋ | 1/3 [00:04<00:09, 4.99s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xcdloX\xca@\xc8\x8a\xf0\x1d\x00\x00k\xc6\x00\x00\x19\x00..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509905.1324441, end_time=1710509908.958884, elapsed_time=3.82643985748291, timed_out=False, max_memory_used_in_gib=0.166259765625, out_of_memory=False)
Export zip file size is: 664734 bytes for input_file: 'C:\gwpw\input\PDFWithGifAndJpeg.pdf'
Processing tasks: 67%|███████████████████████████████████████▎ | 2/3 [00:05<00:02, 2.25s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', conte..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xcfloX\xc7\x95\x89\xba\xce\x07\x00\x00-\x19\x00\x00\x19\..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509907.833435, end_time=1710509910.3073368, elapsed_time=2.4739017486572266, timed_out=False, max_memory_used_in_gib=0.0728759765625, out_of_memory=False)
Export zip file size is: 139697 bytes for input_file: 'C:\gwpw\input\TestFile_9.doc'
Processing tasks: 100%|███████████████████████████████████████████████████████████| 3/3 [00:06<00:00, 2.23s/it]
Elapsed: 6.706916809082031 seconds