-
Notifications
You must be signed in to change notification settings - Fork 4
/
tinysoft_datafeed.py
181 lines (145 loc) · 5.59 KB
/
tinysoft_datafeed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
from datetime import datetime, timedelta
from typing import List, Optional
from pytz import timezone
from vnpy.trader.setting import SETTINGS
from vnpy.trader.constant import Exchange, Interval
from vnpy.trader.object import BarData, TickData, HistoryRequest
from vnpy.trader.utility import extract_vt_symbol
from vnpy.trader.datafeed import BaseDatafeed
from .pyTSL import Client, DoubleToDatetime
CHINA_TZ = timezone("Asia/Shanghai")
EXCHANGE_MAP = {
Exchange.SSE: "SH",
Exchange.SZSE: "SZ"
}
INTERVAL_MAP = {
Interval.MINUTE: "cy_1m",
Interval.HOUR: "cy_60m",
Interval.DAILY: "cy_day",
}
SHIFT_MAP = {
Interval.MINUTE: timedelta(minutes=1),
Interval.HOUR: timedelta(hours=1),
}
class TinysoftDatafeed(BaseDatafeed):
"""天软数据服务接口"""
def __init__(self):
""""""
self.username: str = SETTINGS["datafeed.username"]
self.password: str = SETTINGS["datafeed.password"]
self.client: Client = None
self.inited: bool = False
def init(self) -> bool:
"""初始化"""
if self.inited:
return True
self.client = Client(
self.username,
self.password,
"tsl.tinysoft.com.cn",
443
)
n = self.client.login()
if n != 1:
return False
self.inited = True
return True
def query_bar_history(self, req: HistoryRequest) -> Optional[List[BarData]]:
"""查询K线数据"""
if not self.inited:
self.init()
symbol, exchange = extract_vt_symbol(req.vt_symbol)
tsl_exchange = EXCHANGE_MAP.get(exchange, "")
tsl_interval = INTERVAL_MAP[req.interval]
bars: List[BarData] = []
start_str = req.start.strftime("%Y%m%d")
end_str = req.end.strftime("%Y%m%d")
cmd = (
f"setsysparam(pn_cycle(),{tsl_interval}());"
"return select * from markettable "
f"datekey {start_str}T to {end_str}T "
f"of '{tsl_exchange}{symbol}' end;"
)
result = self.client.exec(cmd)
if not result.error():
data = result.value()
shift = SHIFT_MAP.get(req.interval, None)
for d in data:
dt: datetime = DoubleToDatetime(d["date"])
if shift:
dt -= shift
bar = BarData(
symbol=symbol,
exchange=exchange,
datetime=CHINA_TZ.localize(dt),
interval=req.interval,
open_price=d["open"],
high_price=d["high"],
low_price=d["low"],
close_price=d["close"],
volume=d["vol"],
turnover=d["amount"],
gateway_name="TSL"
)
# 期货则获取持仓量字段
if not tsl_exchange:
bar.open_interest = d["sectional_cjbs"]
bars.append(bar)
return bars
def query_tick_history(self, req: HistoryRequest) -> Optional[List[TickData]]:
"""查询Tick数据"""
if not self.inited:
self.init()
symbol, exchange = extract_vt_symbol(req.vt_symbol)
tsl_exchange = EXCHANGE_MAP.get(exchange, "")
ticks: List[TickData] = []
dt: datetime = req.start
while dt <= req.end:
date_str = dt.strftime("%Y%m%d")
cmd = f"return select * from tradetable datekey {date_str}T to {date_str}T+16/24 of '{tsl_exchange}{symbol}' end ; "
result = self.client.exec(cmd)
if not result.error():
data = result.value()
for d in data:
dt = DoubleToDatetime(d["date"])
dt = CHINA_TZ.localize(dt)
tick = TickData(
symbol=symbol,
exchange=exchange,
name=d["StockName"],
datetime=dt,
open_price=d["sectional_open"],
high_price=d["sectional_high"],
low_price=d["sectional_low"],
last_price=d["price"],
volume=d["sectional_vol"],
turnover=d["sectional_amount"],
bid_price_1=d["buy1"],
bid_price_2=d["buy2"],
bid_price_3=d["buy3"],
bid_price_4=d["buy4"],
bid_price_5=d["buy5"],
ask_price_1=d["sale1"],
ask_price_2=d["sale2"],
ask_price_3=d["sale3"],
ask_price_4=d["sale4"],
ask_price_5=d["sale5"],
bid_volume_1=d["bc1"],
bid_volume_2=d["bc2"],
bid_volume_3=d["bc3"],
bid_volume_4=d["bc4"],
bid_volume_5=d["bc5"],
ask_volume_1=d["sc1"],
ask_volume_2=d["sc2"],
ask_volume_3=d["sc3"],
ask_volume_4=d["sc4"],
ask_volume_5=d["sc5"],
localtime=dt,
gateway_name="TSL"
)
# 期货则获取持仓量字段
if not tsl_exchange:
tick.open_interest = d["sectional_cjbs"]
ticks.append(tick)
dt += timedelta(days=1)
return ticks