# signal_data.py
import os
import asyncio
import sys
from datetime import datetime
import aiohttp
import nest_asyncio
import sqlite3
import pandas as pd
import pandas_ta as ta
import numpy as np
import scipy.signal
from binance.websockets import BinanceSocketManager
from twisted.internet import reactor
from trader import Trader
from utils import get_popular_coins
nest_asyncio.apply()  # allow run_until_complete() calls from inside running coroutines

# run relative to this file's directory so the sqlite database path resolves
file_path = os.path.abspath(os.path.dirname(__file__))
os.chdir(file_path)

client = Trader().client
loop = asyncio.get_event_loop()

TIME_FRAMES = ['15m', '1h', '4h']


class SymbolData:
    """Start websocket for live klines and get historical klines that don't exist"""


class SignalData:

    @classmethod
    async def return_dataframes(cls, symbol, event_loop):
        """Get complete dataframes for the given symbol, with signal data for every timeframe.

        Main entry point for the initial data preparation."""
        return await cls.add_ta_data(await cls.get_original_data(symbol), event_loop)
        # return await cls.get_original_data(symbol)
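    # Note: the returned frames line up with TIME_FRAMES, so dfs[0] is the 15m
    # frame, dfs[1] the 1h frame and dfs[2] the 4h frame.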

    @classmethod
    async def get_original_data(cls, symbol):
        """Load price data from the database.

        This has to happen first."""
        conn = sqlite3.connect('symbols.db')
        dfs = []
        for tf in TIME_FRAMES:
            query = f'SELECT * FROM {symbol}_{tf}'
            df = pd.read_sql_query(query, conn)
            dfs.append(df)
        conn.close()
        return dfs
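    # Table naming assumption: klines are stored per symbol and timeframe in
    # `{symbol}_{timeframe}` tables created elsewhere, e.g. (BTCUSDT purely as
    # an illustration): SELECT * FROM BTCUSDT_15m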

    @classmethod
    async def add_ta_data(cls, dfs, event_loop):
        """Create columns for RSI, MACD (macd/signal/histogram), EMAs (20/50/200) and Heiken Ashi.

        These can all be run as coroutine tasks."""
        coroutines = [[], [], []]
        for i, df in enumerate(dfs):
            coroutines[i].append(cls.get_rsi(df))
            coroutines[i].append(cls.get_macd(df))
            coroutines[i].append(cls.get_emas(df))
            coroutines[i].append(cls.get_heiken_ashi(df))
        # run the indicator coroutines for each timeframe and collect their return values
        dfs_15 = event_loop.run_until_complete(asyncio.gather(*coroutines[0]))
        dfs_1h = event_loop.run_until_complete(asyncio.gather(*coroutines[1]))
        dfs_4h = event_loop.run_until_complete(asyncio.gather(*coroutines[2]))
        # combine each timeframe's indicator columns into a single dataframe
        comp_dfs = [pd.concat(dfs_15, axis=1),
                    pd.concat(dfs_1h, axis=1),
                    pd.concat(dfs_4h, axis=1)]
        return comp_dfs
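    # Why the column-wise concat works: in pandas_ta, ta.rsi returns a Series and
    # ta.macd a three-column DataFrame (MACD line, histogram and signal), while the
    # EMA and Heiken Ashi helpers below return DataFrames, so gathering the results
    # and concatenating with axis=1 yields one wide indicator frame per timeframe.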

    @classmethod
    async def get_rsi(cls, df):
        return ta.rsi(df.close, 14)

    @classmethod
    async def get_macd(cls, df):
        return ta.macd(df.close, 12, 26, 9)

    @classmethod
    async def get_emas(cls, df):
        # Use the standard EMA lengths when enough candles exist; otherwise fall
        # back to a shorter period (len(df.close) - 3) so all three columns are
        # still populated for downstream checks.
        if len(df) >= 200:
            df['ema_20'], df['ema_50'] = ta.ema(df.close, 20), ta.ema(df.close, 50)
            df['ema_200'] = ta.ema(df.close, 200)
        elif len(df) >= 50:
            df['ema_20'], df['ema_50'] = ta.ema(df.close, 20), ta.ema(df.close, 50)
            df['ema_200'] = ta.ema(df.close, len(df.close) - 3)
        elif len(df) >= 20:
            df['ema_20'] = ta.ema(df.close, 20)
            df['ema_50'] = ta.ema(df.close, len(df.close) - 3)
            df['ema_200'] = ta.ema(df.close, len(df.close) - 3)
        else:
            df['ema_20'] = df['ema_50'] = ta.ema(df.close, len(df.close) - 3)
            df['ema_200'] = ta.ema(df.close, len(df.close) - 3)
        df = df.tail(88)
        return df[['ema_20', 'ema_50', 'ema_200']]

    @classmethod
    async def get_heiken_ashi(cls, df):
        df['HA_Close'] = (df['open'] + df['high'] + df['low'] + df['close']) / 4
        idx = df.index.name
        df.reset_index(inplace=True)
        for i in range(len(df)):
            if i == 0:
                df.at[i, 'HA_Open'] = (df.at[i, 'open'] + df.at[i, 'close']) / 2
            else:
                df.at[i, 'HA_Open'] = (df.at[i - 1, 'HA_Open'] + df.at[i - 1, 'HA_Close']) / 2
        if idx:
            df.set_index(idx, inplace=True)
        df['HA_High'] = df[['HA_Open', 'HA_Close', 'high']].max(axis=1)
        df['HA_Low'] = df[['HA_Open', 'HA_Close', 'low']].min(axis=1)
        return df[['HA_Open', 'HA_High', 'HA_Low', 'HA_Close']]
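    # Standard Heiken Ashi definitions implemented above:
    #   HA_Close = (open + high + low + close) / 4
    #   HA_Open  = (prev HA_Open + prev HA_Close) / 2, seeded with (open + close) / 2
    #   HA_High  = max(HA_Open, HA_Close, high)
    #   HA_Low   = min(HA_Open, HA_Close, low)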

    @classmethod
    async def check_df(cls, df, event_loop):
        coroutines = [cls.check_rsi(df), cls.check_macd(df), cls.check_heiken_ashi(df)]
        results = event_loop.run_until_complete(asyncio.gather(*coroutines))
        return results

    @classmethod
    async def check_rsi(cls, df):
        # await asyncio.sleep(2)
        # print('checked rsi')
        return 'RSI'

    @classmethod
    async def check_macd(cls, df):
        # await asyncio.sleep(1)
        # print('checked macd')
        return False, True

    @classmethod
    async def check_heiken_ashi(cls, df):
        # await asyncio.sleep(2)
        # print('checked ha')
        return 'HA'
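    # The check_* coroutines above are currently stubs that return fixed values
    # ('RSI', (False, True) and 'HA'), so check_df and main can be exercised
    # end-to-end before any real signal logic exists.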

    @classmethod
    async def main(cls, symbol, event_loop):
        # start_time = datetime.now()
        dfs = await cls.return_dataframes(symbol, event_loop)
        coroutines = [cls.check_df(df, event_loop) for df in dfs]
        results = event_loop.run_until_complete(asyncio.gather(*coroutines))
        # print(f'took: {datetime.now() - start_time}')
        return results
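    # Usage sketch for a single symbol (BTCUSDT is only an example name; any symbol
    # whose kline tables already exist in symbols.db works the same way):
    #   signals = loop.run_until_complete(SignalData.main('BTCUSDT', loop))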


if __name__ == '__main__':
    start_time = datetime.now()  # set before the try block so `finally` can always report elapsed time
    try:
        conn = sqlite3.connect('symbols.db')
        curs = conn.cursor()
        tabs = {tab[0] for tab in curs.execute("select name from sqlite_master where type = 'table'").fetchall()}
        conn.close()
        coroutines = []
        for s in get_popular_coins():
            if s not in {'XEMUSDT', '1INCHUSDT'}:
                coroutines.append(SignalData.main(s, loop))
        data = loop.run_until_complete(asyncio.gather(*coroutines))
        loop.close()
        print(data)
    except IndexError as e:
        print(e)
    finally:
        print(f'took: {datetime.now() - start_time}')