-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmain.py
173 lines (143 loc) · 6.01 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import argparse
import subprocess
import sys
import os
import signal
import time
import psutil # type: ignore
import atexit
from database import initialize_database
from data_pipeline import update_stock_data, run_analysis_and_execute_trade
# Default Stock Ticker
DEFAULT_TICKER = "AAPL"
def find_process(name):
"""Find a running process by name and return its PID."""
for proc in psutil.process_iter(["pid", "name", "cmdline"]):
try:
if proc.info["cmdline"] and name in " ".join(proc.info["cmdline"]):
return proc
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
return None
def kill_process(process):
"""Kill a process safely."""
if process:
try:
process.terminate()
process.wait(timeout=5)
except psutil.NoSuchProcess:
pass
except psutil.TimeoutExpired:
process.kill()
def stop_all_processes():
"""Stop Kafka Producer and Consumer if they are running."""
producer_proc = find_process("producer.py")
consumer_proc = find_process("consumer.py")
if producer_proc:
print("Stopping Kafka Producer...")
kill_process(producer_proc)
else:
print("No active Producer to stop.")
if consumer_proc:
print("Stopping Kafka Consumer...")
kill_process(consumer_proc)
else:
print("No active Consumer to stop.")
def is_streamlit_running():
"""Check if Streamlit is already running to prevent duplicate processes."""
for proc in psutil.process_iter(["pid", "name", "cmdline"]):
try:
if proc.info["cmdline"] and "streamlit" in " ".join(proc.info["cmdline"]):
return True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
return False
def main():
parser = argparse.ArgumentParser(description="Trading Dashboard Controller")
parser.add_argument(
"command",
choices=["init", "update", "analyze", "produce", "consume", "stop", "restart", "show"],
help="""Available commands:
init → Initialize the SQLite database
update → Fetch latest stock, macroeconomic data, news, and sentiment
analyze → Run AI-powered analysis and execute a trade
produce → Start Kafka Producer for real-time price streaming
consume → Start Kafka Consumer for real-time visualization
stop → Stop both Producer and Consumer
restart → Restart both Producer and Consumer
show → Launch stock dashboard (via Streamlit)"""
)
parser.add_argument(
"--ticker",
type=str,
help="Stock ticker for analysis, streaming, or dashboard (e.g., AAPL)"
)
args = parser.parse_args()
# Initialize Database
if args.command == "init":
print("Initializing database...")
initialize_database()
print("Database initialized successfully.")
# Update Data
elif args.command == "update":
ticker = args.ticker or DEFAULT_TICKER
print(f"Fetching latest data for {ticker}...")
update_stock_data(ticker)
print(f"Data updated for {ticker}.")
# Run AI Analysis
elif args.command == "analyze":
ticker = args.ticker or DEFAULT_TICKER
print(f"Running AI-powered analysis for {ticker}...")
result = run_analysis_and_execute_trade(ticker)
print(result)
# Start Kafka Producer (Streaming Data)
elif args.command == "produce":
ticker = args.ticker or DEFAULT_TICKER
if find_process("producer.py"):
print("Producer is already running.")
else:
print(f"Starting Kafka Producer for {ticker}...")
subprocess.Popen(["python", "producer.py", ticker], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
print("Kafka Producer started successfully.")
# Start Kafka Consumer (Real-Time Visualization)
elif args.command == "consume":
ticker = args.ticker or DEFAULT_TICKER
if find_process("consumer.py"):
print("Consumer is already running.")
else:
print(f"Starting Kafka Consumer for {ticker}...")
subprocess.Popen(["python", "consumer.py", ticker], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
print("Kafka Consumer started successfully.")
# Stop Kafka Producer & Consumer
elif args.command == "stop":
stop_all_processes()
# Restart Kafka Producer & Consumer
elif args.command == "restart":
print("Restarting Kafka Producer and Consumer...")
stop_all_processes()
time.sleep(2)
ticker = args.ticker or DEFAULT_TICKER
print(f"Restarting Kafka Producer and Consumer for {ticker}...")
subprocess.Popen(["python", "producer.py", ticker], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
subprocess.Popen(["python", "consumer.py", ticker], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
print("Kafka Producer & Consumer restarted successfully.")
# Launch Streamlit Dashboard
elif args.command == "show":
ticker = args.ticker or DEFAULT_TICKER
if is_streamlit_running():
print("Streamlit is already running.")
return
print(f"Launching stock dashboard for {ticker}...")
try:
process = subprocess.Popen(["streamlit", "run", "stock_dashboard.py", "--", ticker])
# Gracefully handle Ctrl+C
try:
process.wait()
except KeyboardInterrupt:
print("\nStock dashboard interrupted...")
process.terminate()
sys.exit(0)
except FileNotFoundError:
print("Error: Streamlit is not installed or stock_dashboard.py is missing.")
if __name__ == "__main__":
main()