-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbot.py
199 lines (159 loc) · 5.99 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
import asyncio
from helpers import check_and_analyze, input_sanity_check_analyzing, check_signal_limit
from plot_build_helpers import plot_price_chart
from message_handlers import (
select_indicators,
handle_indicator_selection,
manage_signals,
handle_signal_menu_callback,
handle_signal_text_input,
CHOOSING_ACTION,
TYPING_SIGNAL_DATA,
)
from signal_detection import (
generate_price_prediction_signal_proba,
createSignalJob,
deleteSignalJob,
initialize_jobs,
)
from database import get_user_preferences
from utils import plural_helper
from telegram import Update # type: ignore
from telegram.ext import ( # type: ignore
ApplicationBuilder,
CommandHandler,
ContextTypes,
CallbackContext,
CallbackQueryHandler,
MessageHandler,
filters,
ConversationHandler,
)
from dotenv import load_dotenv # type: ignore
import os
import logging
# Root logger: INFO level, each record prefixed with timestamp, logger name
# and severity.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Load environment variables (python-dotenv) before they are read via os.getenv.
load_dotenv()
async def send_crypto_chart(update: Update, context: CallbackContext):
    """
    Telegram handler to fetch OHLC data, analyze indicators, and send the chart back to the user.

    The user's stored preferences and the command arguments (``context.args``)
    are passed to ``check_and_analyze``. The resulting chart is sent as a
    photo, followed by the textual reasoning returned by
    ``generate_price_prediction_signal_proba``.

    Args:
        update: Incoming Telegram update that triggered the command.
        context: Handler context; ``context.args`` holds the command arguments.
    """
    # CommandHandler also fires for edited messages, in which case
    # ``update.message`` is None; ``effective_message`` covers both cases.
    message = update.effective_message
    chat_id = update.effective_chat.id
    user_id = update.effective_user.id
    preferences = get_user_preferences(user_id)

    # Analyze data
    indicators, df = await check_and_analyze(
        update, user_id, preferences, context.args
    )

    # Plot chart
    chart_path = plot_price_chart(df, indicators)
    if chart_path is None:
        await message.reply_text("Error generating the chart. Please try again.")
        return

    # Generate the probability-based signal; only the fourth element (the
    # reason text) is shown to the user.
    _, _, _, reason_str = generate_price_prediction_signal_proba(df, indicators)

    # Send the chart to the user
    with open(chart_path, "rb") as f:
        await context.bot.send_photo(chat_id=chat_id, photo=f)

    # Leading space is needed for correct representation of the message
    await message.reply_text(f" {reason_str}")
async def send_text_data(update: Update, context: CallbackContext):
    """
    Telegram handler that analyzes data for a user-specified crypto pair and
    replies with the computed indicators as plain text (no chart is produced).

    Usage: /text_result <symbol> <hours> <interval> <tolerance>, e.g. /text_result BTCUSDT 42 15m 0.03

    Args:
        update: Incoming Telegram update that triggered the command.
        context: Handler context; ``context.args`` holds the command arguments.
    """
    user_id = update.effective_user.id
    preferences = get_user_preferences(user_id)

    # Analyze data; the returned data frame is not needed for the text reply.
    indicators, _ = await check_and_analyze(
        update, user_id, preferences, context.args
    )

    # CommandHandler also fires for edited messages, in which case
    # ``update.message`` is None; ``effective_message`` covers both cases.
    await update.effective_message.reply_text(str(indicators))
async def create_signal_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Command handler to create a signal job.

    Usage: /create_signal <SYMBOL> <MINUTES> [<IS_WITH_CHART>]
    Example: /create_signal BTCUSDT 60 True
    Note: is_with_chart is an optional argument. Default value is false.

    Args:
        update: Incoming Telegram update that triggered the command.
        context: Handler context; ``context.args`` holds the command arguments.
    """
    # A truthy result stops the command (presumably the user's signal limit
    # was reached -- see check_signal_limit in helpers).
    if await check_signal_limit(update):
        return

    # CommandHandler also fires for edited messages, in which case
    # ``update.message`` is None; ``effective_message`` covers both cases.
    message = update.effective_message
    args = context.args

    pair = await input_sanity_check_analyzing(True, args, update)
    if not pair:
        arg_count = len(args)
        await message.reply_text(
            f"Usage: /create_signal <symbol> <period_in_minutes> [<is_with_chart>], you've sent {arg_count} argument{plural_helper(arg_count)}."
        )
        return

    try:
        await createSignalJob(pair[0], pair[1], pair[2], update, context)
    except Exception:
        # Top-level handler boundary: record the full traceback through the
        # configured logger, then tell the user something went wrong.
        logging.exception("Unexpected error while creating a signal job")
        await message.reply_text("❌ An unexpected error occurred.")
async def delete_signal_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Command handler to delete a specific signal job.

    Usage: /delete_signal <SYMBOL>
    Example: /delete_signal BTCUSDT

    Args:
        update: Incoming Telegram update that triggered the command.
        context: Handler context; ``context.args`` holds the command arguments.
    """
    # CommandHandler also fires for edited messages, in which case
    # ``update.message`` is None; ``effective_message`` covers both cases.
    message = update.effective_message
    args = context.args

    pair = await input_sanity_check_analyzing(False, args, update)
    if not pair:
        arg_count = len(args)
        await message.reply_text(
            f"Usage: /delete_signal <symbol>, you've sent {arg_count} argument{plural_helper(arg_count)}."
        )
        return

    try:
        await deleteSignalJob(pair[0], update)
    except Exception:
        # Top-level handler boundary: record the full traceback through the
        # configured logger, then tell the user something went wrong.
        logging.exception("Unexpected error while deleting a signal job")
        await message.reply_text("❌ An unexpected error occurred.")
async def initialize_jobs_handler(application):
    """Startup hook: delegate to ``initialize_jobs`` for the given application.

    Per the original description, this restores all user signal jobs from
    the database when the application starts.
    """
    restore = initialize_jobs(application)
    await restore
if __name__ == "__main__":
    # Script entry point: prepare the database, build the Telegram
    # application, register all handlers, and start long polling.
    from database import init_db

    init_db()

    TOKEN = os.getenv("API_TELEGRAM_KEY")
    if not TOKEN:
        # Fail fast with an actionable message instead of an opaque error
        # from the Telegram library when the token is missing.
        raise SystemExit(
            "API_TELEGRAM_KEY is not set; define it in the environment or in the .env file."
        )

    app = ApplicationBuilder().token(TOKEN).build()

    # Stateless command handlers
    app.add_handler(CommandHandler("chart", send_crypto_chart))
    app.add_handler(CommandHandler("text_result", send_text_data))
    app.add_handler(CommandHandler("select_indicators", select_indicators))
    app.add_handler(
        CallbackQueryHandler(handle_indicator_selection, pattern=r"^indicator_")
    )
    app.add_handler(CommandHandler("create_signal", create_signal_command))
    app.add_handler(CommandHandler("delete_signal", delete_signal_command))

    # Conversation started by /manage_signals: a menu-callback state followed
    # by a free-text input state.
    manage_signals_conv_handler = ConversationHandler(
        entry_points=[CommandHandler("manage_signals", manage_signals)],
        states={
            CHOOSING_ACTION: [
                CallbackQueryHandler(handle_signal_menu_callback),
            ],
            TYPING_SIGNAL_DATA: [
                MessageHandler(
                    filters.TEXT & ~filters.COMMAND, handle_signal_text_input
                ),
            ],
        },
        fallbacks=[
            # Could add a "/cancel" fallback
        ],
    )
    app.add_handler(manage_signals_conv_handler)

    async def _initialize_jobs_once(_context):
        """Job callback: initialize the stored signal jobs for ``app``."""
        await initialize_jobs_handler(app)

    # Initialize jobs after the bot starts. The job queue expects a coroutine
    # function as callback, so a real ``async def`` is used instead of a
    # lambda that returns a task.
    app.job_queue.run_once(_initialize_jobs_once, when=0)

    print("Bot is running...")
    app.run_polling()