Coverage for src/qollib/processing/execution.py: 94%
49 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-06-24 08:41 +0000
« prev ^ index » next coverage.py v7.5.4, created at 2024-06-24 08:41 +0000
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4provides housekeeping / setup methods to reduce the programming overhead of spawning threads or processes.
5"""
7import os
8import subprocess
9import time
10import logging
11from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
12from functools import partial
14from .thread import Thread
17log = logging.getLogger()
20CPU_LIM = round(os.cpu_count() * 0.7)
21"""70% of CPUs from the current machine (max usage limit)"""
24def threaded(func, args, workers=10, raise_exception=True):
25 """
26 Calls the given function in multiple threads for the set of given arguments
27 Note that this does not spawn processes, but threads. Use this for non CPU
28 CPU dependent tasks, i.e. I/O
29 Method returns once all calls are done.
31 ### Params
32 - func: [Function] the function to call
33 - args: [Iterable] the 'list' of arguments for each call
34 - workers: [Integer] the number of concurrent threads to use
35 - raise_exception: [Bool] Flag if an exception in a thread shall be raised or just logged
37 ### Returns
38 Results from all `Threads` as list
39 """
40 if len(args) == 1:
41 return list(func(arg) for arg in args)
43 subprocess.Popen.communicate = partial(subprocess.Popen.communicate, timeout=30)
44 with ThreadPoolExecutor(workers) as ex:
45 threads = [Thread(func, arg).submit(ex) for arg in args]
46 return _collect_results(threads, raise_exception)
49def simultaneous(func, args, workers: int = None, raise_exception: bool = True):
50 """
51 Calls the given function in multiple processes for the set of given arguments
52 Note that this does spawn processes, not threads. Use this for task that
53 depend heavily on CPU and can be done in parallel.
54 Method returns once all calls are done.
56 ### Params
57 - func: [Function] the function to call
58 - args: [Iterable] the 'list' of arguments for each call
59 - workers: [Integer] the number of concurrent threads to use (Default: NUM_CPUs)
60 - raise_exception: [Bool] Flag if an exception in a thread shall be raised or just logged
62 ### Returns
63 Results from all `Threads` as list
64 """
65 if len(args) == 1:
66 return list(func(arg) for arg in args)
68 if workers is None:
69 workers = CPU_LIM
70 with ProcessPoolExecutor(workers) as ex:
71 threads = [Thread(func, arg).submit(ex) for arg in args]
72 return _collect_results(threads, raise_exception)
75def _collect_results(threads: list, raise_exception: bool = True) -> list:
76 """
77 Takes a list of threads and busy waits for them to be executed.
79 ### Params
80 - threads: [List<Thread>] a list of submitted threads
81 - raise_exception: [Bool] Flag if an exception in a thread shall be raised or just logged
83 ### Returns
84 Results from all `Threads` as list
85 """
86 result = []
87 while len(threads) > 0:
88 for thread in threads:
89 if not thread.is_submitted():
90 log.debug('Removing not submitted thread.')
91 threads.remove(thread)
92 if not thread.is_done():
93 continue
95 if thread.exception is not None:
96 _exception_handling(threads, thread, raise_exception)
97 else:
98 result.append(thread.result)
99 threads.remove(thread)
100 if len(threads):
101 time.sleep(0.1)
102 return result
105def _exception_handling(threads, thread, raise_exception):
106 ex = thread.exception
107 print('')
108 log.critical("Execution of '%s' caused\n[%s]: %s",
109 thread.function.__name__, ex.__class__.__name__, ex)
110 if raise_exception:
111 # Stop all remaining threads:
112 for t in threads:
113 t.cancel()
114 raise ex