Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 39 additions & 222 deletions bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import requests
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, ContextTypes, ConversationHandler, filters
import matplotlib.pyplot as plt
from io import BytesIO

# Constants
BOT_USERNAME: Final = 'xyz'
Expand All @@ -16,13 +18,8 @@
# Supported currencies
SUPPORTED_CURRENCIES = ['usd', 'eur', 'gbp', 'jpy', 'aud', 'cad', 'chf', 'cny', 'inr']



# API HELPER FUNCTIONS

# def get_top_cryptos(limit=100):
def get_top_cryptos(is_comparing=False,limit=100):

def get_top_cryptos(is_comparing=False, limit=100):
response = requests.get(f"{COINGECKO_API_URL}/coins/markets", params={
'vs_currency': 'usd',
'order': 'market_cap_desc',
Expand All @@ -34,8 +31,6 @@ def get_top_cryptos(is_comparing=False,limit=100):
return response.json()
return []



def get_trending_cryptos():
response = requests.get(f"{COINGECKO_API_URL}/search/trending")
if response.status_code == 200:
Expand All @@ -50,7 +45,6 @@ def get_crypto_details(crypto_id: str, currency: str = 'usd'):
return data.get(crypto_id)
return None


# COMMAND HANDLER FUNCTIONS
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
await show_main_menu(update, context)
Expand All @@ -67,15 +61,12 @@ async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No
)
await update.message.reply_text(help_text)



# Menu Display and Button Handlers
async def show_main_menu(update: Update, context: ContextTypes.DEFAULT_TYPE, is_comparing: bool = False) -> None:

keyboard = [
[InlineKeyboardButton("Top 100 Cryptocurrencies", callback_data='top100')],
[InlineKeyboardButton("Trending Cryptocurrencies", callback_data='trending')],
[InlineKeyboardButton("Search Cryptocurrency", callback_data='search')], # Added missing comma here
[InlineKeyboardButton("Search Cryptocurrency", callback_data='search')],
[InlineKeyboardButton("Quit", callback_data='quit')]
]
reply_markup = InlineKeyboardMarkup(keyboard)
Expand All @@ -91,9 +82,7 @@ async def show_main_menu(update: Update, context: ContextTypes.DEFAULT_TYPE, is_
else:
await update.message.reply_text(text, reply_markup=reply_markup)


async def show_crypto_list(update: Update, context: ContextTypes.DEFAULT_TYPE, cryptos, title) -> None:

keyboard = []
for i in range(0, len(cryptos), 2):
row = []
Expand Down Expand Up @@ -152,10 +141,7 @@ async def show_crypto_details(update: Update, context: ContextTypes.DEFAULT_TYPE
else:
await update.callback_query.edit_message_text("🚫 Unable to retrieve cryptocurrency details.")



async def show_currency_options(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:

keyboard = [
[InlineKeyboardButton(currency.upper(), callback_data=f"currency:{currency}")]
for currency in SUPPORTED_CURRENCIES
Expand All @@ -164,21 +150,43 @@ async def show_currency_options(update: Update, context: ContextTypes.DEFAULT_TY
reply_markup = InlineKeyboardMarkup(keyboard)
await update.callback_query.edit_message_text('Choose a currency:', reply_markup=reply_markup)


# Historical Data
def get_historical_data(crypto_id: str, currency: str = 'usd', days: int = 7):
params = {'vs_currency': currency, 'days': days, 'interval': 'daily'}
response = requests.get(f"{COINGECKO_API_URL}/coins/{crypto_id}/market_chart", params=params)
if response.status_code == 200:
data = response.json()
prices = data.get('prices', [])
return prices
return []

def generate_price_chart(prices, days):
dates = [price[0] for price in prices]
values = [price[1] for price in prices]

plt.figure(figsize=(10, 5))
plt.plot(dates, values)
plt.title(f"{days} Day Price Chart")
plt.xlabel("Date")
plt.ylabel("Price")
plt.grid(True)
plt.xticks(rotation=45)

buf = BytesIO()
plt.savefig(buf, format='png')
buf.seek(0)
return buf

# Callback Query Handler
async def button_click(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:

query = update.callback_query

await query.answer()

if query.data == 'main_menu':
# Instead of deleting the message, just edit it to show the main menu
try:
await show_main_menu(update, context)
await show_main_menu(update, context)
except Exception as e:
print(f"Error displaying main menu: {e}")

return MAIN_MENU

if query.data == 'top100':
Expand All @@ -187,213 +195,22 @@ async def button_click(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No
await show_crypto_list(update, context, cryptos, "Top 100 Cryptocurrencies:")
return CHOOSING_CRYPTO
elif query.data == 'quit':
# Quit the current conversation and show a message
await query.edit_message_text("You can return to the main menu anytime by using /start.")

return MAIN_MENU # This will allow the user to start again later


elif query.data == 'trending':
await query.edit_message_text("Fetching trending cryptocurrencies, please wait...")
cryptos = get_trending_cryptos()
await show_crypto_list(update, context, cryptos, "Trending Cryptocurrencies:")

return CHOOSING_CRYPTO

elif query.data == 'search':
await query.edit_message_text("Please enter the name of the cryptocurrency you want to check:")

await query.edit_message_text("Please type the name or symbol of the cryptocurrency.")
return TYPING_SEARCH
elif query.data.startswith('crypto:'):
context.user_data['crypto'] = query.data.split(':')[1]
await show_currency_options(update, context)

return CHOOSING_CURRENCY
elif query.data.startswith('currency:'):
currency = query.data.split(':')[1]
crypto_id = context.user_data.get('crypto', 'bitcoin')
await show_crypto_details(update, context, crypto_id, currency)

return COMPARE_SELECTION
elif query.data == 'compare_selection':
crypto_id = context.user_data.get('crypto')

if not crypto_id:
await query.edit_message_text("Please select a cryptocurrency before comparing.")
return

# Fetch the top 100 currencies again for selection
await query.edit_message_text("Fetching top 100 currencies for comparison, please wait...")
cryptos = get_top_cryptos(is_comparing=True) # Fetch top 100 for comparison
await show_crypto_list(update, context, cryptos, f"Compare {crypto_id} with another currency:")

# Now wait for the user to select a new currency to compare
return CHOOSING_CURRENCY

elif query.data == 'cancel_compare':
await query.edit_message_text("Comparison cancelled.")

else:
await query.edit_message_text("Invalid selection. Returning to main menu.")
await show_main_menu(update, context)

return MAIN_MENU




# **New Function to Show Comparison Options**
# Updated show_compare_options function
async def show_compare_options(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:

cryptos = get_top_cryptos() # Fetch the top 100 cryptocurrencies

keyboard = []
for i in range(0, len(cryptos), 2):
row = []
for crypto in cryptos[i:i + 2]:
name = crypto.get('name', 'Unknown')
symbol = crypto.get('symbol', 'Unknown')
crypto_id = crypto.get('id', 'unknown')
row.append(InlineKeyboardButton(f"{name} ({symbol.upper()})", callback_data=f"compare:{crypto_id}"))
keyboard.append(row)

keyboard.append([InlineKeyboardButton("Cancel", callback_data='cancel_compare')])
reply_markup = InlineKeyboardMarkup(keyboard)

# Skip the welcome message during comparison
await show_main_menu(update, context, is_comparing=True)



# **New Function to Handle Comparison Prompt**
async def compare_prompt_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
# Send the message asking user to select another cryptocurrency
await update.callback_query.message.reply_text("Select the another Cryptocurrency...")

# Then proceed to show the comparison options
await show_compare_options(update, context)
return COMPARE_SELECTION


# 6. Message and Error Handlers
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:

user_input = update.message.text.lower()
search_results = requests.get(f"{COINGECKO_API_URL}/search", params={'query': user_input}).json()
coins = search_results.get('coins', [])

if coins:
await show_crypto_list(update, context, coins[:10], "Search Results:")
return CHOOSING_CRYPTO
else:
await update.message.reply_text("Sorry, I couldn't find any cryptocurrency matching your search.")
await show_main_menu(update, context)
# Handle currency selection
if query.data.startswith("currency:"):
selected_currency = query.data.split(":")[1]
context.user_data['currency'] = selected_currency
await query.edit_message_text(f"Currency set to {selected_currency.upper()}!\n\nYou can now continue.")
return MAIN_MENU

# Error Handler
async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):

print(f"Update {update} caused error {context.error}")


# Convert Currency into USD
# Function to fetch crypto price
def get_crypto_price(crypto: str, currency: str = 'usd'):
params = {'ids': crypto, 'vs_currencies': currency}
response = requests.get(COINGECKO_API_URL+'/simple/price', params=params)
data = response.json()
return data.get(crypto, {}).get(currency, 'Price not available')


# Convert crypto to different currencies
async def convert_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
if len(context.args) < 3:
await update.message.reply_text("Please use the format: /convert <crypto> <currency> <amount>\n\nFor Example `/convert bitcoin usd 1` - Convert bitcoin price into usd by fettching real time data\n\n")
return
crypto = context.args[0].lower()
currency = context.args[1].lower()
amount = float(context.args[2])
price = get_crypto_price(crypto, currency)
if price != 'Price not available':
converted_amount = price * amount
await update.message.reply_text(f"{amount} {crypto.capitalize()} is worth {converted_amount} {currency.upper()}.")
else:
await update.message.reply_text('Price not available.')


# Add alerts

# Function to set up price alerts
user_alerts = {}

def set_price_alert(user_id, crypto, threshold_price, condition):
user_alerts[user_id] = (crypto, threshold_price, condition)

async def set_alert_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
if len(context.args) < 3:
usage_text = (
"Please use the format: /setalert <crypto> <above|below> <price>\n"
)
await update.message.reply_text(usage_text)
return

crypto = context.args[0].lower() # Get the cryptocurrency (e.g., 'bitcoin')
condition = context.args[1].lower() # Get the condition (e.g., 'above' or 'below')
price = float(context.args[2]) # Get the price threshold

if condition not in ['above', 'below']:
await update.message.reply_text("Please specify 'above' or 'below' for the price alert condition.")
return

user_id = update.message.from_user.id # Get the user's ID

# Save the alert with the condition (above or below)
set_price_alert(user_id, crypto, price, condition)

# Notify the user that the alert has been set
await update.message.reply_text(
f"Price alert set for {crypto.capitalize()} when price is {condition} ${price} USD.\n"
"You'll be notified when this condition is met."
)

async def alert_check(context: ContextTypes.DEFAULT_TYPE):
for user_id, (crypto, threshold_price, condition) in user_alerts.items():
price = get_crypto_price(crypto)

# Check if the condition (above or below) is met
if (condition == 'above' and price >= threshold_price) or (condition == 'below' and price <= threshold_price):
await context.bot.send_message(
chat_id=user_id,
text=f"Price alert! {crypto.capitalize()} has {'exceeded' if condition == 'above' else 'dropped below'} ${threshold_price} USD. Current price: ${price} USD."
)


def main() -> None:
app = Application.builder().token(BOT_TOKEN).build()

conv_handler = ConversationHandler(
entry_points=[CommandHandler("start", start)],
states={
MAIN_MENU: [CallbackQueryHandler(button_click)],
CHOOSING_CRYPTO: [CallbackQueryHandler(button_click)],
CHOOSING_CURRENCY: [CallbackQueryHandler(button_click)],
TYPING_SEARCH: [MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)],
COMPARE_SELECTION: [CallbackQueryHandler(button_click)] # Added COMPARE_SELECTION
},
fallbacks=[CommandHandler("start", start)],
per_message=False
)

app.add_handler(conv_handler)
app.add_handler(CommandHandler('convert', convert_command))
app.add_handler(CommandHandler('setalert', set_alert_command))
app.add_handler(CommandHandler("help", help_command))
app.add_error_handler(error_handler)
# Run alert checker periodically
app.job_queue.run_repeating(alert_check, interval=60)

print('Starting bot...')
app.run_polling(poll_interval=3)

if __name__ == '__main__':
main()