forked from shawncavasos23/Data-Engineering-Project
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreddit_analysis.py
65 lines (55 loc) · 2.31 KB
/
reddit_analysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import datetime
import os
import sqlite3

import pandas as pd  # type: ignore
import praw  # type: ignore

from database import create_connection
# Reddit API credentials.
# Each value can be overridden with an environment variable of the same name;
# when the variable is unset the original built-in value is used, so existing
# setups keep working without any configuration change.
# NOTE(review): the fallback client ID/secret below are committed to source
# control. They should be rotated and the literals removed once every
# environment supplies the variables.
REDDIT_CLIENT_ID = os.environ.get("REDDIT_CLIENT_ID", "iGbUVH-wZqqHRysT7wIEfg")
REDDIT_CLIENT_SECRET = os.environ.get(
    "REDDIT_CLIENT_SECRET", "iHq4HqhFESF3WiyLV6mRvCdNdKR_6Q"
)
REDDIT_USER_AGENT = os.environ.get(
    "REDDIT_USER_AGENT", "RefrigeratorFew6940:WSB-Tracker:v1.0"
)

# 🔹 Connect to Reddit API
# A single module-level client is shared by every function in this file.
reddit = praw.Reddit(
    client_id=REDDIT_CLIENT_ID,
    client_secret=REDDIT_CLIENT_SECRET,
    user_agent=REDDIT_USER_AGENT,
)
def get_recent_ticker_mentions(ticker):
    """Collect r/wallstreetbets posts from the last 365 days that match a ticker.

    The subreddit is searched for ``$<ticker>`` (at most 500 results) and every
    post created within the past 365 days (UTC) is kept.

    Args:
        ticker: Stock symbol to search for, without the leading ``$``.

    Returns:
        A list of tuples ``(ticker, title, score, upvote_ratio, date, link)``
        where ``date`` is formatted ``YYYY-MM-DD`` and ``link`` is the full
        reddit.com URL of the post. If anything raises while searching, the
        error is printed and an empty list is returned (rows gathered before
        the failure are discarded).
    """
    print(f"Fetching Reddit mentions for {ticker}...")

    utc = datetime.timezone.utc
    wsb = reddit.subreddit("wallstreetbets")
    cutoff = datetime.datetime.now(utc) - datetime.timedelta(days=365)

    rows = []
    try:
        for submission in wsb.search(f"${ticker}", limit=500):
            created = datetime.datetime.fromtimestamp(submission.created_utc, utc)
            if created < cutoff:
                # Older than one year: not part of the "recent" window.
                continue
            row = (
                ticker,
                submission.title,
                submission.score,
                submission.upvote_ratio,
                created.strftime("%Y-%m-%d"),
                "https://www.reddit.com" + submission.permalink,
            )
            rows.append(row)
    except Exception as e:
        print(f"⚠ Error fetching Reddit mentions for {ticker}: {e}")
        return []

    return rows
def store_reddit_mentions(ticker, mentions=None):
    """Fetch (if needed) and store Reddit mentions in the database.

    Args:
        ticker: Stock symbol the mentions belong to.
        mentions: Optional pre-fetched rows, each a tuple
            ``(ticker, title, upvotes, upvote_ratio, date, link)``. When
            omitted (``None``), the rows are fetched with
            ``get_recent_ticker_mentions(ticker)``. Passing them in lets a
            caller that already holds the rows avoid a second Reddit query.

    Returns:
        None. Rows are written to the ``reddit_mentions`` table with
        ``INSERT OR REPLACE``. Database errors are printed, not raised, and
        the connection is always closed.
    """
    if mentions is None:
        mentions = get_recent_ticker_mentions(ticker)

    if not mentions:
        print(f"⚠ No new Reddit mentions found for {ticker}.")
        return

    conn = create_connection()
    cursor = conn.cursor()
    try:
        cursor.executemany("""
            INSERT OR REPLACE INTO reddit_mentions (ticker, title, upvotes, upvote_ratio, date, link)
            VALUES (?, ?, ?, ?, ?, ?)
        """, mentions)
        conn.commit()
        print(f"Stored {len(mentions)} Reddit mentions for {ticker}.")
    except Exception as e:
        # Best effort: report the failure and carry on.
        print(f"Database error storing Reddit mentions: {e}")
    finally:
        conn.close()


def run_reddit_analysis(ticker):
    """Fetch and store Reddit mentions, then report how many were found.

    The mentions are fetched exactly once and the same rows are both stored
    and counted. Fetching separately for the count would double the Reddit
    API traffic and could report a number that differs from what was stored.

    Args:
        ticker: Stock symbol to analyse.

    Returns:
        A dict ``{"ticker": ticker, "reddit_mentions": <count>}`` where the
        count is the number of mentions fetched (regardless of whether the
        database write succeeded).
    """
    mentions = get_recent_ticker_mentions(ticker)
    store_reddit_mentions(ticker, mentions)
    return {"ticker": ticker, "reddit_mentions": len(mentions)}