मुख्य सामग्री पर जाएँ

Python multiprocessing समर्थन

Multiprocessing टाइमआउट और मेमोरी सीमाएँ

GlasswallProcessManager class को Glasswall engine द्वारा प्रोसेस की जा रही प्रत्येक फ़ाइल के लिए निर्धारित टाइमआउट और मेमोरी सीमा के साथ multiprocessing को प्रबंधित करने के लिए डिज़ाइन किया गया है।

GlasswallProcessManager Task objects का उपयोग करता है, जिन्हें बनाया जाना चाहिए और queue में जोड़ा जाना चाहिए। एक Task object में एक function होता है जिसे कॉल किया जाएगा, और arguments तथा keyword arguments होते हैं जो उस function को पास किए जाएंगे।

GlasswallProcessManager या तो प्रोसेसिंग पूरी होने के बाद TaskResult objects की एक सूची उत्पन्न करता है, या जैसे-जैसे वे पूरे होते हैं, अलग-अलग TaskResult objects प्रदान करता है। एक TaskResult object में फ़ाइल की प्रोसेसिंग से संबंधित attributes होते हैं।

TaskResult attributes

task: Task
success: bool # True if function did not raise an exception
result: Any # function return value
exception: Union[Exception, None], # the exception raised by the function
exit_code: Union[int, None], # multiprocessing.Process.exitcode, 0 = success
timeout_seconds: Optional[float] # time limit for each process
memory_limit_in_gib: Optional[float] # memory limit for each process, 1 gibibyte = 1024 ** 3 bytes
start_time: float # uses time.time(), current time in seconds since the Epoch
end_time: float # uses time.time(), current time in seconds since the Epoch
elapsed_time: float # end_time - start_time
timed_out: bool # terminated for exceeding the time limit: 'timeout_seconds'
max_memory_used_in_gib: float # the highest recorded memory usage of the process
out_of_memory: bool # terminated for exceeding the memory limit: 'memory_limit_in_gib'

प्रोसेसिंग पूरी होने के बाद TaskResult objects की सूची तैयार करना

इस उदाहरण में tasks को queue किया जाता है और workers की अधिकतम संख्या तक समानांतर रूप से process किया जाता है, जो डिफ़ॉल्ट रूप से system में logical CPUs की संख्या के बराबर होती है।

सभी tasks queue हो जाने के बाद, GlasswallProcessManager context से बाहर निकलते समय processing अपने-आप शुरू हो जाती है। सभी tasks पूरे हो जाने पर, process_manager.task_results list attribute में TaskResult objects भर दिए जाते हैं, जो processing results दिखाते हैं।

सभी tasks पूरे हो जाने पर, यह उदाहरण for loop में process_manager.task_results पर iterate करता है और प्रत्येक TaskResult object को print करता है।

import os
import time

import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task


INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"

glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")


def worker_function(*args, **kwargs):
EDITOR.export_file(*args, **kwargs)


def main():
start_time = time.time()
input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
with GlasswallProcessManager(max_workers=None, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
for input_file in input_files:
relative_path = os.path.relpath(input_file, INPUT_DIRECTORY)
output_file = os.path.join(OUTPUT_DIRECTORY, relative_path) + ".zip"

task = Task(
func=worker_function,
args=tuple(),
kwargs=dict(
input_file=input_file,
output_file=output_file,
content_management_policy=gw_policy,
),
)
process_manager.queue_task(task)

for task_result in process_manager.task_results:
print(task_result)

print(f"Elapsed: {time.time() - start_time} seconds")


if __name__ == "__main__":
main()
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', outp..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507465.3883162, end_time=1710507466.5565898, elapsed_time=1.168273687362671, timed_out=False, max_memory_used_in_gib=0.06385421752929688, out_of_memory=False)
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', outpu..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507466.299694, end_time=1710507467.366209, elapsed_time=1.0665149688720703, timed_out=False, max_memory_used_in_gib=0.06365966796875, out_of_memory=False)
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507465.3763025, end_time=1710507467.7441902, elapsed_time=2.3678877353668213, timed_out=False, max_memory_used_in_gib=0.1662139892578125, out_of_memory=False)
Elapsed: 6.226853370666504 seconds

पूरा होते ही अलग-अलग TaskResult objects yield करना

यह उदाहरण processing के दौरान प्रगति को visualise करने के लिए external library tqdm का उपयोग करता है।

Tasks को queue किया जाता है और workers की अधिकतम संख्या तक समानांतर रूप से process किया जाता है, जो डिफ़ॉल्ट रूप से system में logical CPUs की संख्या के बराबर होती है।

सभी tasks queue हो जाने के बाद, GlasswallProcessManager context के भीतर process_manager.as_completed() generator method को invoke करके processing शुरू होती है। जैसे ही कोई task पूरा होता है, उसका संबंधित TaskResult object yield किया जाता है। इससे results उपलब्ध होते ही उन तक पहुँचा जा सकता है, बजाय इसके कि सभी tasks के पूरा होने की प्रतीक्षा की जाए। process_manager.task_results list attribute populate नहीं होगा।

जैसे-जैसे प्रत्येक task पूरा होता है, यह उदाहरण yielded TaskResult object को print करता है।

import os
import time

from tqdm import tqdm

import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task


INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"

glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")


def worker_function(*args, **kwargs):
EDITOR.export_file(*args, **kwargs)


def main():
start_time = time.time()
input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
with GlasswallProcessManager(max_workers=None, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
for input_file in tqdm(input_files, desc="Queueing files", miniters=len(input_files) // 10):
relative_path = os.path.relpath(input_file, INPUT_DIRECTORY)
output_file = os.path.join(OUTPUT_DIRECTORY, relative_path) + ".zip"

task = Task(
func=worker_function,
args=tuple(),
kwargs=dict(
input_file=input_file,
output_file=output_file,
content_management_policy=gw_policy,
),
)
process_manager.queue_task(task)

for task_result in tqdm(process_manager.as_completed(), total=len(input_files), desc="Processing tasks", miniters=len(input_files) // 100):
print(task_result)

print(f"Elapsed: {time.time() - start_time} seconds")


if __name__ == "__main__":
main()
Queueing files: 100%|███████████████████████████████████████████████████████████| 3/3 [00:00<00:00, 2993.79it/s]
Processing tasks: 0%| | 0/3 [00:00<?, ?it/s]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', outp..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507493.8832896, end_time=1710507496.1666203, elapsed_time=2.2833306789398193, timed_out=False, max_memory_used_in_gib=0.072662353515625, out_of_memory=False)
Processing tasks: 33%|███████████████████▋ | 1/3 [00:04<00:08, 4.08s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', outpu..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507495.4604173, end_time=1710507497.5040953, elapsed_time=2.043678045272827, timed_out=False, max_memory_used_in_gib=0.06534576416015625, out_of_memory=False)
Processing tasks: 67%|███████████████████████████████████████▎ | 2/3 [00:05<00:02, 2.46s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=None, exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710507495.3951488, end_time=1710507498.389851, elapsed_time=2.9947023391723633, timed_out=False, max_memory_used_in_gib=0.1673431396484375, out_of_memory=False)
Processing tasks: 100%|███████████████████████████████████████████████████████████| 3/3 [00:06<00:00, 2.11s/it]
Elapsed: 6.356592416763306 seconds

ध्यान दें कि जबकि GlasswallProcessManager worker_function से डेटा की बड़ी वापसी को संभाल सकता है, इस डेटा को मेमोरी में रखने से उपलब्ध RAM जल्दी भर सकती है। जहाँ संभव हो, worker_function से return न करने की सलाह दी जाती है, और इसके बजाय file-to-file processing पर निर्भर रहें। यदि फ़ाइलों को disk पर process करना वांछनीय नहीं है या worker function से file bytes लौटाना आवश्यक है, तो हम निम्नलिखित चरणों की अनुशंसा करते हैं:

  • max_workers को सीमित करें ताकि प्रत्येक process के लिए कम से कम 4 GiB memory उपलब्ध रहे।
  • as_completed generator का उपयोग करें।
  • सुनिश्चित करें कि as_completed से yield किए जाने के बाद file bytes सुरक्षित न रहें, ताकि जब file bytes का अब कोई reference न रहे तो Python garbage collector memory खाली कर सके।

file to memory mode में file bytes को yield करना और max_workers को सीमित करना

यह उदाहरण processing के दौरान प्रगति को visualise करने के लिए external library tqdm का उपयोग करता है।

EDITOR.export_file का परिणाम लौटाने के लिए worker_function को संशोधित किया गया है, जो या तो export zip file के bytes होंगे, या None।

max_workers उपलब्ध logical CPUs और RAM के आधार पर सीमित है।

Tasks को queue में डाला जाता है और workers की निर्दिष्ट संख्या तक समानांतर रूप से process किया जाता है।

सभी tasks queue हो जाने के बाद, GlasswallProcessManager context के भीतर process_manager.as_completed() generator method को invoke करके processing शुरू होती है। जैसे ही कोई task पूरा होता है, उसका संबंधित TaskResult object yield किया जाता है। इससे results उपलब्ध होते ही उन तक पहुँचा जा सकता है, बजाय इसके कि सभी tasks के पूरा होने की प्रतीक्षा की जाए। process_manager.task_results list attribute populate नहीं होगा।

जैसे ही प्रत्येक task पूरा होता है, यह उदाहरण प्राप्त TaskResult object को print करता है, और यदि task_result.result attribute populated है, तो यह export zip file के file size की जानकारी भी print करता है।

import os
import time

from tqdm import tqdm

import glasswall
from glasswall.multiprocessing import GlasswallProcessManager, Task
from glasswall.multiprocessing.memory_usage import get_available_memory_gib


INPUT_DIRECTORY = r"C:\gwpw\input"
OUTPUT_DIRECTORY = r"C:\gwpw\output\editor\multiprocessing"
LIBRARY_DIRECTORY = r"C:\gwpw\libraries\10.0"

glasswall.config.logging.console.setLevel("CRITICAL")
EDITOR = glasswall.Editor(LIBRARY_DIRECTORY)
gw_policy = glasswall.content_management.policies.Editor(default="sanitise")


def worker_function(*args, **kwargs):
return EDITOR.export_file(*args, **kwargs)


def main():
start_time = time.time()
available_memory_gib = get_available_memory_gib()
print(f"Available memory: {available_memory_gib} GiB")
# Set max_workers to lowest between cpu_count or available memory // 4 (4gib per process)
cpu_count = os.cpu_count() or 1
max_workers = int(min(cpu_count, available_memory_gib // 4))
print(f"Max workers: {max_workers}")

input_files = glasswall.utils.list_file_paths(INPUT_DIRECTORY)
with GlasswallProcessManager(max_workers=max_workers, worker_timeout_seconds=5, memory_limit_in_gib=4) as process_manager:
for input_file in tqdm(input_files, desc="Queueing files", miniters=len(input_files) // 10):
# No output_file specified, export_file will run in file to memory mode
task = Task(
func=worker_function,
args=tuple(),
kwargs=dict(
input_file=input_file,
content_management_policy=gw_policy,
),
)
process_manager.queue_task(task)

for task_result in tqdm(process_manager.as_completed(), total=len(input_files), desc="Processing tasks", miniters=len(input_files) // 100):
print(task_result)
# Do something with export zip file bytes in memory
if task_result.result:
print(f"Export zip file size is: {len(task_result.result)} bytes for input_file: '{task_result.task.kwargs['input_file']}'")
# task_result no longer referenced and is garbage collected here, freeing up memory

print(f"Elapsed: {time.time() - start_time} seconds")


if __name__ == "__main__":
main()
Available memory: 13.020416259765625 GiB
Max workers: 3
Queueing files: 100%|█████████████████████████████████████████████████████████████████████| 3/3 [00:00<?, ?it/s]
Processing tasks: 0%| | 0/3 [00:00<?, ?it/s]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_11.doc', cont..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xceloX\xc7\x95\x89\xba\xce\x07\x00\x00-\x19\x00\x00\x19\..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509905.3188772, end_time=1710509908.5976403, elapsed_time=3.2787630558013916, timed_out=False, max_memory_used_in_gib=0.06348419189453125, out_of_memory=False)
Export zip file size is: 97553 bytes for input_file: 'C:\gwpw\input\TestFile_11.doc'
Processing tasks: 33%|███████████████████▋ | 1/3 [00:04<00:09, 4.99s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\PDFWithGifAndJpeg.pdf'..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xcdloX\xca@\xc8\x8a\xf0\x1d\x00\x00k\xc6\x00\x00\x19\x00..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509905.1324441, end_time=1710509908.958884, elapsed_time=3.82643985748291, timed_out=False, max_memory_used_in_gib=0.166259765625, out_of_memory=False)
Export zip file size is: 664734 bytes for input_file: 'C:\gwpw\input\PDFWithGifAndJpeg.pdf'
Processing tasks: 67%|███████████████████████████████████████▎ | 2/3 [00:05<00:02, 2.25s/it]
TaskResult(task=Task(func=worker_function, args=(), kwargs=(input_file='C:\\gwpw\\input\\TestFile_9.doc', conte..., success=True, result=b'PK\x03\x04\x14\x00\x0e\x00\x08\x00\xcfloX\xc7\x95\x89\xba\xce\x07\x00\x00-\x19\x00\x00\x19\..., exception=None, exit_code=0, timeout_seconds=5, memory_limit_in_gib=4, start_time=1710509907.833435, end_time=1710509910.3073368, elapsed_time=2.4739017486572266, timed_out=False, max_memory_used_in_gib=0.0728759765625, out_of_memory=False)
Export zip file size is: 139697 bytes for input_file: 'C:\gwpw\input\TestFile_9.doc'
Processing tasks: 100%|███████████████████████████████████████████████████████████| 3/3 [00:06<00:00, 2.23s/it]
Elapsed: 6.706916809082031 seconds