-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathindicators.py
More file actions
394 lines (342 loc) · 15 KB
/
indicators.py
File metadata and controls
394 lines (342 loc) · 15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
"""
Features Needed:
* taapi.io api key
* Aggregate each symbol and indicator, then pull all available indicators using the taapi.io client
* Allow the user to save indicator templates so that they don't have to enter long ones again
* Message to telegram should be a list of indicators with their documentation hyperlinked:
- https://stackoverflow.com/questions/45268368/how-to-send-a-hyperlink-in-bot-sendmessage
- Example:
ABOVE
BELOW
PCTCHG
MAXINDEX (Index of highest value over a specified period)
MEDPRICE (Median Price)
SAR (Parabolic Sar)
SIN (Vector Trigonometric Sin)
* Need a combination of indicators (AND/OR)
"""
import json
from time import time, sleep
from typing import Union
import os
from .user_configuration import (
get_whitelist,
LocalUserConfiguration,
MongoDBUserConfiguration,
)
from .config import *
from .logger import logger
from .utils import get_ratelimits
import requests
from ratelimit import limits, sleep_and_retry
class TADatabaseClient:
    """This client should handle the cross-process operations of the technical analysis indicators database"""

    def build_ta_db(self):
        """Build the TA indicators database from scratch. Placeholder — not yet implemented."""
        pass

    def dump_ref(self, data: dict) -> None:
        """Overwrite the technical analysis indicators database with ``data`` (pretty-printed JSON)."""
        with open(TA_DB_PATH, "w") as out:
            out.write(json.dumps(data, indent=2))

    def fetch_ref(self) -> dict:
        """Get the location of and return the technical analysis indicators database in JSON format"""
        # Use a context manager so the file handle is always closed
        # (the previous open(...).read() leaked the handle).
        with open(TA_DB_PATH) as infile:
            return json.load(infile)

    def add_indicator(
        self,
        indicator_id: str,
        name: str,
        endpoint: str,
        reference_url: str,
        params: list[tuple[str, str, bool]],
        output: list[str],
        indicator_type: str = "t",
    ) -> None:
        """
        Add an indicator to the TA database.
        If the indicator already exists, it will be overwritten with the new data.
        :param indicator_id: The uppercase indicator ID as shown on taapi.io (e.g. 2CROWS)
        :param name: The indicator name as shown on taapi.io (e.g. Two Crows)
        :param endpoint: The endpoint URL following this format:
                         https://api.taapi.io/2crows?secret={api_key}&exchange=binance
        :param reference_url: The link to the indicator's documentation on taapi.io
        :param params: List of tuples containing parameter data
            - param_id: The ID for the parameter as shown on the API parameters documentation (e.g. "symbol")
            - param_description: A docstring that lets the user know what data the parameter expects
            - param_default: The default value for the parameter, or None if the parameter is strictly required
        :param output: List of output variables that are returned by the API so that the user can choose which
                       to use, and so that the bot can know how to process the response.
                       NOTE: All return types for the output_values are considered as FLOAT
        :param indicator_type: "s" for simple indicator, and "t" for technical indicator
        """
        db = self.fetch_ref()
        db[indicator_id.upper()] = {
            "name": name,
            "endpoint": endpoint,
            "ref": reference_url,
            "params": params,
            "output": output,
            "type": indicator_type,
        }
        self.dump_ref(db)
        print(f"{name} ({indicator_id}) indicator added to TA database.")

    def get_indicator(self, _id: str, key=None):
        """
        Returns the indicator data from the database.
        :param _id: The indicator ID (case-insensitive).
        :param key: Optionally return only this field of the indicator's entry.
        :raises ValueError: If ``_id`` (or ``key`` within it) cannot be found.
        """
        try:
            # Fetch the database once instead of reading the file twice.
            data = self.fetch_ref()[_id.upper()]
            return data if key is None else data[key]
        except KeyError:
            raise ValueError(f"'{_id}' is an invalid indicator ID") from None

    def validate_indicator(
        self, indicator: str, args: list = None
    ) -> Union[dict, None]:
        """
        :param indicator: The uppercase indicator symbol
        :param args: Any param or output IDs
        :return: indicator if one is found and valid, None if not valid.
        """
        db = self.fetch_ref()
        try:
            indicator = db[indicator]
        except KeyError:
            return None
        if args is not None:
            params = [param[0] for param in indicator["params"]]
            # "output" is a flat list of output-variable names (see add_indicator),
            # so args must be matched against the full names — the previous
            # `val[0]` compared against each name's first character only.
            output_vals = list(indicator["output"])
            if not any(arg in params or arg in output_vals for arg in args):
                return None
        return indicator
class TAAggregateClient:
    """Builds and persists the aggregate of all users' TA alerts for bulk API polling."""

    def __init__(self):
        # TA DB client plus a cached copy of the indicators reference
        # (the reference is static for the lifetime of this client).
        self.indicators_db_cli = TADatabaseClient()
        self.indicators_reference = self.indicators_db_cli.fetch_ref()

    def build_ta_aggregate(self, ta_db: dict = None):
        """
        Build the TA aggregate of all users' alert databases to update the technical analysis reference
        (Simply constructs the aggregate, does not call the API)
        :param ta_db: Can optionally be provided if the ta_db is already stored in a higher level function.
        Structure Reference:
        {
            symbol: {
                interval: {
                    [bulk_query_formatted_alert]
                }
            }
        }
        """
        # logger.info("Building TA aggregate...")
        if ta_db is None:
            ta_db = self.indicators_reference
        # Fetch the old aggregate so that previously-polled values can persist
        try:
            old_agg = self.load_agg()
        except FileNotFoundError:
            old_agg = {}
            self.dump_agg(old_agg)
        except Exception as exc:
            logger.exception("Could not load TA aggregate", exc_info=exc)
            raise exc
        # Create the new aggregate to weed out unused indicators:
        agg = {}
        for user in get_whitelist():
            alerts_data = (
                LocalUserConfiguration(user).load_alerts()
                if not USE_MONGO_DB
                else MongoDBUserConfiguration(user).load_alerts()
            )
            for symbol, alerts in alerts_data.items():
                if symbol not in agg.keys():
                    agg[symbol] = {}
                for alert in alerts:
                    # Simple ("s") alerts don't require TA polling — skip them
                    if alert["type"] == "s":
                        continue
                    if alert["interval"] not in agg[symbol].keys():
                        agg[symbol][alert["interval"]] = []
                    # Build the alert to store in the aggregate with format prepared to be sent to the API in bulk call
                    formatted_alert = self.format_alert_for_match(alert)
                    # Attempt to find an existing match to have previous values persist
                    match = None
                    try:
                        for indicator in old_agg[symbol][alert["interval"]]:
                            try:
                                if all(
                                    indicator[k] == v
                                    for k, v in formatted_alert.items()
                                ):
                                    match = indicator
                                    break
                            except KeyError:
                                continue
                    except KeyError:
                        # No prior entry for this symbol/interval in the old aggregate
                        pass
                    if match is not None:
                        formatted_alert = match
                    else:
                        # Fresh entry: initialize each output variable to None until polled
                        formatted_alert["values"] = {
                            var: None
                            for var in ta_db[alert["indicator"].upper()]["output"]
                        }
                        formatted_alert["last_update"] = 0
                    # Add the formatted alert to the database
                    agg[symbol][alert["interval"]].append(formatted_alert)
        # Update the aggregate with the new data
        self.dump_agg(agg)
        # logger.info("TA aggregate built.")

    def format_alert_for_match(self, alert: dict):
        """Format an alert as a bulk-query dict: the lowercase indicator ID plus each
        parameter from the alert, falling back to the TA DB's default when absent."""
        formatted_alert = {"indicator": alert["indicator"].lower()}
        for param, _, default_value in self.indicators_db_cli.get_indicator(
            _id=alert["indicator"].upper(), key="params"
        ):
            try:
                formatted_alert[param] = alert["params"][param]
            except KeyError:
                formatted_alert[param] = default_value
        return formatted_alert

    def dump_agg(self, data: dict) -> None:
        """Write the aggregate to disk as pretty-printed JSON, creating parent dirs as needed."""
        # makedirs(exist_ok=True) handles nested missing parents and is race-safe,
        # unlike the isdir()+mkdir() pair it replaces.
        parent = os.path.dirname(AGG_DATA_LOCATION)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(AGG_DATA_LOCATION, "w") as outfile:
            outfile.write(json.dumps(data, indent=2))

    def load_agg(self) -> dict:
        """Load and return the aggregate, creating an empty one on first use."""
        try:
            with open(AGG_DATA_LOCATION, "r") as infile:
                return json.loads(infile.read())
        except FileNotFoundError:
            # First run: persist an empty aggregate and return it directly
            # (no need to recurse back into this function).
            self.dump_agg({})
            return {}

    def clean_agg(self) -> None:
        """Remove all unused indicators from the aggregate"""
        # NOTE(review): this currently only loads the aggregate — the actual
        # pruning/dump step appears to be unimplemented. TODO: confirm intent.
        try:
            agg = self.load_agg()
        except Exception as exc:
            logger.exception("Could not load TA aggregate to clean", exc_info=exc)
            raise exc
class TaapiioProcess:
    """Taapi.io process should be run in a separate thread to allow for sleeping between API calls"""

    def __init__(self, taapiio_apikey: str, telegram_bot_token: str = None):
        self.apikey = taapiio_apikey
        self.last_call = 0  # Implemented instead of the ratelimit package solution to solve the buffer issue
        self.ta_db = (
            TADatabaseClient().fetch_ref()
        )  # TA DB is static and can be loaded once
        self.agg_cli = TAAggregateClient()
        self.tg_bot_token = telegram_bot_token  # Can be left blank, but the process wont be able to report errors

    @sleep_and_retry
    @limits(
        calls=get_ratelimits()[0],
        period=round(get_ratelimits()[1] * (1 + REQUEST_BUFFER), 1),
    )
    # @tiered_rate_limit()
    def call_api(self, endpoint: str, params: dict, r_type: str = "POST") -> dict:
        """
        Calls the taapi.io API and returns the response in JSON format
        Free API key limit is 1 call every 15 seconds, we use +1 to add a safety buffer
        :param endpoint: The API endpoint URL ("{api_key}" is substituted for GET requests).
        :param params: Query params (GET) or the JSON request body (POST).
        :param r_type: "GET" or "POST".
        :raises ValueError: On an unsupported ``r_type`` (previously returned None silently).
        """
        if r_type == "GET":
            return requests.get(
                endpoint.format(api_key=self.apikey), params=params
            ).json()
        elif r_type == "POST":
            logger.info(f"Sending bulk query to API: {params}")
            return requests.post(endpoint, json=params).json()
        raise ValueError(f"Unsupported request type: {r_type}")

    def mainloop(self):
        """
        Run the process mainloop as fast as possible while respecting the API call limit
        Exceptions should be handled at a higher level than this function
        """
        logger.warning("Taapi.io process started.")
        previous_rates = (
            []
        )  # Store the last 3 through-process times to report an average rate
        while True:
            start = time()
            num_indicators = 0
            # 1. Build the aggregate before every cycle
            self.agg_cli.build_ta_aggregate(self.ta_db)
            # 2. Poll all values from the aggregate using bulk queries to the taapi.io API
            aggregate = self.agg_cli.load_agg()
            if all(len(v) == 0 for v in aggregate.values()):
                sleep(0.1)  # To prevent excessive spamming
                continue
            for symbol, intervals in aggregate.items():
                for interval, indicators in intervals.items():
                    num_indicators += len(indicators)  # For logging
                    # Prepare the bulk query for the API (strip locally-tracked state keys)
                    exclude_keys = ["values", "last_update"]
                    indicators_query = [
                        {k: v for k, v in indicator.items() if k not in exclude_keys}
                        for indicator in indicators
                    ]
                    query = {
                        "secret": self.apikey,
                        "construct": {
                            "exchange": DEFAULT_EXCHANGE,
                            "symbol": symbol,
                            "interval": interval,
                            "indicators": indicators_query,
                        },
                    }
                    r = self.call_api(endpoint=BULK_ENDPOINT, params=query)
                    try:
                        responses = r["data"]
                    except KeyError:
                        raise Exception(f"Error occurred calling taapi.io API - {r}")
                    # Assign returned values and update aggregate:
                    # (responses are assumed to arrive in the same order as indicators_query)
                    for i, response in enumerate(responses):
                        for output_variable in self.ta_db[
                            indicators[i]["indicator"].upper()
                        ]["output"]:
                            indicators[i]["values"][output_variable] = response[
                                "result"
                            ][output_variable]
                        indicators[i]["last_update"] = int(time())
            # 3. Dump aggregate with updated values so that the alerts client can reference it
            self.agg_cli.dump_agg(aggregate)
            # Logging
            previous_rates.append(round(time() - start, 1))
            if len(previous_rates) > 3:
                del previous_rates[0]
            logger.info(
                f"TA Aggregate updated. "
                f"Average through process rate: {round(sum(previous_rates) / len(previous_rates), 1)} seconds"
            )

    def alert_admins(self, message: str) -> None:
        """Send ``message`` to every whitelisted admin via the Telegram bot API (no-op without a token)."""
        if self.tg_bot_token is None:
            logger.warning(
                f"Attempted to alert admins, but no telegram bot token was set: {message}"
            )
            return None
        for user in get_whitelist():
            admin = (
                LocalUserConfiguration(user).admin_status()
                if not USE_MONGO_DB
                else MongoDBUserConfiguration(user).admin_status()
            )
            if admin:
                requests.post(
                    url=f"https://api.telegram.org/bot{self.tg_bot_token}/sendMessage",
                    params={"chat_id": user, "text": message},
                )

    def run(self) -> None:
        """Run the mainloop, restarting after ``restart_period`` seconds on any unhandled error.

        Restarts iteratively (the previous self-recursive restart would eventually
        exhaust the recursion limit on a long-lived process).
        """
        restart_period = 15
        while True:
            try:
                self.mainloop()
                return
            except KeyboardInterrupt:
                return
            except Exception as exc:
                # Log the actual restart period (the message previously said "5 seconds"
                # while sleeping for 15).
                logger.critical(
                    f"An error has occurred in the mainloop - restarting in {restart_period} seconds...",
                    exc_info=exc,
                )
                self.alert_admins(
                    message=f"A critical error has occurred in the TaapiioProcess "
                    f"(Restarting in {restart_period} seconds) - {exc}"
                )
                sleep(restart_period)