-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathsignals.py
207 lines (180 loc) · 9.38 KB
/
signals.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import os
from datetime import datetime
import pandas as pd
import pandas_ta as ta
import numpy as np
import scipy.signal
from trader import Trader
from coin_data import CoinData
# Module-level handle to the `client` attribute of a freshly built Trader.
# It is created as a side effect of importing this module and is not
# referenced by the code visible in this file.
client = Trader().client
# Absolute path of the directory that contains this file.
file_path = os.path.abspath(os.path.dirname(__file__))
# Switch the process working directory to that directory (import-time side effect).
# NOTE(review): presumably so that relative paths used elsewhere in the project
# resolve next to this module -- confirm before moving or removing.
os.chdir(file_path)
class Signals:
def __init__(self, symbol='BTCUSDT', tf='4h'):
"""Check for signals for given symbol and timeframe"""
self.symbol = symbol.upper()
self.tf = tf
self.df = CoinData.get_dataframe(symbol, tf)
self.df = self.df_ta()
# self.df = self.get_heiken_ashi()
self.vol_signal = self.vol_rise_fall()
self.vol_candle = self.large_vol_candle()
# self.HA_trend = self.get_heiken_ashi_trend(self.get_heiken_ashi(self.df))
self.rsi_ob_os_dict = {
'overbought': False,
'oversold': False,
}
self.rsi_overbought_oversold()
self.rsi_div_dict = {
'possible bearish divergence': False,
'possible bullish divergence': False,
'confirmed bearish divergence': False,
'confirmed bullish divergence': False,
}
self.rsi_divergence()
self.macd_dict = {
'MACD cross': None,
'MACD 0 cross': None,
}
self.macd_signals()
self.ema_signals_dict = {
'Price crossing EMA200': None,
'EMA20 crossing EMA50': None,
'EMA50 crossing EMA200': None,
}
self.ema_signals()
def full_check(self):
self.rsi_divergence()
self.ema_signals()
self.macd_signals()
self.rsi_overbought_oversold()
self.vol_rise_fall()
self.large_vol_candle()
def df_ta(self) -> pd.DataFrame:
df = self.df
df['rsi'] = ta.rsi(df.close, 14)
df = pd.concat((df, ta.macd(df.close, 12, 26, 9)), axis=1)
df['ema_20'], df['ema_50'] = ta.ema(df.close, 20), ta.ema(df.close, 50)
if len(df) >= 288:
df['ema_200'] = ta.ema(df.close, 200)
else:
df['ema_200'] = ta.ema(df.close, len(df.close) - 3)
df = df.tail(88)
return df
@staticmethod
def get_heiken_ashi(df):
df['HA_Close'] = (df['open'] + df['high'] + df['low'] + df['close']) / 4
idx = df.index.name
df.reset_index(inplace=True)
for i in range(0, len(df)):
if i == 0:
df.at[i, 'HA_Open'] = ((df._get_value(i, 'open') + df._get_value(i, 'close')) / 2)
else:
df.at[i, 'HA_Open'] = ((df._get_value(i - 1, 'HA_Open') + df._get_value(i - 1, 'HA_Close')) / 2)
if idx:
df.set_index(idx, inplace=True)
df['HA_High'] = df[['HA_Open', 'HA_Close', 'high']].max(axis=1)
df['HA_Low'] = df[['HA_Open', 'HA_Close', 'low']].min(axis=1)
return df
@staticmethod
def get_heiken_ashi_trend(df):
if df['HA_Close'].iloc[-1] > df['HA_Open'].iloc[-1]:
if df['HA_Close'].iloc[-2] > df['HA_Open'].iloc[-2] or all([df['HA_Low'].iloc[-2] < df['HA_Open'].iloc[-2],
df['HA_High'].iloc[-2] > df['HA_Close'].iloc[-2]]):
return True
elif df['HA_Close'].iloc[-1] < df['HA_Open'].iloc[-1]:
if df['HA_Close'].iloc[-2] < df['HA_Open'].iloc[-2] or all([df['HA_High'].iloc[-2] > df['HA_Open'].iloc[-2],
df['HA_Low'].iloc[-2] < df['HA_Close'].iloc[-2]]):
return False
else:
return None
def rsi_divergence(self):
rsi_array = np.array(self.df['rsi'].tail(20).array)
close_array = np.array(self.df['close'].tail(20).array)
rsi_peaks, _ = scipy.signal.find_peaks(rsi_array)
rsi_troughs, _ = scipy.signal.find_peaks(-rsi_array)
original_index = len(close_array)
indices = np.array([])
# bearish divergence confirmed: rsi formed lower peak while price formed higher peak
if 70 <= rsi_array[rsi_peaks[-2]] >= rsi_array[rsi_peaks[-1]] >= rsi_array[-2] >= rsi_array[-1]:
if close_array[rsi_peaks[-2]] <= close_array[rsi_peaks[-1]]:
close_array = np.array([close_array[rsi_peaks[-2]], close_array[rsi_peaks[-1]]])
rsi_array = np.array([rsi_array[rsi_peaks[-2]], rsi_array[rsi_peaks[-1]]])
indices = np.array([rsi_peaks[-2], rsi_peaks[-1]])
self.rsi_div_dict['confirmed bearish divergence'] = True
# possible bearish divergence: rsi forming lower peak while price forming higher peak
elif 70 <= rsi_array[rsi_peaks[-1]] >= rsi_array[-2] > rsi_array[-1]:
if close_array[rsi_peaks[-1]] <= close_array[-1]:
close_array = np.array([close_array[rsi_peaks[-1]], close_array[-1]])
rsi_array = np.array([rsi_array[rsi_peaks[-1]], rsi_array[-1]])
indices = np.array([rsi_peaks[-1], original_index])
self.rsi_div_dict['possible bearish divergence'] = True
# bullish divergence confirmed: rsi formed higher trough while price formed lower trough
elif 30 >= rsi_array[rsi_troughs[-2]] <= rsi_array[rsi_troughs[-1]] <= rsi_array[-2] <= rsi_array[-1]:
if close_array[rsi_troughs[-2]] >= close_array[rsi_troughs[-1]]:
close_array = np.array([close_array[rsi_troughs[-2]], close_array[rsi_troughs[-1]]])
rsi_array = np.array([rsi_array[rsi_troughs[-2]], rsi_array[rsi_troughs[-1]]])
indices = np.array([rsi_troughs[-2], rsi_troughs[-1]])
self.rsi_div_dict['confirmed bullish divergence'] = True
# possible bullish divergence: rsi forming higher trough while price forming lower trough
elif 30 >= rsi_array[rsi_troughs[-1]] <= rsi_array[-2] < rsi_array[-1]:
if close_array[rsi_troughs[-1]] >= close_array[-1]:
close_array = np.array([close_array[rsi_troughs[-1]], close_array[-1]])
rsi_array = np.array([rsi_array[rsi_troughs[-1]], rsi_array[-1]])
indices = np.array([rsi_troughs[-1], original_index])
self.rsi_div_dict['possible bullish divergence'] = True
return self.rsi_div_dict, close_array, rsi_array, indices
def rsi_overbought_oversold(self, o_s=30, o_b=70):
rsi_array = self.df['rsi'].array
if rsi_array[-3] <= o_s <= rsi_array[-2]:
self.rsi_ob_os_dict['oversold'] = True
elif rsi_array[-3] >= o_b >= rsi_array[-2]:
self.rsi_ob_os_dict['overbought'] = True
return self.rsi_ob_os_dict
def macd_signals(self):
if self.df['MACD_12_26_9'].array[-2] > self.df['MACDs_12_26_9'].array[-2]:
if self.df['MACD_12_26_9'].array[-3] < self.df['MACDs_12_26_9'].array[-3]:
self.macd_dict['MACD cross'] = True
elif self.df['MACD_12_26_9'].array[-2] < self.df['MACDs_12_26_9'].array[-2]:
if self.df['MACD_12_26_9'].array[-3] > self.df['MACDs_12_26_9'].array[-3]:
self.macd_dict['MACD cross'] = False
if (self.df['MACD_12_26_9'].array[-2], self.df['MACDs_12_26_9'].array[-2]) > (0, 0):
if (self.df['MACD_12_26_9'].array[-3], self.df['MACDs_12_26_9'].array[-3]) <= (0, 0):
self.macd_dict['MACD 0 cross'] = True
elif (self.df['MACD_12_26_9'].array[-2], self.df['MACDs_12_26_9'].array[-2]) < (0, 0):
if (self.df['MACD_12_26_9'].array[-3], self.df['MACDs_12_26_9'].array[-3]) >= (0, 0):
self.macd_dict['MACD 0 cross'] = False
if self.df['MACD_12_26_9'].array[-1] > self.df['MACDs_12_26_9'].array[-1]:
self.macd_dict['MACD up'] = True
else:
self.macd_dict['MACD up'] = False
def ema_signals(self):
ema_200 = self.df['ema_200'].array[-3:]
ema_50 = self.df['ema_50'].array[-3:]
ema_20 = self.df['ema_20'].array[-3:]
price = self.df['close'].array[-3:]
if ema_200[0] > price[0] and ema_200[1] >= price[1] and ema_200[2] < price[2]:
self.ema_signals_dict['Price crossing EMA200'] = True
elif ema_200[0] < price[0] and ema_200[1] <= price[1] and ema_200[2] > price[2]:
self.ema_signals_dict['Price crossing EMA200'] = False
if ema_20[0] > ema_50[0] and ema_20[1] >= ema_50[1] and ema_20[2] < ema_50[2]:
self.ema_signals_dict['EMA20 crossing EMA50'] = False
elif ema_20[0] < ema_50[0] and ema_20[1] <= ema_50[1] and ema_20[2] > ema_50[2]:
self.ema_signals_dict['EMA20 crossing EMA50'] = True
if ema_50[0] > ema_200[0] and ema_50[1] >= ema_200[1] and ema_50[2] < ema_200[2]:
self.ema_signals_dict['EMA50 crossing EMA200'] = False
elif ema_50[0] < ema_200[0] and ema_50[1] <= ema_200[1] and ema_50[2] > ema_200[2]:
self.ema_signals_dict['EMA50 crossing EMA200'] = True
return self.ema_signals_dict
def vol_rise_fall(self):
recent_vol = self.df.volume.tail(3).array
self.vol_signal = True if recent_vol[0] < recent_vol[1] < recent_vol[2] else False
return self.vol_signal
def large_vol_candle(self):
self.vol_candle = True if self.df.volume.array[-1] >= self.df.volume.tail(14).values.mean()*2 else False
return self.vol_candle
if __name__ == '__main__':
    # Manual smoke run: fetch 15-minute BTCUSDT candles, print them with the
    # Heiken-Ashi columns added, then print how long the whole run took.
    started_at = datetime.now()
    candles = CoinData.get_dataframe('BTCUSDT', '15m')
    heiken_ashi = Signals.get_heiken_ashi(candles)
    print(heiken_ashi)
    elapsed = datetime.now() - started_at
    print(elapsed)