-
Notifications
You must be signed in to change notification settings - Fork 2
/
Tradining_Strategies.py
198 lines (148 loc) · 8.3 KB
/
Tradining_Strategies.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import torch as tc
import numpy as np
import utils
# 3 strategies: buy-hold, MA, MOM
# Price process: [M, T], M is market dim, T is total time steps
# Portfolio weight: [M, 1] or [M, T], assuming static portfolio weights
class TradingStrategy(object):
    '''
    Rule-based trading strategies applied to the return process of a portfolio.

    Supported strategies:
      * "buy-hold": always hold the portfolio, so the PnL is the return process itself.
      * "MA":  trade on the spread between the return and its moving average.
      * "MOM": trade on the spread between a long and a short moving average of the return.

    For "MA" and "MOM" the trade signal is the sum of a sell indicator (-1 where the
    spread is above ``signal_upper``) and a buy indicator (+1 where the spread is below
    ``-signal_lower``); the PnL is the return process multiplied by that signal.
    '''

    _STRATEGIES = ("buy-hold", "MA", "MOM")

    def __init__(self, strategy: str, look_back: "int | tuple", investment_horizon: tuple,
                 buy_signal_bounds: tuple, device=None):
        '''
        :param strategy: one of "buy-hold", "MA" or "MOM".
        :param look_back: moving-average window. An int for "buy-hold" / "MA"; an int or a
                          tuple of window sizes for "MOM" (the largest entry is the long
                          window, the smallest the short window).
        :param investment_horizon: (start, end) indices of the trading period; only used by
                                   _get_strategy_PnL() to slice the time axis.
        :param buy_signal_bounds: (signal_upper, signal_lower) thresholds of the trade signal.
        :param device: anything accepted by torch.device(); the CPU is used when None.
        :raises ValueError: if ``strategy`` is not a supported strategy name.
        '''
        # Raise explicitly: an ``assert`` is stripped under ``python -O`` and would let an
        # invalid strategy name through silently.
        if strategy not in self._STRATEGIES:
            raise ValueError("Invalid strategy!")
        self.strategy = strategy
        self.look_back = look_back
        self.start, self.end = investment_horizon
        self.signal_upper, self.signal_lower = buy_signal_bounds
        # The device is chosen by the caller (e.g. "mps" on macOS, "cuda" on a GPU machine).
        self.device = tc.device(device) if device is not None else tc.device("cpu")

    def get_portfolio_process(self, market_price_process: tc.Tensor,
                              portfolio_weight: tc.Tensor):
        '''
        Build the portfolio value process and its simple return process.

        :param market_price_process: [M, T], M assets observed over T time steps.
        :param portfolio_weight: how the portfolio is constructed; one of
                                 [M, T] (time-varying weights), [M, 1] or [1, M] (static weights).
        :return: (Vt, Rt) where Vt is the portfolio value process and
                 R_t = (V_t - V_{t-1}) / V_{t-1} is its return process.
                 Vt has shape [T] for [M, T] weights and [1, T] for static weights;
                 Rt has the same leading shape with T - 1 entries on the last axis.
        :raises ValueError: if the weight shape is not compatible with the price process.
        '''
        num_assets = market_price_process.shape[0]
        if portfolio_weight.shape == market_price_process.shape:  # both [M, T]
            Vt = tc.multiply(portfolio_weight, market_price_process).sum(dim=0)  # [T]
        elif portfolio_weight.shape[0] == num_assets:  # [M, 1]
            Vt = tc.matmul(portfolio_weight.T, market_price_process)  # [1, T]
        elif portfolio_weight.dim() == 2 and portfolio_weight.shape[1] == num_assets:  # [1, M]
            Vt = tc.matmul(portfolio_weight, market_price_process)  # [1, T]
        else:
            raise ValueError("market_price_process and portfolio_weight has mismatched dimension!")
        # Difference along the time (last) axis so that both the [T] and the [1, T] layouts
        # work, and keep every one of the T - 1 returns.
        previous = Vt[..., :-1]
        Rt = (Vt[..., 1:] - previous) / previous
        return Vt, Rt

    def _trade_signal(self, Rt):
        '''
        Compute the MA / MOM trade signal for a whole return process.

        :param Rt: return process (torch tensor); utils.moving_average() is assumed to
                   return a tensor that broadcasts against it.
        :return: float32 tensor holding -1 (sell), +1 (buy) or 0 per entry, on Rt's device.
        :raises ValueError: if self.strategy has no trade signal.
        '''
        if self.strategy == "MA":
            spread = Rt - utils.moving_average(Rt, window_size=self.look_back)
        elif self.strategy == "MOM":
            if isinstance(self.look_back, int):
                # Single window: compare the moving average with the raw return.
                long_ma = utils.moving_average(Rt, window_size=self.look_back)
                short_ma = Rt
            else:
                long_ma = utils.moving_average(Rt, window_size=max(self.look_back))
                short_ma = utils.moving_average(Rt, window_size=min(self.look_back))
            spread = long_ma - short_ma
        else:
            raise ValueError("Invalid strategy!")
        sell_signal = -((spread > self.signal_upper).to(dtype=tc.float32))
        buy_signal = (spread < -self.signal_lower).to(dtype=tc.float32)
        return buy_signal + sell_signal

    def _get_strategy_PnL(self, market_price_process, portfolio_weight, return_signal=False):
        '''
        (Deprecated: used when the PnL process is not given; prefer get_strategy_PnL().)

        :param market_price_process: [M, T]
        :param portfolio_weight: [M, T], [M, 1] or [1, M]
        :param return_signal: if True, also return the trade signal (default False;
                              ignored for "buy-hold", which has no signal).
        :return: the PnL process of self.strategy over the time steps self.start:self.end,
                 or (PnL, signal) when return_signal is True.
        '''
        _, Rt = self.get_portfolio_process(market_price_process, portfolio_weight)
        horizon = slice(self.start, self.end)
        if self.strategy == "buy-hold":
            return Rt[..., horizon]
        # Moving averages are computed on the full history, then cut to the horizon.
        signal = self._trade_signal(Rt)[..., horizon]
        PnL = tc.multiply(Rt[..., horizon], signal)
        if return_signal:
            return PnL, signal
        return PnL

    def get_strategy_PnL(self, Rt, return_signal=False):
        '''
        :param Rt: the given return process of the pre-constructed portfolio.
        :param return_signal: if True, also return the trade signal (default False;
                              ignored for "buy-hold", which has no signal).
        :return: the float32 PnL process on self.device obtained by trading Rt with
                 self.strategy, or (PnL, signal) when return_signal is True.
        '''
        if self.strategy == "buy-hold":
            return tc.as_tensor(Rt, dtype=tc.float32, device=self.device)
        # The signal is built on Rt's own device and moved afterwards, so an Rt living on a
        # different device than self.device no longer triggers a device-mismatch error.
        signal = self._trade_signal(Rt)
        PnL = tc.multiply(Rt, signal).to(dtype=tc.float32, device=self.device)
        if return_signal:
            return PnL, signal.to(device=self.device)
        return PnL
if __name__ == '__main__':
    # Ad-hoc demo: build moving-average trade signals on sampled toy paths, first on the
    # returns and then on the prices themselves, and plot a few price-based PnL paths.
    from toy_example import toy_sampler
    from matplotlib import pyplot as plt

    ps = toy_sampler(500, 100)  # presumably [num_paths, T] price paths -- TODO confirm
    rt = (ps[:, 1:] - ps[:, :-1]) / ps[:, :-1]  # simple returns along the time axis

    # --- MA signal on the returns; threshold = std of the smoothed returns ---
    rt_ma = utils.moving_average(rt, 10)
    rt_band = rt_ma.std()
    rt_sell = -((rt - rt_ma) > rt_band).long().to(dtype=tc.float32)
    rt_buy = ((rt - rt_ma) < -rt_band).long().to(dtype=tc.float32)
    # Everything stays on the sampler's device instead of a hard-coded "mps", so the demo
    # also runs on machines without Apple-silicon GPU support.
    rt_signal = (rt_buy + rt_sell).to(dtype=tc.float32)
    rt_PnL = tc.multiply(rt, rt_signal).to(dtype=tc.float32)

    # --- MA signal on the prices; threshold = std of the price / moving-average gap ---
    ps_ma = utils.moving_average(ps, 10)
    ps_band = (ps - ps_ma).std()
    sell_signal = -((ps - ps_ma) > ps_band).long().to(dtype=tc.float32)
    buy_signal = ((ps - ps_ma) < -ps_band).long().to(dtype=tc.float32)
    signal = (buy_signal + sell_signal).to(dtype=tc.float32)
    PnL = tc.multiply(ps, signal).to(dtype=tc.float32)

    plt.figure()
    for i in tc.randint(0, PnL.shape[0], size=[5]):
        plt.plot(list(range(PnL.shape[1])), PnL[i, :].cpu(), color="gray", alpha=0.1)

    # buy_signal_bounds must hold two thresholds (upper, lower); an empty tuple cannot be
    # unpacked by the constructor. The return-based band above is used for both bounds.
    ma = TradingStrategy("MA", 10, (0, 0), (rt_band.item(), rt_band.item()), device=ps.device)