# leverage_trading_env.py
import logging
import numpy as np
from dataclasses import dataclass


@dataclass
class Info:
    """Per-step auxiliary state returned alongside each observation."""
    remain_time: int   # steps remaining in the episode
    budget: float      # current account balance
    position: float    # signed position size (positive = long, negative = short)
    price_mean: float  # mean entry price of the open position
    cur_price: float   # latest close price in the observation window


# Maps a discrete agent action to the fraction of budget to deploy:
# positive fractions open/extend a long, negative fractions a short.
action2position = {
    0: 0.2,    # use 20% of budget to long
    1: 0.4,
    2: 0.6,
    3: 0.8,
    4: 1.0,    # full long
    5: 0.0,    # no position
    6: -0.2,   # use 20% of budget to short
    7: -0.4,
    8: -0.6,
    9: -0.8,
    10: -1.0,  # full short
}
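
# A minimal sketch of how the mapping above is meant to be used: the agent
# emits a discrete index and the env's step() consumes the signed fraction.
# `agent_action` and `env` are hypothetical names, not defined in this module:
#
#   agent_action = 6                          # discrete output of a policy
#   fraction = action2position[agent_action]  # -> -0.2, i.e. open a 20% short
#   obs, reward, done, inf = env.step(fraction)

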
class TradingEnv:
def __init__(self, env_id, df, sample_len, obs_data_len, step_len,
fee, initial_budget, deal_col_name='c',
feature_names=['c', 'v'], leverage=3, sell_at_end=True, *args, **kwargs):
        assert 0 <= fee <= 0.01, "fee must be between 0 and 0.01 (0% to 1%)"
        assert deal_col_name in df.columns, \
            "deal_col_name not found in DataFrame; set it to the column used to compute profit."
for col in feature_names:
            assert col in df.columns, f"feature name: {col} not in DataFrame."
self.total_fee = 0
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(message)s')
self.logger = logging.getLogger(env_id)
self.df = df
self.sample_len = sample_len
self.obs_len = obs_data_len
self.step_len = step_len
self.fee = fee
self.initial_budget = initial_budget
self.budget = initial_budget
self.feature_len = len(feature_names)
        self.observation_space = np.array([obs_data_len, self.feature_len])  # (window length, n_features)
self.using_feature = feature_names
self.price_name = deal_col_name
        self.leverage = leverage  # position multiplier, e.g. leverage=3 -> 3x exposure

    def _random_choice_section(self):
        # sample a random contiguous window of sample_len rows from the data
        begin_point = np.random.randint(len(self.df) - self.sample_len + 1)
        end_point = begin_point + self.sample_len
        df_section = self.df.iloc[begin_point: end_point]
        return df_section

    def reset(self):
        self.total_fee = 0
        self.df_sample = self._random_choice_section()
        self.step_st = 0
        # price series used to calculate the reward
        self.price = self.df_sample[self.price_name].to_numpy()
        self.long_liquidation_price = 0
        self.short_liquidation_price = np.inf
        # integer count of steps remaining; step() decrements it to 0
        self.remain_time = (self.sample_len - self.obs_len) // self.step_len
        self.budget = self.initial_budget
        self.position = 0
        self.price_mean = 0
        self.margin = 0
        # observation features
        self.obs_features = self.df_sample[self.using_feature].to_numpy()
        # maybe add the market position to the final feature set; keep it optional
        # observation part: features + remaining time + current position + budget
        # (the mean entry price is not considered)
        self.obs_state = self.obs_features[self.step_st: self.step_st + self.obs_len]
        # obs_state[-1][3] assumes the 4th feature column holds the close price
        return self.obs_state, Info(self.remain_time, self.budget, self.position,
                                    self.price_mean, self.obs_state[-1][3])

    @staticmethod
    def get_liq_price(price_mean, budget, position):
        # price at which the unrealized loss equals the remaining budget
        if position >= 0:  # long: liquidated when price falls budget/size below entry
            liq_price = price_mean - budget / (position + 1e-9)
        else:  # short: liquidated when price rises budget/|size| above entry
            liq_price = price_mean + budget / (-position + 1e-9)
        return liq_price
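
    # Worked example for the formula above (illustrative numbers, not from the
    # module): a 1.0 BTC long opened at price_mean=100 with budget=50 is
    # liquidated at 100 - 50/1.0 = 50; the same-size short is liquidated at
    # 100 + 50/1.0 = 150, the price at which the unrealized loss would consume
    # the whole budget.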

    def test_state(self):
        # build a single observation from the most recent obs_len rows (e.g. for live inference)
        self.total_fee = 0
        self.df_sample = self.df.iloc[-self.obs_len:]
        self.price = self.df_sample[self.price_name].to_numpy()
        self.remain_time = (self.sample_len - self.obs_len) // self.step_len
        self.budget = self.initial_budget
        self.position = 0
        self.price_mean = 0
        self.obs_state = self.df_sample[self.using_feature].to_numpy()
        return self.obs_state, Info(self.remain_time, self.budget, self.position,
                                    self.price_mean, self.obs_state[-1][3])

    def step(self, action):  # action: target position fraction in [-1, 1]
        """
        action : the position to hold from the next state onward
        ex) current pos: 1.0 BTC, action=1.0  -> use all available cash to extend the long position
            current pos: 1.0 BTC, action=-1.0 -> sell all BTC and use all available cash to open a short position
            current pos: 1.0 BTC, action=0    -> clear the position: sell all BTC and do nothing
        Reward is generated when you modify your position.
        This means that no reward is generated when opening a position.
        For example, changing the position from -1.0 BTC to 1.0 BTC:
        => reward is generated when the position is cleared from -1.0 BTC to 0.0 BTC,
           but not when the long position is opened from 0.0 BTC to 1.0 BTC.
        :param action: [long, short] * n_interval + clear_action
        :return: next_state, reward, done, Info(remain_time, budget, position, price_mean, current_price)
        """
        current_price = self.price[self.step_st + self.obs_len - 1]
        # NOTE: pnl is marked against the mean entry price on every step, so while a
        # position is held unchanged, unrealized pnl is added to the budget
        # cumulatively rather than incrementally.
        pnl = (current_price - self.price_mean) * self.position
        self.budget += pnl
        liquidation_price = TradingEnv.get_liq_price(self.price_mean, self.budget, self.position)
        # assumes the feature columns are ordered o, h, l, c, ...
        low = self.obs_state[-1][2]
        high = self.obs_state[-1][1]
        if (self.position > 0 and low < liquidation_price) or \
                (self.position < 0 and high > liquidation_price):  # long or short liquidation
            reward = pnl / self.initial_budget
            self.position = 0
            self.price_mean = 0
            if self.budget < 10:  # budget nearly wiped out by the liquidation: episode ends
                done = True
                return self.obs_state, np.clip(reward, -1, 1), done, Info(0, 0, 0, 0, self.obs_state[-1][3])
        current_price_mean = self.price_mean
        current_mkt_position = self.position
        current_asset = self.budget
        # observation part
        self.remain_time -= 1
        self.step_st += self.step_len
        self.obs_state = self.obs_features[self.step_st: self.step_st + self.obs_len]
        done = False
        reward = pnl / self.initial_budget
        if self.step_st + self.obs_len >= len(self.obs_features) and self.remain_time == 0:  # episode ends
            done = True
            reward = (current_price - current_price_mean) * current_mkt_position / self.initial_budget
            return self.obs_state, np.clip(reward, -1, 1), done, Info(self.remain_time, self.budget, self.position,
                                                                      self.price_mean, current_price)
        if action == 0:  # clear position
            self.price_mean = 0
            self.position = 0
            return self.obs_state, np.clip(reward, -1, 1), done, Info(self.remain_time, self.budget,
                                                                      self.position, self.price_mean, current_price)
        # compute the target position size, mean entry price, and pnl
        target_position = current_asset * action * self.leverage / current_price
        # even for the same action, a change in current_asset changes the target position
        if action > 0:  # target position is long
            if self.position < 0:  # fully cover the existing short first
                self.price_mean = 0
                self.position = 0
            if self.position > target_position:  # partial long cover: reduce the long, keep the entry mean
                self.position = target_position
            else:  # open or extend the long position at a new weighted-mean entry price
                self.price_mean = (current_price_mean * self.position + current_price *
                                   (target_position - self.position)) / target_position
                self.position = target_position
        else:  # target position is short
            if self.position > 0:  # fully cover the existing long first
                self.price_mean = 0
                self.position = 0
            if self.position < target_position:  # partial short cover: reduce the short, keep the entry mean
                self.position = target_position
            else:  # open or extend the short position at a new weighted-mean entry price
                self.price_mean = (self.position * current_price_mean + current_price *
                                   (target_position - self.position)) / target_position
                self.position = target_position
        return self.obs_state, np.clip(reward, -1, 1), done, Info(self.remain_time, self.budget,
                                                                  self.position, self.price_mean, current_price)
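

if __name__ == '__main__':
    # A minimal smoke-test sketch, not part of the original module: it assumes
    # pandas is available and fabricates a random-walk OHLCV frame whose column
    # names ('o', 'h', 'l', 'c', 'v') match the o/h/l/c ordering the env indexes.
    import pandas as pd

    rng = np.random.default_rng(0)
    close = 100 + np.cumsum(rng.normal(0, 1, 1000))
    df = pd.DataFrame({
        'o': close + rng.normal(0, 0.1, 1000),
        'h': close + 1.0,
        'l': close - 1.0,
        'c': close,
        'v': rng.uniform(1, 10, 1000),
    })
    env = TradingEnv(env_id='demo', df=df, sample_len=300, obs_data_len=60, step_len=1,
                     fee=0.001, initial_budget=1000, deal_col_name='c',
                     feature_names=['o', 'h', 'l', 'c', 'v'])
    obs, inf = env.reset()
    done = False
    while not done:
        fraction = action2position[int(rng.integers(0, 11))]  # random discrete action
        obs, reward, done, inf = env.step(fraction)
    env.logger.info('episode finished: budget=%.2f last reward=%.4f', inf.budget, reward)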