การรองรับ multiprocessing ของ Python
การหมดเวลาของ multiprocessing และขีดจำกัดหน่วยความจำ
คลาส GlasswallProcessManager ได้รับการออกแบบมาเพื่อจัดการ multiprocessing โดยกำหนดเวลา timeout และขีดจำกัดหน่วยความจำสำหรับแต่ละไฟล์ที่ประมวลผลโดย Glasswall engine
GlasswallProcessManager ใช้งานอ็อบเจ็กต์ Task ซึ่งต้องถูกสร้างและเพิ่มลงในคิว อ็อบเจ็กต์ Task ประกอบด้วยฟังก์ชันที่จะถูกเรียกใช้ รวมถึงอาร์กิวเมนต์และคีย์เวิร์ดอาร์กิวเมนต์ที่จะถูกส่งไปยังฟังก์ชันนั้น
GlasswallProcessManager สร้างรายการของอ็อบเจ็กต์ TaskResult เมื่อการประมวลผลเสร็จสมบูรณ์ หรือส่งคืนอ็อบเจ็กต์ TaskResult ทีละรายการเมื่อแต่ละรายการเสร็จสิ้น อ็อบเจ็กต์ TaskResult มีแอตทริบิวต์ที่เกี่ยวข้องกับการประมวลผลไฟล์
แอตทริบิวต์ของ TaskResult
task: Task
success: bool # True if function did not raise an exception
result: Any # function return value
exception: Union[Exception, None], # the exception raised by the function
exit_code: Union[int, None], # multiprocessing.Process.exitcode, 0 = success
timeout_seconds: Optional[float] # time limit for each process
memory_limit_in_gib: Optional[float] # memory limit for each process, 1 gibibyte = 1024 ** 3 bytes
start_time: float # uses time.time(), current time in seconds since the Epoch
end_time: float # uses time.time(), current time in seconds since the Epoch
elapsed_time: float # end_time - start_time
timed_out: bool # terminated for exceeding the time limit: 'timeout_seconds'
max_memory_used_in_gib: float # the highest recorded memory usage of the process
out_of_memory: bool # terminated for exceeding the memory limit: 'memory_limit_in_gib'
การสร้างรายการของอ็อบเจ็กต์ TaskResult เมื่อการประมวลผลเสร็จสมบูรณ์
ในตัวอย่างนี้ task จะถูกจัดคิวและประมวลผลแบบขนานจนถึงจำนวน worker สูงสุด ซึ่งค่าเริ่มต้นจะเท่ากับจำนวน logical CPU ในระบบ
หลังจากจัดคิว task ทั้งหมดแล้ว การประมวลผลจะเริ่มโดยอัตโนมัติเมื่อออกจาก context ของ GlasswallProcessManager เมื่อ task ทั้งหมดเสร็จสมบูรณ์แล้ว แอตทริบิวต์รายการ process_manager.task_results จะถูกเติมด้วยอ็อบเจ็กต์ TaskResult ที่แสดงผลลัพธ์การประมวลผล
เมื่อ task ทั้งหมดเสร็จสมบูรณ์แล้ว ตัวอย่างนี้จะวนซ้ำ process_manager.task_results ในลูป for และพิมพ์อ็อบเจ็กต์ TaskResult แต่ละรายการ
import os
import time
import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task
INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"
glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")
def worker_function(*args, **kwargs):
EDITOR.export_file(*args, **kwargs)
def main():
start_time = time.time()
input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
with GlasswallProcessManager(max_workers=None, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
for input_file in input_files:
relative_path = os.path.relpath(input_file, INPUT_DIRECTORY)
output_file = os.path.join(OUTPUT_DIRECTORY, relative_path) + ".zip"
task = Task(
func=worker_function,
args=tuple(),
kwargs=dict(
input_file=input_file,
output_file=output_file,
content_management_policy=gw_policy,
),
)
process_manager.queue_task(task)
for task_result in process_manager.task_results:
print(task_result)
print(f"Elapsed: {time.time() - start_time} seconds")
if __name__ == "__main__":
main()
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', outp..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507465.3883162, end_time=1710507466.5565898, elapsed_time=1.168273687362671, timed_out=False, max_memory_used_in_gib=0.06385421752929688, out_of_memory=False)
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', outpu..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507466.299694, end_time=1710507467.366209, elapsed_time=1.0665149688720703, timed_out=False, max_memory_used_in_gib=0.06365966796875, out_of_memory=False)
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507465.3763025, end_time=1710507467.7441902, elapsed_time=2.3678877353668213, timed_out=False, max_memory_used_in_gib=0.1662139892578125, out_of_memory=False)
Elapsed: 6.226853370666504 seconds
การส่งคืนอ็อบเจ็กต์ TaskResult ทีละรายการเมื่อแต่ละงานเสร็จสมบูรณ์
ตัวอย่างนี้ใช้ไลบรารีภายนอก tqdm เพื่อแสดงภาพความคืบหน้าระหว่างการประมวลผล
task จะถูกจัดคิวและประมวลผลแบบขนานจนถึงจำนวน worker สูงสุด ซึ่งค่าเริ่มต้นจะเท่ากับจำนวน logical CPU ในระบบ
หลังจากจัดคิว task ทั้งหมดแล้ว การประมวลผลจะเริ่มภายใน context ของ GlasswallProcessManager โดยเรียกใช้เมธอด generator process_manager.as_completed() เมื่อ task ใดเสร็จสมบูรณ์ อ็อบเจ็กต์ TaskResult ที่สอดคล้องกันจะถูกส่งคืนทันที วิธีนี้ทำให้สามารถเข้าถึงผลลัพธ์ได้ทันทีที่พร้อมใช้งาน แทนที่จะต้องรอให้ task ทั้งหมดเสร็จสิ้น แอตทริบิวต์รายการ process_manager.task_results จะไม่ถูกเติมข้อมูล
เมื่อแต่ละ task เสร็จสมบูรณ์ ตัวอย่างนี้จะพิมพ์อ็อบเจ็กต์ TaskResult ที่ถูกส่งคืน
import os
import time
from tqdm import tqdm
import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task
INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"
glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")
def worker_function(*args, **kwargs):
EDITOR.export_file(*args, **kwargs)
def main():
start_time = time.time()
input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
with GlasswallProcessManager(max_workers=None, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
for input_file in tqdm(input_files, desc="Queueing files", miniters=len(input_files) // 10):
relative_path = os.path.relpath(input_file, INPUT_DIRECTORY)
output_file = os.path.join(OUTPUT_DIRECTORY, relative_path) + ".zip"
task = Task(
func=worker_function,
args=tuple(),
kwargs=dict(
input_file=input_file,
output_file=output_file,
content_management_policy=gw_policy,
),
)
process_manager.queue_task(task)
for task_result in tqdm(process_manager.as_completed(), total=len(input_files), desc="Processing tasks", miniters=len(input_files) // 100):
print(task_result)
print(f"Elapsed: {time.time() - start_time} seconds")
if __name__ == "__main__":
main()
Queueing files: 100%|███████████████████████████████████████████████████████████| 3/3 [00:00<00:00, 2993.79it/s]
Processing tasks: 0%| | 0/3 [00:00<?, ?it/s]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', outp..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507493.8832896, end_time=1710507496.1666203, elapsed_time=2.2833306789398193, timed_out=False, max_memory_used_in_gib=0.072662353515625, out_of_memory=False)
Processing tasks: 33%|███████████████████▋ | 1/3 [00:04<00:08, 4.08s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', outpu..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507495.4604173, end_time=1710507497.5040953, elapsed_time=2.043678045272827, timed_out=False, max_memory_used_in_gib=0.06534576416015625, out_of_memory=False)
Processing tasks: 67%|███████████████████████████████████████▎ | 2/3 [00:05<00:02, 2.46s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507495.3951488, end_time=1710507498.389851, elapsed_time=2.9947023391723633, timed_out=False, max_memory_used_in_gib=0.1673431396484375, out_of_memory=False)
Processing tasks: 100%|███████████████████████████████████████████████████████████| 3/3 [00:06<00:00, 2.11s/it]
Elapsed: 6.356592416763306 seconds
โปรดทราบว่าแม้
GlasswallProcessManagerจะสามารถจัดการกับข้อมูลจำนวนมากที่ส่งกลับจาก worker_function ได้ แต่การเก็บข้อมูลนี้ไว้ในหน่วยความจำอาจทำให้ RAM ที่มีอยู่เต็มอย่างรวดเร็ว หากเป็นไปได้ แนะนำว่าไม่ควรส่งค่ากลับจาก worker_function และให้ใช้การประมวลผลแบบไฟล์ต่อไฟล์แทน หากการประมวลผลไฟล์ลงดิสก์ไม่เป็นที่ต้องการ หรือจำเป็นต้องส่งคืน file bytes จาก worker function เราแนะนำขั้นตอนต่อไปนี้:
- จำกัด
max_workersเพื่อให้มีหน่วยความจำอย่างน้อย 4 GiB สำหรับแต่ละ process- ใช้ generator
as_completed.- ตรวจสอบให้แน่ใจว่าไม่ได้เก็บไบต์ของไฟล์ไว้หลังจากที่ถูก yield จาก
as_completedเพื่อให้ตัวเก็บขยะของ Python คืนหน่วยความจำได้หลังจากไม่มีการอ้างอิงถึงไบต์ของไฟล์นั้นอีกต่อไป
การ yield ไบต์ของไฟล์ในโหมด file to memory และการจำกัด max_workers
ตัวอย่างนี้ใช้ไลบรารีภายนอก tqdm เพื่อแสดงภาพความคืบหน้าระหว่างการประมวลผล
มีการแก้ไข worker_function ให้ส่งคืนผลลัพธ์ของ EDITOR.export_file ซึ่งจะเป็นไบต์ของไฟล์ zip ที่ส่งออก หรือเป็น None
มีการจำกัด max_workers ตามจำนวน CPU เชิงตรรกะและ RAM ที่มีอยู่
งานจะถูกจัดคิวและประมวลผลแบบขนานตามจำนวน workers ที่ระบุ
หลังจากจัดคิว task ทั้งหมดแล้ว การประมวลผลจะเริ่มภายใน context ของ GlasswallProcessManager โดยเรียกใช้เมธอด generator process_manager.as_completed() เมื่อ task ใดเสร็จสมบูรณ์ อ็อบเจ็กต์ TaskResult ที่สอดคล้องกันจะถูกส่งคืนทันที วิธีนี้ทำให้สามารถเข้าถึงผลลัพธ์ได้ทันทีที่พร้อมใช้งาน แทนที่จะต้องรอให้ task ทั้งหมดเสร็จสิ้น แอตทริบิวต์รายการ process_manager.task_results จะไม่ถูกเติมข้อมูล
เมื่อแต่ละงานเสร็จสิ้น ตัวอย่างนี้จะพิมพ์อ็อบเจ็กต์ TaskResult ที่ถูก yield และหากแอตทริบิวต์ task_result.result มีข้อมูล ก็จะพิมพ์ข้อมูลเกี่ยวกับขนาดไฟล์ของไฟล์ zip ที่ส่งออกด้วย
import os
import time
from tqdm import tqdm
import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task
from glasswall.multiprocessing.memory_usage import get_available_memory_gib
INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"
glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")
def worker_function(*args, **kwargs):
return EDITOR.export_file(*args, **kwargs)
def main():
start_time = time.time()
available_memory_gib = get_available_memory_gib()
print(f"Available memory: {available_memory_gib} GiB")
# Set max_workers to lowest between cpu_count or available memory // 4 (4gib per process)
cpu_count = os.cpu_count() or 1
max_workers = int(min(cpu_count, available_memory_gib // 4))
print(f"Max workers: {max_workers}")
input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
with GlasswallProcessManager(max_workers=max_workers, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
for input_file in tqdm(input_files, desc="Queueing files", miniters=len(input_files) // 10):
# No output_file specified, export_file will run in file to memory mode
task = Task(
func=worker_function,
args=tuple(),
kwargs=dict(
input_file=input_file,
content_management_policy=gw_policy,
),
)
process_manager.queue_task(task)
for task_result in tqdm(process_manager.as_completed(), total=len(input_files), desc="Processing tasks", miniters=len(input_files) // 100):
print(task_result)
# Do something with export zip file bytes in memory
if task_result.result:
print(f"Export zip file size is: {len(task_result.result)} bytes for input_file: '{task_result.task.kwargs['input_file']}'")
# task_result no longer referenced and is garbage collected here, freeing up memory
print(f"Elapsed: {time.time() - start_time} seconds")
if __name__ == "__main__":
main()
Available memory: 13.020416259765625 GiB
Max workers: 3
Queueing files: 100%|█████████████████████████████████████████████████████████████████████| 3/3 [00:00<?, ?it/s]
Processing tasks: 0%| | 0/3 [00:00<?, ?it/s]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', cont..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xceloX\xc7\x95\x89\xba\xce\x07\x00\x00-\x19\x00\x00\x19\..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509905.3188772, end_time=1710509908.5976403, elapsed_time=3.2787630558013916, timed_out=False, max_memory_used_in_gib=0.06348419189453125, out_of_memory=False)
Export zip file size is: 97553 bytes for input_file: 'C:\gwpw\input\TestFile_11.doc'
Processing tasks: 33%|███████████████████▋ | 1/3 [00:04<00:09, 4.99s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xcdloX\xca@\xc8\x8a\xf0\x1d\x00\x00k\xc6\x00\x00\x19\x00..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509905.1324441, end_time=1710509908.958884, elapsed_time=3.82643985748291, timed_out=False, max_memory_used_in_gib=0.166259765625, out_of_memory=False)
Export zip file size is: 664734 bytes for input_file: 'C:\gwpw\input\PDFWithGifAndJpeg.pdf'
Processing tasks: 67%|███████████████████████████████████████▎ | 2/3 [00:05<00:02, 2.25s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', conte..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xcfloX\xc7\x95\x89\xba\xce\x07\x00\x00-\x19\x00\x00\x19\..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509907.833435, end_time=1710509910.3073368, elapsed_time=2.4739017486572266, timed_out=False, max_memory_used_in_gib=0.0728759765625, out_of_memory=False)
Export zip file size is: 139697 bytes for input_file: 'C:\gwpw\input\TestFile_9.doc'
Processing tasks: 100%|███████████████████████████████████████████████████████████| 3/3 [00:06<00:00, 2.23s/it]
Elapsed: 6.706916809082031 seconds