Skip to content

Commit 6c5714c

Browse files
committed
feat: 멀티 프로세싱으로 consume 병렬처리 구현
1 parent 1b6a940 commit 6c5714c

File tree

2 files changed

+28
-20
lines changed

2 files changed

+28
-20
lines changed

src/main.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,29 @@
11
from dotenv import load_dotenv
22
load_dotenv(override=True)
33

4-
import os
4+
from multiprocessing import Process
55

6-
from config.rabbitmq import connect
76
from rabbitmq.consume import consume_message
7+
from config.rabbitmq import create_connection
8+
9+
def worker():
10+
connection, channel = create_connection()
11+
12+
channel.basic_qos(prefetch_count=1)
13+
consume_message(channel)
14+
connection.close()
815

916
def main():
10-
if not os.path.exists("temp"):
11-
os.makedirs("temp")
17+
workers = []
18+
19+
for _ in range(3):
20+
p = Process(target=worker)
21+
22+
p.start()
23+
workers.append(p)
1224

13-
connect()
14-
consume_message()
25+
for p in workers:
26+
p.join()
1527

1628
if __name__ == "__main__":
1729
main()

src/rabbitmq/consume.py

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
from analyze_video import analyze_frame
55
from config.constants import MESSAGES
6-
from config.rabbitmq import get_channel
76
from gcs.generate_signed_url import generate_signed_url
87
from gcs.read import get_video
98
from gcs.write import upload_video
@@ -14,7 +13,7 @@
1413
from create_subtitle import make_stack_ass
1514
from insert_subtitle import insert_subtitle_to_video
1615

17-
def callback(ch, method, properties, body):
16+
def process_message(body):
1817
try:
1918
email, file_name, selected_character = json.loads(body).values()
2019
edited_blob_name = f"edited/{file_name}"
@@ -31,13 +30,7 @@ def callback(ch, method, properties, body):
3130
for i, frame in enumerate(extract_frames(data_bytes)):
3231
pose_data = analyze_frame(frame, side=selected_character)
3332

34-
label_frames(
35-
pose_data=pose_data,
36-
frame_id=i,
37-
sit_punch_frames=sit_punch_frames,
38-
uppercut_frames=uppercut_frames,
39-
hit_down_frames=hit_down_frames
40-
)
33+
label_frames(pose_data, i, sit_punch_frames, uppercut_frames, hit_down_frames)
4134

4235
commands = get_commands(sit_punch_frames, uppercut_frames, hit_down_frames)
4336
except:
@@ -58,22 +51,25 @@ def callback(ch, method, properties, body):
5851
"message": MESSAGES.SUCCESS.ANALYZE,
5952
"url": signed_url
6053
}
54+
55+
return message
56+
6157
except Exception as err:
6258
message = {
6359
"email": email,
6460
"message": str(err),
6561
"url": ""
6662
}
6763

68-
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
64+
return message
6965

70-
publish_message(message=message)
66+
def callback(ch, method, properties, body):
67+
message = process_message(body)
68+
publish_message(message)
7169

7270
ch.basic_ack(delivery_tag=method.delivery_tag)
7371

74-
def consume_message():
75-
channel = get_channel()
76-
72+
def consume_message(channel):
7773
channel.basic_consume(
7874
queue=os.environ["MQ_CONSUME_QUEUE"],
7975
auto_ack=False,

0 commit comments

Comments
 (0)