-
-
Notifications
You must be signed in to change notification settings - Fork 780
/
Copy pathcustsignalmod.py
84 lines (69 loc) · 3.15 KB
/
custsignalmod.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# Available indicators here: https://python-tradingview-ta.readthedocs.io/en/latest/usage.html#retrieving-the-analysis
from tradingview_ta import TA_Handler, Interval, Exchange
# use for environment variables
import os
# use if needed to pass args to external modules
import sys
# used for directory handling
import glob
import time
import threading
OSC_INDICATORS = ['MACD', 'Stoch.RSI', 'Mom'] # Indicators to use in Oscillator analysis
OSC_THRESHOLD = 2 # Must be less or equal to number of items in OSC_INDICATORS
MA_INDICATORS = ['EMA10', 'EMA20'] # Indicators to use in Moving averages analysis
MA_THRESHOLD = 2 # Must be less or equal to number of items in MA_INDICATORS
INTERVAL = Interval.INTERVAL_5_MINUTES #Timeframe for analysis
EXCHANGE = 'BINANCE'
SCREENER = 'CRYPTO'
PAIR_WITH = 'USDT'
TICKERS = 'signalsample.txt'
TIME_TO_WAIT = 4 # Minutes to wait between analysis
FULL_LOG = False # List analysis result to console
def analyze(pairs):
signal_coins = {}
analysis = {}
handler = {}
if os.path.exists('signals/custsignalmod.exs'):
os.remove('signals/custsignalmod.exs')
for pair in pairs:
handler[pair] = TA_Handler(
symbol=pair,
exchange=EXCHANGE,
screener=SCREENER,
interval=INTERVAL,
timeout= 10)
for pair in pairs:
try:
analysis = handler[pair].get_analysis()
except Exception as e:
print("Signalsample:")
print("Exception:")
print(e)
print (f'Coin: {pair}')
print (f'handler: {handler[pair]}')
oscCheck=0
maCheck=0
for indicator in OSC_INDICATORS:
if analysis.oscillators ['COMPUTE'][indicator] == 'BUY': oscCheck +=1
for indicator in MA_INDICATORS:
if analysis.moving_averages ['COMPUTE'][indicator] == 'BUY': maCheck +=1
if FULL_LOG:
print(f'Custsignalmod:{pair} Oscillators:{oscCheck}/{len(OSC_INDICATORS)} Moving averages:{maCheck}/{len(MA_INDICATORS)}')
if oscCheck >= OSC_THRESHOLD and maCheck >= MA_THRESHOLD:
signal_coins[pair] = pair
print(f'Custsignalmod: Signal detected on {pair} at {oscCheck}/{len(OSC_INDICATORS)} oscillators and {maCheck}/{len(MA_INDICATORS)} moving averages.')
with open('signals/custsignalmod.exs','a+') as f:
f.write(pair + '\n')
return signal_coins
def do_work():
signal_coins = {}
pairs = {}
pairs=[line.strip() for line in open(TICKERS)]
for line in open(TICKERS):
pairs=[line.strip() + PAIR_WITH for line in open(TICKERS)]
while True:
if not threading.main_thread().is_alive(): exit()
print(f'Custsignalmod: Analyzing {len(pairs)} coins')
signal_coins = analyze(pairs)
print(f'Custsignalmod: {len(signal_coins)} coins above {OSC_THRESHOLD}/{len(OSC_INDICATORS)} oscillators and {MA_THRESHOLD}/{len(MA_INDICATORS)} moving averages Waiting {TIME_TO_WAIT} minutes for next analysis.')
time.sleep((TIME_TO_WAIT*60))