-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathquote.py
160 lines (145 loc) · 6.43 KB
/
quote.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import shioaji as sj
from typing import List, Set
import polars as pl
# import polars_talib as plta
from shioaji.contracts import BaseContract
class QuoteManager:
def __init__(self, api: sj.Shioaji):
self.api = api
self.api.quote.set_on_tick_stk_v1_callback(self.on_stk_v1_tick_handler)
self.api.quote.set_on_tick_fop_v1_callback(self.on_fop_v1_tick_handler)
self.ticks_stk_v1: List[sj.TickSTKv1] = []
self.ticks_fop_v1: List[sj.TickFOPv1] = []
self.subscribed_stk_tick: Set[str] = set()
self.subscribed_fop_tick: Set[str] = set()
self.df_stk: pl.DataFrame = pl.DataFrame(
[],
schema=[
("datetime", pl.Datetime),
("code", pl.Utf8),
("price", pl.Float64),
("volume", pl.Int64),
("tick_type", pl.Int8),
],
)
self.df_fop: pl.DataFrame = pl.DataFrame(
[],
schema=[
("datetime", pl.Datetime),
("code", pl.Utf8),
("price", pl.Float64),
("volume", pl.Int64),
("tick_type", pl.Int8),
],
)
def on_stk_v1_tick_handler(self, _exchange: sj.Exchange, tick: sj.TickSTKv1):
self.ticks_stk_v1.append(tick)
def on_fop_v1_tick_handler(self, _exchange: sj.Exchange, tick: sj.TickFOPv1):
self.ticks_fop_v1.append(tick)
def fetch_ticks(self, contract: BaseContract) -> pl.DataFrame:
code = contract.code
ticks = self.api.ticks(contract)
df = pl.DataFrame(ticks.dict()).select(
pl.from_epoch("ts", time_unit="ns").dt.cast_time_unit("us").alias("datetime"),
pl.lit(code).alias("code"),
pl.col("close").alias("price"),
pl.col("volume").cast(pl.Int64),
pl.col("tick_type").cast(pl.Int8),
)
return df
def get_df_stk(self) -> pl.DataFrame:
poped_ticks, self.ticks_stk_v1 = self.ticks_stk_v1, []
if poped_ticks:
df = pl.DataFrame([tick.to_dict() for tick in poped_ticks]).select(
pl.col("datetime", "code"),
pl.col("close").cast(pl.Float64).alias("price"),
pl.col("volume").cast(pl.Int64),
pl.col("tick_type").cast(pl.Int8),
)
self.df_stk = self.df_stk.vstack(df)
return self.df_stk
def get_df_fop(self) -> pl.DataFrame:
poped_ticks, self.ticks_fop_v1 = self.ticks_fop_v1, []
if poped_ticks:
df = pl.DataFrame([tick.to_dict() for tick in poped_ticks]).select(
pl.col("datetime", "code"),
pl.col("close").cast(pl.Float64).alias("price"),
pl.col("volume").cast(pl.Int64),
pl.col("tick_type").cast(pl.Int8),
)
self.df_fop = self.df_fop.vstack(df)
return self.df_fop
def get_df_stk_kbar(
self, unit: str = "1m", exprs: List[pl.Expr] = []
) -> pl.DataFrame:
df = self.get_df_stk()
df = df.group_by(
pl.col("datetime").dt.truncate(unit),
pl.col("code"),
maintain_order=True,
).agg(
pl.col("price").first().alias("open"),
pl.col("price").max().alias("high"),
pl.col("price").min().alias("low"),
pl.col("price").last().alias("close"),
pl.col("volume").sum().alias("volume"),
)
if exprs:
df = df.with_columns(exprs)
return df
def subscribe_stk_tick(self, codes: List[str], recover: bool = False):
for code in codes:
contract = self.api.Contracts.Stocks[code]
if contract is not None and code not in self.subscribed_stk_tick:
self.api.quote.subscribe(contract, "tick")
self.subscribed_stk_tick.add(code)
if recover:
df = self.fetch_ticks(contract)
if not df.is_empty():
code_ticks = [t for t in self.ticks_stk_v1 if t.code == code]
if code_ticks:
t_first = code_ticks[0].datetime
df = df.filter(pl.col("datetime") < t_first)
self.df_stk = self.df_stk.vstack(df)
else:
self.df_stk = self.df_stk.vstack(df)
def subscribe_fop_tick(self, codes: List[str], recover: bool = False):
for code in codes:
contract = self.api.Contracts.Futures[code]
if contract is not None and code not in self.subscribed_fop_tick:
self.api.quote.subscribe(contract, "tick")
self.subscribed_fop_tick.add(code)
if recover:
df = self.fetch_ticks(contract)
if not df.is_empty():
code_ticks = [t for t in self.ticks_fop_v1 if t.code == code]
if code_ticks:
t_first = code_ticks[0].datetime
df = df.filter(pl.col("datetime") < t_first)
self.df_fop = self.df_fop.vstack(df)
else:
self.df_fop = self.df_fop.vstack(df)
def unsubscribe_stk_tick(self, codes: List[str]):
for code in codes:
contract = self.api.Contracts.Stocks[code]
if contract is not None and code in self.subscribed_stk_tick:
self.api.quote.unsubscribe(contract, "tick")
self.subscribed_stk_tick.remove(code)
def unsubscribe_fop_tick(self, codes: List[str]):
for code in codes:
contract = self.api.Contracts.Futures[code]
if contract is not None and code in self.subscribed_fop_tick:
self.api.quote.unsubscribe(contract, "tick")
self.subscribed_fop_tick.remove(code)
def unsubscribe_all_stk_tick(self):
for code in self.subscribed_stk_tick:
contract = self.api.Contracts.Stocks[code]
if contract is not None:
self.api.quote.unsubscribe(contract, "tick")
self.subscribed_stk_tick.clear()
def unsubscribe_all_fop_tick(self):
for code in self.subscribed_fop_tick:
contract = self.api.Contracts.Futures[code]
if contract is not None:
self.api.quote.unsubscribe(contract, "tick")
self.subscribed_fop_tick.clear()