Skip to content

Commit b6d117e

Browse files
committed
Initial commit
0 parents  commit b6d117e

File tree

5 files changed

+341
-0
lines changed

5 files changed

+341
-0
lines changed

req.txt

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
flake8
2+
mypy

scripts/run.sh

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pip install -r req.txt
2+
3+
WORKDIR="src"
4+
MODULE="ProcessController"
5+
6+
cd $WORKDIR && flake8 && mypy .
7+
python -m $MODULE

src/ProcessController.py

+260
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
import os
2+
from queue import Queue
3+
from typing import Optional
4+
5+
import multiprocessing
6+
import threading
7+
8+
from utils import timer, timeout
9+
from functions import Functions
10+
11+
12+
class ProcessController:
13+
def __init__(self):
14+
# Максимальное кол-во одновременно выполняемых заданий
15+
self.max_n_processes = None
16+
# Очередь задач
17+
self.queue = Queue()
18+
# Список процессов
19+
self.processes = {}
20+
# Флаг, который определяет логический этап обработки всей очереди задач
21+
self.running = True
22+
23+
# Мьютекс для очереди задач
24+
self._queue_lock = threading.Lock()
25+
# Переменная состояния для демона _launcher
26+
self._launcher_lock = threading.Condition()
27+
# Переменная состояния для _wait
28+
self._wait_lock = threading.Condition()
29+
# Переменная состояния для демона _waiter
30+
self._waiter_lock = threading.Condition()
31+
32+
# демон _launcher
33+
self._launcher_thread = threading.Thread(target=self._launcher,
34+
args=())
35+
self._launcher_thread.start()
36+
# демон _waiter
37+
self._waiter_thread = threading.Thread(target=self._waiter,
38+
args=())
39+
self._waiter_thread.start()
40+
41+
def __del__(self) -> None:
42+
43+
self.running = False
44+
with self._launcher_lock:
45+
self._launcher_lock.notify()
46+
for k in self.processes.keys():
47+
self.processes[k].kill()
48+
49+
with self._wait_lock:
50+
self._wait_lock.notify()
51+
52+
with self._waiter_lock:
53+
self._waiter_lock.notify()
54+
55+
def _launcher(self) -> None:
56+
"""
57+
Демон для запуска оставшихся в очереди задач, которые нельзя
58+
было запустить из-за ограничения максимального кол-ва
59+
одновременно выполняющихся процесов.
60+
Если очередь задач не пустая и кол-во текущих процесов меньше
61+
максимального кол-ва, то добавляем еще процессы.
62+
Посылает оповещения переменным состояния для функций waiter и wait.
63+
64+
:return: None
65+
"""
66+
67+
while self.running:
68+
with self._launcher_lock:
69+
self._launcher_lock.wait_for(
70+
lambda: (not self.queue.empty() and
71+
len(self.processes) < self.max_n_processes
72+
) or not self.running)
73+
74+
if not self.running:
75+
return
76+
with self._queue_lock:
77+
target = self.queue.get()
78+
p = multiprocessing.Process(target=timeout, args=(target,))
79+
p.start()
80+
self.processes[p.pid] = p
81+
82+
with self._waiter_lock:
83+
self._waiter_lock.notify()
84+
85+
with self._wait_lock:
86+
self._wait_lock.notify()
87+
88+
def _waiter(self) -> None:
89+
"""
90+
Демон, который ожидает, пока процесс завершится
91+
или обработка очереди задач закончится.
92+
Очищает список текущих процессов self.processes
93+
94+
:return: None
95+
"""
96+
while self.running:
97+
with self._waiter_lock:
98+
self._waiter_lock.wait_for(
99+
lambda: (len(self.processes) > 0) or not self.running)
100+
101+
if not self.running:
102+
return
103+
pid, status = os.wait()
104+
self.processes.pop(pid)
105+
with self._launcher_lock:
106+
self._launcher_lock.notify()
107+
with self._wait_lock:
108+
self._wait_lock.notify()
109+
110+
def set_max_proc(self, n: int) -> int:
111+
"""
112+
Метод устанавливает ограничение: максимальное число
113+
одновременно выполняемых заданий не должно превышать n.
114+
При этом обновляет информацию о max_n_processes для условия, которое
115+
использует launcher_lock (condition variable).
116+
117+
:param n: новое значение максимального
118+
кол-ва одновременно выполняемых задач
119+
:return: обновленное значение максимального
120+
кол-ва одновременно выполняемых задач
121+
"""
122+
self.max_n_processes = n
123+
with self._launcher_lock:
124+
self._launcher_lock.notify()
125+
126+
return self.max_n_processes
127+
128+
def start(self,
129+
max_exec_time: Optional[int] = None,
130+
tasks: Optional[list[tuple]] = None) -> None:
131+
"""
132+
Данный метод помещает в очередь все задания из tasks. В случае,
133+
если не достигнуто ограничение на максимальное число одновременно
134+
работающих заданий, метод запускает выполнение заданий из очереди
135+
до тех пор, пока не будет достигнуто это ограничение.
136+
Запуск задания представляет порождение нового процесса, который
137+
выполняет соответствующую функцию с её аргументами. При этом каждый
138+
запущенный процесс для задания из tasks не должен работать дольше
139+
max_exec_time.
140+
141+
:param max_exec_time: максимальное время (в секундах)
142+
работы каждого задания из списка tasks
143+
:param tasks: список заданий, содержащий
144+
информацию о функциях и аргументах функций.
145+
:return: None
146+
"""
147+
if tasks and not max_exec_time:
148+
raise TypeError("start() missing "
149+
"1 required positional argument: 'max_exec_time'")
150+
151+
if not self.max_n_processes:
152+
raise TypeError("max_n_processes must"
153+
" be int and greater than zero")
154+
155+
# Заполнение очереди задач
156+
with self._queue_lock:
157+
if tasks:
158+
for task in tasks:
159+
new_task = dict()
160+
new_task["max_exec_time"] = max_exec_time
161+
new_task["func"] = task[0]
162+
new_task["args"] = task[1]
163+
self.queue.put(new_task)
164+
165+
# Отправка оповещения демону _launcher, который в свою очередь
166+
# будет добавлять новые задачи по мере освобождения списка процессов.
167+
with self._launcher_lock:
168+
self._launcher_lock.notify()
169+
170+
def clean_processes(self) -> None:
171+
self.wait()
172+
self.processes = {}
173+
174+
return
175+
176+
@timer
177+
def wait(self) -> None:
178+
"""
179+
Ожидание выполнения всех задач из очереди задач.
180+
181+
:return: None
182+
"""
183+
with self._wait_lock:
184+
self._wait_lock.wait_for(
185+
lambda: len(self.processes) == 0 and self.queue.empty())
186+
187+
return
188+
189+
def wait_count(self) -> int:
190+
"""
191+
:return: Возвращает число заданий, которые осталось запустить.
192+
"""
193+
return self.queue.qsize()
194+
195+
def alive_count(self) -> int:
196+
"""
197+
:return: Возвращает число выполняемых в данный момент заданий.
198+
"""
199+
return [process.is_alive() for process in self.processes.values()
200+
].count(True)
201+
202+
203+
if __name__ == "__main__":
204+
# Пример выполнения
205+
206+
# Экземпляр класса, содержащий методы, имитирующие бурную деятельность
207+
f = Functions()
208+
209+
print("\n----First list----\n")
210+
211+
# Первый список задач (6 задач)
212+
tasks = [
213+
(f.function0, (1, 1)),
214+
(f.function1, (1, 2, 3)),
215+
(f.function2, (1,)),
216+
(f.function0, (1, 1)),
217+
(f.function1, (1, 2, 3)),
218+
(f.function2, (1,))
219+
]
220+
221+
# Создаем экземпляр ProcessController
222+
controller = ProcessController()
223+
# По умолчанию set_max_proc = None, необходимо установить значение
224+
# max_n_processes. (6 задач одновременно)
225+
controller.set_max_proc(6)
226+
# Добавляем задачи в очередь задач и запускаем процессы.
227+
# Максимальное время выполнения 4 секунды.
228+
# Под условие max_exec_time НЕ попадает функция function0
229+
# (она засыпает на 5 секунд).
230+
# Функция function0 будет завершена досрочно.
231+
# То есть сообщение "End functionN" будет 4 раза
232+
# (function0 выполняется 2 раза).
233+
controller.start(tasks=tasks, max_exec_time=4)
234+
235+
# Ожидание завершения процессов
236+
controller.wait()
237+
238+
print("\n----Second list----\n")
239+
240+
# Второй список задач (5 задач)
241+
tasks = [
242+
(f.function0, (1, 1)),
243+
(f.function1, (1, 2, 3)),
244+
(f.function2, (1,)),
245+
(f.function0, (1, 1)),
246+
(f.function1, (1, 2, 3)),
247+
]
248+
249+
# Устанавливаем максимальное кол-во задач (2 задачи одновременно)
250+
controller.set_max_proc(2)
251+
# Добавляем задачи в очередь задач и запускаем процессы.
252+
# Максимальное время выполнения 6 секунды.
253+
# Под условие max_exec_time попадают ВСЕ функции.
254+
# То есть сообщений "End functionN" будет 5 раз
255+
controller.start(tasks=tasks, max_exec_time=6)
256+
257+
# Ожидание завершения процессов
258+
controller.wait()
259+
260+
controller.__del__()

src/functions.py

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import time
2+
from typing import Union
3+
4+
5+
Number = Union[int, float]
6+
7+
8+
class Functions:
9+
"""
10+
Класс, содержащий методы, имитирующие бурную деятельность
11+
"""
12+
13+
@staticmethod
14+
def function0(f0_arg0: Number, f0_arg1: Number):
15+
print("Start function0")
16+
time.sleep(5)
17+
print("End function0")
18+
return f0_arg0 + f0_arg1
19+
20+
@staticmethod
21+
def function1(f1_arg0: Number, f1_arg1: Number, f1_arg2: Number):
22+
print("Start function1")
23+
time.sleep(1)
24+
print("End function1")
25+
return f1_arg0 + f1_arg1 + f1_arg2
26+
27+
@staticmethod
28+
def function2(arg0: Number):
29+
print("Start function2")
30+
time.sleep(2)
31+
print("End function2")
32+
return arg0

src/utils.py

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import signal
2+
import time
3+
from typing import Callable
4+
5+
6+
def timer(func: Callable):
7+
"""
8+
Декоратор для вычисления времени выполнения произвольной функции.
9+
:param func: функция, время выполнения которой требуется вычислить
10+
"""
11+
def wrapper(*args, **kwargs):
12+
start_time = time.time()
13+
14+
result = func(*args, **kwargs)
15+
16+
end_time = time.time()
17+
execution_time = end_time - start_time
18+
19+
print(f"Execution time {func.__name__}: {execution_time} seconds")
20+
21+
return result
22+
23+
return wrapper
24+
25+
26+
def timeout(*args):
27+
"""
28+
Функция-посредник. Позволяет реализовать таймаут для запускаемых задач.
29+
Таймаут реализован с помощью сигнала.
30+
Из важных особенностей реализации можно выделить, что подобный подход
31+
доступен только в Unix/Unix-like системах.
32+
"""
33+
34+
setting = args[0]
35+
36+
signal.setitimer(signal.ITIMER_REAL, setting.get("max_exec_time"), 0)
37+
38+
func = setting.get("func")
39+
40+
return func(*setting.get("args"))

0 commit comments

Comments
 (0)