-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
76 lines (55 loc) · 1.67 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import queue
from threading import BoundedSemaphore
from typing import NamedTuple
from flask import Flask, request
from flask_restful import Resource, Api
# Init: the Flask application and the flask_restful API bound to it.
app = Flask(__name__)
api = Api(app)

# Dynamic ID for tasks: the next ID to hand out, advanced by get_index().
_INDEX = 1

# Max connections for threads.
# NOTE: retained so existing references keep working; it intentionally no
# longer sizes `pool` (see the comment on `pool` below).
_MAX_CONNECTION = 4

# Tasks that were submitted but not yet fetched.
tasks_queue = queue.Queue()
# Tasks that were fetched but not yet marked complete.
tasks_progress = []

# `pool` is acquired by every `with pool:` section in this module to protect
# shared mutable state (_INDEX and tasks_progress), so it has to admit exactly
# ONE holder at a time. Creating it with value=_MAX_CONNECTION (4) allowed up
# to four threads inside those sections simultaneously, i.e. no mutual
# exclusion at all. A bounded semaphore with value=1 behaves as a mutex.
pool = BoundedSemaphore(value=1)
def get_index():
    """Hand out the next task ID.

    Reads the module-level ``_INDEX`` counter and advances it by one while
    holding ``pool``.

    Returns:
        The value ``_INDEX`` had before it was incremented.
    """
    global _INDEX
    with pool:
        # Read and bump in one step inside the guarded section.
        allocated, _INDEX = _INDEX, _INDEX + 1
    return allocated
class Task(NamedTuple):
    """Immutable record pairing a task ID with its payload string."""

    # Identifier assigned when the task is created.
    id: int
    # The task payload as supplied by the client.
    task: str
class Tasks(Resource):
    """REST resource for submitting new tasks and fetching queued ones."""

    def post(self):  # noqa
        """Create a task from the JSON request body and enqueue it.

        The body must be a JSON object with a string value under ``task``.

        Returns:
            ``({'id': <int>}, 201)`` on success.
            ``('Error key task', 400)`` when the body is not a JSON object
            or has no ``task`` key.
            ``('Error type task', 400)`` when ``task`` is not a string.
        """
        data = request.get_json()
        # The parsed body may be None or any JSON value (list, number,
        # string, ...); only a JSON object can carry the 'task' key. Without
        # this guard a non-object body raised AttributeError (HTTP 500).
        if not isinstance(data, dict) or 'task' not in data:
            return 'Error key task', 400
        if not isinstance(data['task'], str):
            return 'Error type task', 400

        idx = get_index()
        tasks_queue.put(Task(id=idx, task=data['task']), block=False)
        return {'id': idx}, 201

    def get(self):  # noqa
        """Pop the next queued task and record it as in progress.

        Returns:
            ``({'id': ..., 'task': ...}, 200)`` for the fetched task, or
            ``('Task queue is empty', 404)`` when nothing is queued.
        """
        # Only the non-blocking get can raise queue.Empty, so keep the try
        # body limited to it.
        try:
            task = tasks_queue.get(block=False)
        except queue.Empty:
            return 'Task queue is empty', 404

        with pool:
            tasks_progress.append(task)
        return {'id': task.id, 'task': task.task}, 200
class MarkTaskDone(Resource):
    """REST resource that marks an in-progress task as completed."""

    def post(self, task_id: int):  # noqa
        """Remove the task with the given ID from ``tasks_progress``.

        Args:
            task_id: ID of the task to mark as done.

        Returns:
            ``('', 200)`` when a matching task was found and removed,
            otherwise ``('Not found', 404)``.
        """
        with pool:
            # Position of the first in-progress entry with a matching ID.
            position = next(
                (pos for pos, entry in enumerate(tasks_progress)
                 if entry.id == task_id),
                None,
            )
            if position is None:
                return 'Not found', 404
            del tasks_progress[position]
            return '', 200
# Routes: the task collection (submit / fetch) and per-task completion.
api.add_resource(Tasks, '/tasks/')
api.add_resource(MarkTaskDone, '/tasks/<int:task_id>/complete')

# Start the built-in server only when this file is executed as a script.
# NOTE(review): debug=True is a development setting - confirm this entry
# point is never used for a production deployment.
if __name__ == '__main__':
    app.run(debug=True)  # pragma: no cover