주요 콘텐츠로 건너뛰기

Python multiprocessing 지원

멀티프로세싱 시간 초과 및 메모리 제한

GlasswallProcessManager 클래스는 Glasswall 엔진이 처리하는 각 파일에 대해 지정된 시간 초과 및 메모리 제한으로 멀티프로세싱을 관리하도록 설계되었습니다.

GlasswallProcessManager는 생성되어 큐에 추가되어야 하는 Task 객체를 사용합니다. Task 객체는 호출될 함수와 해당 함수에 전달될 인수 및 키워드 인수로 구성됩니다.

GlasswallProcessManager는 처리가 완료되면 TaskResult 객체 목록을 생성하거나, 완료되는 대로 개별 TaskResult 객체를 반환합니다. TaskResult 객체에는 파일 처리와 관련된 속성이 포함됩니다.

TaskResult 속성

task: Task
success: bool # True if function did not raise an exception
result: Any # function return value
exception: Union[Exception, None], # the exception raised by the function
exit_code: Union[int, None], # multiprocessing.Process.exitcode, 0 = success
timeout_seconds: Optional[float] # time limit for each process
memory_limit_in_gib: Optional[float] # memory limit for each process, 1 gibibyte = 1024 ** 3 bytes
start_time: float # uses time.time(), current time in seconds since the Epoch
end_time: float # uses time.time(), current time in seconds since the Epoch
elapsed_time: float # end_time - start_time
timed_out: bool # terminated for exceeding the time limit: 'timeout_seconds'
max_memory_used_in_gib: float # the highest recorded memory usage of the process
out_of_memory: bool # terminated for exceeding the memory limit: 'memory_limit_in_gib'

처리가 완료되면 TaskResult 객체 목록 생성

이 예제에서는 작업이 큐에 추가되고 최대 worker 수까지 병렬로 처리되며, 기본값은 시스템의 논리 CPU 수와 같습니다.

모든 작업이 대기열에 추가되면 GlasswallProcessManager 컨텍스트를 종료할 때 처리가 자동으로 시작됩니다. 모든 작업이 완료되면 process_manager.task_results 목록 속성은 처리 결과를 보여주는 TaskResult 객체로 채워집니다.

모든 작업이 완료되면 이 예제는 for 루프에서 process_manager.task_results를 순회하며 각 TaskResult 객체를 출력합니다.

import os
import time

import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task


INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"

glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")


def worker_function(*args, **kwargs):
EDITOR.export_file(*args, **kwargs)


def main():
start_time = time.time()
input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
with GlasswallProcessManager(max_workers=None, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
for input_file in input_files:
relative_path = os.path.relpath(input_file, INPUT_DIRECTORY)
output_file = os.path.join(OUTPUT_DIRECTORY, relative_path) + ".zip"

task = Task(
func=worker_function,
args=tuple(),
kwargs=dict(
input_file=input_file,
output_file=output_file,
content_management_policy=gw_policy,
),
)
process_manager.queue_task(task)

for task_result in process_manager.task_results:
print(task_result)

print(f"Elapsed: {time.time() - start_time} seconds")


if __name__ == "__main__":
main()
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', outp..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507465.3883162, end_time=1710507466.5565898, elapsed_time=1.168273687362671, timed_out=False, max_memory_used_in_gib=0.06385421752929688, out_of_memory=False)
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', outpu..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507466.299694, end_time=1710507467.366209, elapsed_time=1.0665149688720703, timed_out=False, max_memory_used_in_gib=0.06365966796875, out_of_memory=False)
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507465.3763025, end_time=1710507467.7441902, elapsed_time=2.3678877353668213, timed_out=False, max_memory_used_in_gib=0.1662139892578125, out_of_memory=False)
Elapsed: 6.226853370666504 seconds

완료되는 즉시 개별 TaskResult 객체 yield

이 예제에서는 처리 중 진행 상황을 시각화하기 위해 외부 라이브러리 tqdm을 사용합니다.

작업은 큐에 추가되고 최대 worker 수까지 병렬로 처리되며, 기본값은 시스템의 논리 CPU 수와 같습니다.

모든 작업이 큐에 추가되면, GlasswallProcessManager 컨텍스트 내에서 process_manager.as_completed() 제너레이터 메서드를 호출하여 처리가 시작됩니다. 어떤 작업이든 완료되면 해당하는 TaskResult 객체가 반환됩니다. 이를 통해 모든 작업이 완료될 때까지 기다리지 않고, 결과를 사용 가능해지는 즉시 액세스할 수 있습니다. process_manager.task_results 목록 속성은 채워지지 않습니다.

각 작업이 완료될 때마다 이 예제는 yield된 TaskResult 객체를 출력합니다.

import os
import time

from tqdm import tqdm

import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task


INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"

glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")


def worker_function(*args, **kwargs):
EDITOR.export_file(*args, **kwargs)


def main():
start_time = time.time()
input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
with GlasswallProcessManager(max_workers=None, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
for input_file in tqdm(input_files, desc="Queueing files", miniters=len(input_files) // 10):
relative_path = os.path.relpath(input_file, INPUT_DIRECTORY)
output_file = os.path.join(OUTPUT_DIRECTORY, relative_path) + ".zip"

task = Task(
func=worker_function,
args=tuple(),
kwargs=dict(
input_file=input_file,
output_file=output_file,
content_management_policy=gw_policy,
),
)
process_manager.queue_task(task)

for task_result in tqdm(process_manager.as_completed(), total=len(input_files), desc="Processing tasks", miniters=len(input_files) // 100):
print(task_result)

print(f"Elapsed: {time.time() - start_time} seconds")


if __name__ == "__main__":
main()
Queueing files: 100%|███████████████████████████████████████████████████████████| 3/3 [00:00<00:00, 2993.79it/s]
Processing tasks: 0%| | 0/3 [00:00<?, ?it/s]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', outp..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507493.8832896, end_time=1710507496.1666203, elapsed_time=2.2833306789398193, timed_out=False, max_memory_used_in_gib=0.072662353515625, out_of_memory=False)
Processing tasks: 33%|███████████████████▋ | 1/3 [00:04<00:08, 4.08s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', outpu..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507495.4604173, end_time=1710507497.5040953, elapsed_time=2.043678045272827, timed_out=False, max_memory_used_in_gib=0.06534576416015625, out_of_memory=False)
Processing tasks: 67%|███████████████████████████████████████▎ | 2/3 [00:05<00:02, 2.46s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507495.3951488, end_time=1710507498.389851, elapsed_time=2.9947023391723633, timed_out=False, max_memory_used_in_gib=0.1673431396484375, out_of_memory=False)
Processing tasks: 100%|███████████████████████████████████████████████████████████| 3/3 [00:06<00:00, 2.11s/it]
Elapsed: 6.356592416763306 seconds

GlasswallProcessManager는 worker_function에서 대량의 데이터를 반환하는 경우도 처리할 수 있지만, 이 데이터를 메모리에 유지하면 사용 가능한 RAM이 빠르게 가득 찰 수 있다는 점에 유의하세요. 가능하다면 worker_function에서 값을 반환하지 말고 대신 file to file 처리에 의존하는 것이 좋습니다. 파일을 디스크로 처리하는 것이 바람직하지 않거나 worker function에서 파일 바이트를 반환해야 하는 경우에는 다음 단계를 권장합니다:

  • 각 프로세스에 최소 4 GiB의 메모리를 사용할 수 있도록 max_workers를 제한합니다.
  • as_completed 생성기를 사용합니다.
  • Python 가비지 컬렉터가 파일 바이트가 더 이상 참조되지 않은 후 메모리를 해제할 수 있도록, as_completed에서 yield된 후에는 파일 바이트가 유지되지 않도록 합니다.

파일-메모리 모드에서 파일 바이트 yield 및 max_workers 제한

이 예제에서는 처리 중 진행 상황을 시각화하기 위해 외부 라이브러리 tqdm을 사용합니다.

EDITOR.export_file의 결과를 반환하도록 worker_function이 수정되었으며, 이 값은 export zip 파일의 바이트이거나 None입니다.

max_workers는 사용 가능한 논리 CPU와 RAM을 기준으로 제한됩니다.

작업은 큐에 추가되며, 지정된 worker 수까지 병렬로 처리됩니다.

모든 작업이 큐에 추가되면, GlasswallProcessManager 컨텍스트 내에서 process_manager.as_completed() 제너레이터 메서드를 호출하여 처리가 시작됩니다. 어떤 작업이든 완료되면 해당하는 TaskResult 객체가 반환됩니다. 이를 통해 모든 작업이 완료될 때까지 기다리지 않고, 결과를 사용 가능해지는 즉시 액세스할 수 있습니다. process_manager.task_results 목록 속성은 채워지지 않습니다.

각 작업이 완료될 때마다 이 예제는 생성된 TaskResult 객체를 출력하며, task_result.result 속성에 값이 있으면 export zip 파일의 파일 크기에 대한 정보도 출력합니다.

import os
import time

from tqdm import tqdm

import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task
from glasswall.multiprocessing.memory_usage import get_available_memory_gib


INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"

glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")


def worker_function(*args, **kwargs):
return EDITOR.export_file(*args, **kwargs)


def main():
start_time = time.time()
available_memory_gib = get_available_memory_gib()
print(f"Available memory: {available_memory_gib} GiB")
# Set max_workers to lowest between cpu_count or available memory // 4 (4gib per process)
cpu_count = os.cpu_count() or 1
max_workers = int(min(cpu_count, available_memory_gib // 4))
print(f"Max workers: {max_workers}")

input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
with GlasswallProcessManager(max_workers=max_workers, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
for input_file in tqdm(input_files, desc="Queueing files", miniters=len(input_files) // 10):
# No output_file specified, export_file will run in file to memory mode
task = Task(
func=worker_function,
args=tuple(),
kwargs=dict(
input_file=input_file,
content_management_policy=gw_policy,
),
)
process_manager.queue_task(task)

for task_result in tqdm(process_manager.as_completed(), total=len(input_files), desc="Processing tasks", miniters=len(input_files) // 100):
print(task_result)
# Do something with export zip file bytes in memory
if task_result.result:
print(f"Export zip file size is: {len(task_result.result)} bytes for input_file: '{task_result.task.kwargs['input_file']}'")
# task_result no longer referenced and is garbage collected here, freeing up memory

print(f"Elapsed: {time.time() - start_time} seconds")


if __name__ == "__main__":
main()
Available memory: 13.020416259765625 GiB
Max workers: 3
Queueing files: 100%|█████████████████████████████████████████████████████████████████████| 3/3 [00:00<?, ?it/s]
Processing tasks: 0%| | 0/3 [00:00<?, ?it/s]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', cont..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xceloX\xc7\x95\x89\xba\xce\x07\x00\x00-\x19\x00\x00\x19\..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509905.3188772, end_time=1710509908.5976403, elapsed_time=3.2787630558013916, timed_out=False, max_memory_used_in_gib=0.06348419189453125, out_of_memory=False)
Export zip file size is: 97553 bytes for input_file: 'C:\gwpw\input\TestFile_11.doc'
Processing tasks: 33%|███████████████████▋ | 1/3 [00:04<00:09, 4.99s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xcdloX\xca@\xc8\x8a\xf0\x1d\x00\x00k\xc6\x00\x00\x19\x00..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509905.1324441, end_time=1710509908.958884, elapsed_time=3.82643985748291, timed_out=False, max_memory_used_in_gib=0.166259765625, out_of_memory=False)
Export zip file size is: 664734 bytes for input_file: 'C:\gwpw\input\PDFWithGifAndJpeg.pdf'
Processing tasks: 67%|███████████████████████████████████████▎ | 2/3 [00:05<00:02, 2.25s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', conte..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xcfloX\xc7\x95\x89\xba\xce\x07\x00\x00-\x19\x00\x00\x19\..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509907.833435, end_time=1710509910.3073368, elapsed_time=2.4739017486572266, timed_out=False, max_memory_used_in_gib=0.0728759765625, out_of_memory=False)
Export zip file size is: 139697 bytes for input_file: 'C:\gwpw\input\TestFile_9.doc'
Processing tasks: 100%|███████████████████████████████████████████████████████████| 3/3 [00:06<00:00, 2.23s/it]
Elapsed: 6.706916809082031 seconds