Coverage for src/qollib/processing/execution.py: 94%

50 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-02 08:34 +0000

1#!/usr/bin/env python3 

2# -*- coding: utf-8 -*- 

3""" 

4provides housekeeping / setup methods to reduce the programming overhead of spawning threads or processes. 

5""" 

6 

7import os 

8import subprocess 

9import time 

10import logging 

11from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor 

12from functools import partial 

13 

14from .thread import Thread 

15 

16 

17log = logging.getLogger() 

18 

19 

20CPU_LIM = round(os.cpu_count() * 0.7) 

21"""70% of CPUs from the current machine (max usage limit)""" 

22 

23 

24def threaded(func, args, workers=10, raise_exception=True): 

25 """ 

26 Calls the given function in multiple threads for the set of given arguments 

27 Note that this does not spawn processes, but threads. Use this for non CPU 

28 CPU dependent tasks, i.e. I/O 

29 Method returns once all calls are done. 

30 

31 ### Params 

32 - func: [Function] the function to call 

33 - args: [Iterable] the 'list' of arguments for each call 

34 - workers: [Integer] the number of concurrent threads to use 

35 - raise_exception: [Bool] Flag if an exception in a thread shall be raised or just logged 

36 

37 ### Returns 

38 Results from all `Threads` as list 

39 """ 

40 if len(args) == 1: 

41 return list(func(arg) for arg in args) 

42 

43 subprocess.Popen.communicate = partial(subprocess.Popen.communicate, timeout=30) 

44 with ThreadPoolExecutor(workers) as ex: 

45 threads = [Thread(func, arg).submit(ex) for arg in args] 

46 return _collect_results(threads, raise_exception) 

47 

48 

49def simultaneous(func, args, workers: int = None, raise_exception: bool = True): 

50 """ 

51 Calls the given function in multiple processes for the set of given arguments 

52 Note that this does spawn processes, not threads. Use this for task that 

53 depend heavily on CPU and can be done in parallel. 

54 Method returns once all calls are done. 

55 

56 ### Params 

57 - func: [Function] the function to call 

58 - args: [Iterable] the 'list' of arguments for each call 

59 - workers: [Integer] the number of concurrent threads to use (Default: NUM_CPUs) 

60 - raise_exception: [Bool] Flag if an exception in a thread shall be raised or just logged 

61 

62 ### Returns 

63 Results from all `Threads` as list 

64 """ 

65 if len(args) == 1: 

66 return list(func(arg) for arg in args) 

67 

68 if workers is None: 

69 workers = CPU_LIM 

70 with ProcessPoolExecutor(workers) as ex: 

71 threads = [Thread(func, arg).submit(ex) for arg in args] 

72 return _collect_results(threads, raise_exception) 

73 

74 

75def _collect_results(threads: list, raise_exception: bool = True) -> list: 

76 """ 

77 Takes a list of threads and busy waits for them to be executed. 

78 

79 ### Params 

80 - threads: [List<Thread>] a list of submitted threads 

81 - raise_exception: [Bool] Flag if an exception in a thread shall be raised or just logged 

82 

83 ### Returns 

84 Results from all `Threads` as list 

85 """ 

86 result = [] 

87 while len(threads) > 0: 

88 for thread in threads: 

89 if not thread.is_submitted(): 

90 log.debug('Removing not submitted thread.') 

91 threads.remove(thread) 

92 if not thread.is_done(): 

93 continue 

94 

95 if thread.exception is not None: 

96 _exception_handling(threads, thread, raise_exception) 

97 else: 

98 result.append(thread.result) 

99 threads.remove(thread) 

100 if len(threads): 

101 time.sleep(0.1) 

102 return result 

103 

104 

105def _exception_handling(threads, thread, raise_exception): 

106 ex = thread.exception 

107 print('') 

108 log.critical("Execution of '%s' caused\n[%s]: %s", 

109 thread.function.__name__, ex.__class__.__name__, ex) 

110 if raise_exception: 

111 # Stop all remaining threads: 

112 for t in threads: 

113 t.cancel() 

114 raise ex