--- title: "完整实例 | 迅投知识库" source: "https://dict.thinktrader.net/nativeApi/code_examples.html#%E6%8C%87%E5%AE%9A%E5%88%9D%E5%A7%8B%E5%8C%96%E8%A1%8C%E6%83%85%E8%BF%9E%E6%8E%A5%E8%8C%83%E5%9B%B4" author: published: created: 2025-08-01 description: "QMT说明, QMT-Python-API, QMT用户手册。" tags: - "clippings" --- ## 行情示例 ### 获取行情示例 ```python # 用前须知 ## xtdata提供和MiniQmt的交互接口,本质是和MiniQmt建立连接,由MiniQmt处理行情数据请求,再把结果回传返回到python层。使用的行情服务器以及能获取到的行情数据和MiniQmt是一致的,要检查数据或者切换连接时直接操作MiniQmt即可。 ## 对于数据获取接口,使用时需要先确保MiniQmt已有所需要的数据,如果不足可以通过补充数据接口补充,再调用数据获取接口获取。 ## 对于订阅接口,直接设置数据回调,数据到来时会由回调返回。订阅接收到的数据一般会保存下来,同种数据不需要再单独补充。 # 代码讲解 # 从本地python导入xtquant库,如果出现报错则说明安装失败 from xtquant import xtdata import time # 设定一个标的列表 code_list = ["000001.SZ"] # 设定获取数据的周期 period = "1d" # 下载标的行情数据 if 1: ## 为了方便用户进行数据管理,xtquant的大部分历史数据都是以压缩形式存储在本地的 ## 比如行情数据,需要通过download_history_data下载,财务数据需要通过 ## 所以在取历史数据之前,我们需要调用数据下载接口,将数据下载到本地 for i in code_list: xtdata.download_history_data(i,period=period,incrementally=True) # 增量下载行情数据(开高低收,等等)到本地 xtdata.download_financial_data(code_list) # 下载财务数据到本地 xtdata.download_sector_data() # 下载板块数据到本地 # 更多数据的下载方式可以通过数据字典查询 # 读取本地历史行情数据 history_data = xtdata.get_market_data_ex([],code_list,period=period,count=-1) print(history_data) print("=" * 20) # 如果需要盘中的实时行情,需要向服务器进行订阅后才能获取 # 订阅后,get_market_data函数于get_market_data_ex函数将会自动拼接本地历史行情与服务器实时行情 # 向服务器订阅数据 for i in code_list: xtdata.subscribe_quote(i,period=period,count=-1) # 设置count = -1来取到当天所有实时行情 # 等待订阅完成 time.sleep(1) # 获取订阅后的行情 kline_data = xtdata.get_market_data_ex([],code_list,period=period) print(kline_data) # 获取订阅后的行情,并以固定间隔进行刷新,预期会循环打印10次 for i in range(10): # 这边做演示,就用for来循环了,实际使用中可以用while True kline_data = xtdata.get_market_data_ex([],code_list,period=period) print(kline_data) time.sleep(3) # 三秒后再次获取行情 # 如果不想用固定间隔触发,可以以用订阅后的回调来执行 # 这种模式下当订阅的callback回调函数将会异步的执行,每当订阅的标的tick发生变化更新,callback回调函数就会被调用一次 # 本地已有的数据不会触发callback # 定义的回测函数 ## 回调函数中,data是本次触发回调的数据,只有一条 def f(data): # print(data) code_list = list(data.keys()) # 获取到本次触发的标的代码 kline_in_callabck = xtdata.get_market_data_ex([],code_list,period = period) # 在回调中获取klines数据 print(kline_in_callabck) for i in code_list: xtdata.subscribe_quote(i,period=period,count=-1,callback=f) # 订阅时设定回调函数 # 使用回调时,必须要同时使用xtdata.run()来阻塞程序,否则程序运行到最后一行就直接结束退出了。 xtdata.run() ``` ### 连接VIP服务器 ```python # 导入 xtdatacenter 模块 import sys print("Python 版本:", sys.version) import time import pandas as pd from xtquant import xtdatacenter as xtdc from xtquant import xtdata ''' 设置用于登录行情服务的token,此接口应该先于 init_quote 调用 token可以从投研用户中心获取 https://xuntou.net/#/userInfo ''' xtdc.set_token('这里输入token') ''' 设置连接池,使服务器只在连接池内优选 建议将VIP服务器设为连接池 ''' addr_list = [ '115.231.218.73:55310', '115.231.218.79:55310', '42.228.16.211:55300', '42.228.16.210:55300', '36.99.48.20:55300', '36.99.48.21:55300' ] xtdc.set_allow_optmize_address(addr_list) xtdc.set_kline_mirror_enabled(True) # 开启K线全推功能(vip),以获取全市场实时K线数据 """ 初始化 """ xtdc.init() ## 监听端口 port = xtdc.listen(port = 58621) # 指定固定端口进行连接 # port = xtdc.listen(port = (58620, 58630))[1] 通过指定port范围,可以让xtdc在范围内自动寻找可用端口 xtdata.connect(port=port) print('-----连接上了------') print(xtdata.data_dir) servers = xtdata.get_quote_server_status() # print(servers) for k, v in servers.items(): print(k, v) xtdata.run() ``` ### 连接指定服务器 ```python import time from xtquant import xtdata #用token方式连接,不需要账号密码 #其他连接方式,需要账号密码 info = {"ip": '115.231.218.73', "port": 55300, "username": '', "pwd": ''} connect_success = 0 def func(d): ip = d.get('ip', '') port = d.get('port') status = d.get('status', 'disconnected') global connect_success if ip == info['ip'] and port == info['port']: if status == 'connected': connect_success = 1 else: connect_success = 2 # 注册连接回调信息 xtdata.watch_quote_server_status(func) # 行情连接 qs = xtdata.QuoteServer(info) qs.connect() # 获取当前数据连接站点 data_server_info = xtdata.get_quote_server_status() # 显示当前数据连接站点 if 1: for k,v in data_server_info.items(): print(f"data:{k}, connect info:{v.info}") # 等待连接状态 while connect_success == 0: time.sleep(0.3) if connect_success == 2: print("连接失败") ``` ### 指定初始化行情连接范围 ```python if 1: from xtquant import xtdatacenter as xtdc ## 设置数据目录 xtdc.set_data_home_dir('data') ## 设置token token = "你的token" xtdc.set_token(token) ## 限定行情站点的优选范围 opt_list = [ '115.231.218.73:55310', '115.231.218.79:55310', '42.228.16.210:55300', '42.228.16.211:55300', '36.99.48.20:55300', '36.99.48.21:55300', ] xtdc.set_allow_optmize_address(opt_list) ## 开启指定市场的K线全推 xtdc.set_kline_mirror_markets(['SH', 'SZ', 'BJ']) ## 设置要初始化的市场列表 init_markets = [ 'SH', 'SZ', 'BJ', #'DF', 'GF', 'IF', 'SF', 'ZF', 'INE', #'SHO', 'SZO', ] xtdc.set_init_markets(init_markets) ## 初始化xtdc模块 xtdc.init(start_local_service = False) ## 监听端口 #xtdc.listen(port = 58620) listen_port = xtdc.listen(port = (58620, 58650)) #import code; code.interact(local = locals()) import xtquant.xtdata as xtdata xtdata.connect(port = listen_port) import code; code.interact(local = locals()) ``` ### 订阅全推数据/下载历史数据 ### 获取对手价 ```python # 以卖出为例 import pandas as pd import numpy as np from xtquant import xtdata to_do_trade_list = ["000001.SZ"] tick = xtdata.get_full_tick(to_do_trade_list) # 取买一价为对手价,若买一价为0,说明已经跌停,则取最新价 for i in tick: fix_price = tick[i]["bidPrice"][0] if tick[i]["bidPrice"][0] != 0 else tick[i]["lastPrice"] print(fix_price) ``` ```python 10.01 ``` ### 复权计算方式 ```python #coding:utf-8 import numpy as np import pandas as pd from xtquant import xtdata #def gen_divid_ratio(quote_datas, divid_datas): # drl = [] # for qi in range(len(quote_datas)): # q = quote_datas.iloc[qi] # dr = 1.0 # for di in range(len(divid_datas)): # d = divid_datas.iloc[di] # if d.name <= q.name: # dr *= d['dr'] # drl.append(dr) # return pd.DataFrame(drl, index = quote_datas.index, columns = quote_datas.columns) def gen_divid_ratio(quote_datas, divid_datas): drl = [] dr = 1.0 qi = 0 qdl = len(quote_datas) di = 0 ddl = len(divid_datas) while qi < qdl and di < ddl: qd = quote_datas.iloc[qi] dd = divid_datas.iloc[di] if qd.name >= dd.name: dr *= dd['dr'] di += 1 if qd.name <= dd.name: drl.append(dr) qi += 1 while qi < qdl: drl.append(dr) qi += 1 return pd.DataFrame(drl, index = quote_datas.index, columns = quote_datas.columns) def process_forward_ratio(quote_datas, divid_datas): drl = gen_divid_ratio(quote_datas, divid_datas) drlf = drl / drl.iloc[-1] result = (quote_datas * drlf).apply(lambda x: round(x, 2)) return result def process_backward_ratio(quote_datas, divid_datas): drl = gen_divid_ratio(quote_datas, divid_datas) result = (quote_datas * drl).apply(lambda x: round(x, 2)) return result def process_forward(quote_datas1, divid_datas): quote_datas = quote_datas1.copy() def calc_front(v, d): return ((v - d['interest'] + d['allotPrice'] * d['allotNum']) / (1 + d['allotNum'] + d['stockBonus'] + d['stockGift'])) for qi in range(len(quote_datas)): q = quote_datas.iloc[qi] for di in range(len(divid_datas)): d = divid_datas.iloc[di] if d.name <= q.name: continue q.iloc[0] = calc_front(q.iloc[0], d) return quote_datas def process_backward(quote_datas1, divid_datas): quote_datas = quote_datas1.copy() def calc_back(v, d): return ((v * (1.0 + d['stockGift'] + d['stockBonus'] + d['allotNum']) + d['interest'] - d['allotNum'] * d['allotPrice'])) for qi in range(len(quote_datas)): q = quote_datas.iloc[qi] for di in range(len(divid_datas) - 1, -1, -1): d = divid_datas.iloc[di] if d.name > q.name: continue q.iloc[0] = calc_back(q.iloc[0], d) return quote_datas #-------------------------------- s = '002594.SZ' #xtdata.download_history_data(s, '1d', '20100101', '') dd = xtdata.get_divid_factors(s) print(dd) #复权计算用于处理价格字段 field_list = ['open', 'high', 'low', 'close'] datas_ori = xtdata.get_market_data(field_list, [s], '1d', dividend_type = 'none')['close'].T #print(datas_ori) #等比前复权 datas_forward_ratio = process_forward_ratio(datas_ori, dd) print('datas_forward_ratio', datas_forward_ratio) #等比后复权 datas_backward_ratio = process_backward_ratio(datas_ori, dd) print('datas_backward_ratio', datas_backward_ratio) #前复权 datas_forward = process_forward(datas_ori, dd) print('datas_forward', datas_forward) #后复权 datas_backward = process_backward(datas_ori, dd) print('datas_backward', datas_backward) ``` ### 根据商品期货期权代码获取对应的商品期货合约代码 ```python from xtquant import xtdata def get_option_underline_code(code:str) -> str: """ 注意:该函数不适用于股指期货期权与ETF期权 Todo: 根据商品期权代码获取对应的具体商品期货合约 Args: code:str 期权代码 Return: 对应的期货合约代码 """ Exchange_dict = { "SHFE":"SF", "CZCE":"ZF", "DCE":"DF", "INE":"INE", "GFEX":"GF" } if code.split(".")[-1] not in [v for k,v in Exchange_dict.items()]: raise KeyError("此函数不支持该交易所合约") info = xtdata.get_option_detail_data(code) underline_code = info["OptUndlCode"] + "." + Exchange_dict[info["OptUndlMarket"]] return underline_code if __name__ == "__main__": symbol_code = get_option_underline_code('sc2403C465.INE') # 获取期权合约'sc2403C465.INE'对应的期货合约代码 print(symbol_code) ``` ```python 'sc2403.INE' ``` ### 根据指数代码,返回对应的期货合约 ```python from xtquant import xtdata import re def get_financial_futures_code_from_index(index_code:str) -> list: """ ToDo:传入指数代码,返回对应的期货合约(当前) Args: index_code:指数代码,如"000300.SH","000905.SH" Retuen: list: 对应期货合约列表 """ financial_futures = xtdata.get_stock_list_in_sector("中金所") future_list = [] pattern = r'^[a-zA-Z]{1,2}\d{3,4}\.[A-Z]{2}$' for i in financial_futures: if re.match(pattern,i): future_list.append(i) ls = [] for i in future_list: _info = xtdata._get_instrument_detail(i) _index_code = _info["ExtendInfo"]['OptUndlCode'] + "." + _info["ExtendInfo"]['OptUndlMarket'] if _index_code == index_code: ls.append(i) return ls if __name__ == "__main__": ls = get_financial_futures_code_from_index("000905.SH") print(ls) ``` ```python ['IC2402.IF', 'IC2403.IF', 'IC2406.IF', 'IC2409.IF'] ``` ## 交易示例 ### 简单买卖各一笔示例 **需要调整的参数:** - `98` 行的 `path` 变量需要改为本地客户端路径,券商端指定到 f"{安装目录}\\userdata\_mini",投研端指定到f"{安装目录}\\userdata" - `107` 行的资金账号需要调整为自身资金账号 ```python # coding:utf-8 import time, datetime, traceback, sys from xtquant import xtdata from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant # 定义一个类 创建类的实例 作为状态的容器 class _a(): pass A = _a() A.bought_list = [] A.hsa = xtdata.get_stock_list_in_sector('沪深A股') def interact(): """执行后进入repl模式""" import code code.InteractiveConsole(locals=globals()).interact() xtdata.download_sector_data() class MyXtQuantTraderCallback(XtQuantTraderCallback): def on_disconnected(self): """ 连接断开 :return: """ print(datetime.datetime.now(), '连接断开回调') def on_stock_order(self, order): """ 委托回报推送 :param order: XtOrder对象 :return: """ print(datetime.datetime.now(), '委托回调 投资备注', order.order_remark) def on_stock_trade(self, trade): """ 成交变动推送 :param trade: XtTrade对象 :return: """ print(datetime.datetime.now(), '成交回调', trade.order_remark, f"委托方向(48买 49卖) {trade.offset_flag} 成交价格 {trade.traded_price} 成交数量 {trade.traded_volume}") def on_order_error(self, order_error): """ 委托失败推送 :param order_error:XtOrderError 对象 :return: """ # print("on order_error callback") # print(order_error.order_id, order_error.error_id, order_error.error_msg) print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}") def on_cancel_error(self, cancel_error): """ 撤单失败推送 :param cancel_error: XtCancelError 对象 :return: """ print(datetime.datetime.now(), sys._getframe().f_code.co_name) def on_order_stock_async_response(self, response): """ 异步下单回报推送 :param response: XtOrderResponse 对象 :return: """ print(f"异步委托回调 投资备注: {response.order_remark}") def on_cancel_order_stock_async_response(self, response): """ :param response: XtCancelOrderResponse 对象 :return: """ print(datetime.datetime.now(), sys._getframe().f_code.co_name) def on_account_status(self, status): """ :param response: XtAccountStatus 对象 :return: """ print(datetime.datetime.now(), sys._getframe().f_code.co_name) if __name__ == '__main__': print("start") # 指定客户端所在路径, 券商端指定到 userdata_mini文件夹 # 注意:如果是连接投研端进行交易,文件目录需要指定到f"{安装目录}\userdata" path = r'D:\qmt\投研\迅投极速交易终端睿智融科版\userdata' # 生成session id 整数类型 同时运行的策略不能重复 session_id = int(time.time()) xt_trader = XtQuantTrader(path, session_id) # 开启主动请求接口的专用线程 开启后在on_stock_xxx回调函数里调用XtQuantTrader.query_xxx函数不会卡住回调线程,但是查询和推送的数据在时序上会变得不确定 # 详见: http://docs.thinktrader.net/vip/pages/ee0e9b/#开启主动请求接口的专用线程 # xt_trader.set_relaxed_response_order_enabled(True) # 创建资金账号为 800068 的证券账号对象 股票账号为STOCK 信用CREDIT 期货FUTURE acc = StockAccount('2000128', 'STOCK') # 创建交易回调类对象,并声明接收回调 callback = MyXtQuantTraderCallback() xt_trader.register_callback(callback) # 启动交易线程 xt_trader.start() # 建立交易连接,返回0表示连接成功 connect_result = xt_trader.connect() print('建立交易连接,返回0表示连接成功', connect_result) # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功 subscribe_result = xt_trader.subscribe(acc) print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result) #取账号信息 account_info = xt_trader.query_stock_asset(acc) #取可用资金 available_cash = account_info.m_dCash print(acc.account_id, '可用资金', available_cash) #查账号持仓 positions = xt_trader.query_stock_positions(acc) #取各品种 总持仓 可用持仓 position_total_dict = {i.stock_code : i.m_nVolume for i in positions} position_available_dict = {i.stock_code : i.m_nCanUseVolume for i in positions} print(acc.account_id, '持仓字典', position_total_dict) print(acc.account_id, '可用持仓字典', position_available_dict) #买入 浦发银行 最新价 两万元 stock = '600000.SH' target_amount = 20000 full_tick = xtdata.get_full_tick([stock]) print(f"{stock} 全推行情: {full_tick}") current_price = full_tick[stock]['lastPrice'] #买入金额 取目标金额 与 可用金额中较小的 buy_amount = min(target_amount, available_cash) #买入数量 取整为100的整数倍 buy_vol = int(buy_amount / current_price / 100) * 100 print(f"当前可用资金 {available_cash} 目标买入金额 {target_amount} 买入股数 {buy_vol}股") async_seq = xt_trader.order_stock_async(acc, stock, xtconstant.STOCK_BUY, buy_vol, xtconstant.FIX_PRICE, current_price, 'strategy_name', stock) #卖出 500股 stock = '513130.SH' #目标数量 target_vol = 500 #可用数量 available_vol = position_available_dict[stock] if stock in position_available_dict else 0 #卖出量取目标量与可用量中较小的 sell_vol = min(target_vol, available_vol) print(f"{stock} 目标卖出量 {target_vol} 可用数量 {available_vol} 卖出 {sell_vol}股") if sell_vol > 0: async_seq = xt_trader.order_stock_async(acc, stock, xtconstant.STOCK_SELL, sell_vol, xtconstant.LATEST_PRICE, -1, 'strategy_name', stock) print(f"下单完成 等待回调") # 阻塞主线程退出 xt_trader.run_forever() # 如果使用vscode pycharm等本地编辑器 可以进入交互模式 方便调试 (把上一行的run_forever注释掉 否则不会执行到这里) interact() ``` ### 单股订阅实盘示例 **需要调整的参数:** - `113` 行的 `path` 变量需要改为本地客户端路径,券商端指定到 f"{安装目录}\\userdata\_mini",投研端指定到f"{安装目录}\\userdata" - `122` 行的资金账号需要调整为自身资金账号 ```python # coding:utf-8 import time, datetime, traceback, sys from xtquant import xtdata from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant # 定义一个类 创建类的实例 作为状态的容器 class _a(): pass A = _a() A.bought_list = [] A.hsa = xtdata.get_stock_list_in_sector('沪深A股') def interact(): """执行后进入repl模式""" import code code.InteractiveConsole(locals=globals()).interact() xtdata.download_sector_data() def f(data): print(data) now = datetime.datetime.now() for stock in data: if stock not in A.hsa: continue cuurent_price = data[stock][0]['close'] pre_price = data[stock][0]['preClose'] ratio = cuurent_price / pre_price - 1 if pre_price > 0 else 0 if ratio > 0.09 and stock not in A.bought_list: print(f"{now} 最新价 买入 {stock} 100股") async_seq = xt_trader.order_stock_async(acc, stock, xtconstant.STOCK_BUY, 100, xtconstant.LATEST_PRICE, -1, 'strategy_name', stock) A.bought_list.append(stock) class MyXtQuantTraderCallback(XtQuantTraderCallback): def on_disconnected(self): """ 连接断开 :return: """ print(datetime.datetime.now(), '连接断开回调') def on_stock_order(self, order): """ 委托回报推送 :param order: XtOrder对象 :return: """ print(datetime.datetime.now(), '委托回调', order.order_remark) def on_stock_trade(self, trade): """ 成交变动推送 :param trade: XtTrade对象 :return: """ print(datetime.datetime.now(), '成交回调', trade.order_remark) def on_order_error(self, order_error): """ 委托失败推送 :param order_error:XtOrderError 对象 :return: """ # print("on order_error callback") # print(order_error.order_id, order_error.error_id, order_error.error_msg) print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}") def on_cancel_error(self, cancel_error): """ 撤单失败推送 :param cancel_error: XtCancelError 对象 :return: """ print(datetime.datetime.now(), sys._getframe().f_code.co_name) def on_order_stock_async_response(self, response): """ 异步下单回报推送 :param response: XtOrderResponse 对象 :return: """ print(f"异步委托回调 {response.order_remark}") def on_cancel_order_stock_async_response(self, response): """ :param response: XtCancelOrderResponse 对象 :return: """ print(datetime.datetime.now(), sys._getframe().f_code.co_name) def on_account_status(self, status): """ :param response: XtAccountStatus 对象 :return: """ print(datetime.datetime.now(), sys._getframe().f_code.co_name) if __name__ == '__main__': print("start") # 指定客户端所在路径, 券商端指定到 userdata_mini文件夹 # 注意:如果是连接投研端进行交易,文件目录需要指定到f"{安装目录}\userdata" path = r'D:\qmt\投研\迅投极速交易终端睿智融科版\userdata' # 生成session id 整数类型 同时运行的策略不能重复 session_id = int(time.time()) xt_trader = XtQuantTrader(path, session_id) # 开启主动请求接口的专用线程 开启后在on_stock_xxx回调函数里调用XtQuantTrader.query_xxx函数不会卡住回调线程,但是查询和推送的数据在时序上会变得不确定 # 详见: http://docs.thinktrader.net/vip/pages/ee0e9b/#开启主动请求接口的专用线程 # xt_trader.set_relaxed_response_order_enabled(True) # 创建资金账号为 800068 的证券账号对象 股票账号为STOCK 信用CREDIT 期货FUTURE acc = StockAccount('2000128', 'STOCK') # 创建交易回调类对象,并声明接收回调 callback = MyXtQuantTraderCallback() xt_trader.register_callback(callback) # 启动交易线程 xt_trader.start() # 建立交易连接,返回0表示连接成功 connect_result = xt_trader.connect() print('建立交易连接,返回0表示连接成功', connect_result) # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功 subscribe_result = xt_trader.subscribe(acc) print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result) #订阅的品种列表 code_list = ['600000.SH', '000001.SZ'] for code in code_list: xtdata.subscribe_quote(code, '1d', callback = f) # 阻塞主线程退出 xt_trader.run_forever() # 如果使用vscode pycharm等本地编辑器 可以进入交互模式 方便调试 (把上一行的run_forever注释掉 否则不会执行到这里) interact() ``` ### 全推订阅实盘示例 本示例用于展示如何订阅上海及深圳市场全推,对于沪深A股品种策略进行判断当前涨幅超过 9 个点的买入 200 股 **需要调整的参数:** - `111` 行的 `path` 变量需要改为本地客户端路径 - `116` 行的资金账号需要调整为自身资金账号 注意 本策略只用于提供策略写法及参考,若您直接进行实盘下单,造成损失本网站不负担责任。 ```python #coding:utf-8 import time, datetime, traceback, sys from xtquant import xtdata from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant #定义一个类 创建类的实例 作为状态的容器 class _a(): pass A = _a() A.bought_list = [] A.hsa = xtdata.get_stock_list_in_sector('沪深A股') def interact(): """执行后进入repl模式""" import code code.InteractiveConsole(locals=globals()).interact() xtdata.download_sector_data() def f(data): now = datetime.datetime.now() for stock in data: if stock not in A.hsa: continue cuurent_price = data[stock][0]['lastPrice'] pre_price = data[stock][0]['lastClose'] ratio = cuurent_price / pre_price - 1 if pre_price > 0 else 0 if ratio > 0.09 and stock not in A.bought_list: print(f"{now} 最新价 买入 {stock} 200股") async_seq = xt_trader.order_stock_async(acc, stock, xtconstant.STOCK_BUY, 200, xtconstant.LATEST_PRICE, -1, 'strategy_name', stock) A.bought_list.append(stock) class MyXtQuantTraderCallback(XtQuantTraderCallback): def on_disconnected(self): """ 连接断开 :return: """ print(datetime.datetime.now(),'连接断开回调') def on_stock_order(self, order): """ 委托回报推送 :param order: XtOrder对象 :return: """ print(datetime.datetime.now(), '委托回调', order.order_remark) def on_stock_trade(self, trade): """ 成交变动推送 :param trade: XtTrade对象 :return: """ print(datetime.datetime.now(), '成交回调', trade.order_remark) def on_order_error(self, order_error): """ 委托失败推送 :param order_error:XtOrderError 对象 :return: """ # print("on order_error callback") # print(order_error.order_id, order_error.error_id, order_error.error_msg) print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}") def on_cancel_error(self, cancel_error): """ 撤单失败推送 :param cancel_error: XtCancelError 对象 :return: """ print(datetime.datetime.now(), sys._getframe().f_code.co_name) def on_order_stock_async_response(self, response): """ 异步下单回报推送 :param response: XtOrderResponse 对象 :return: """ print(f"异步委托回调 {response.order_remark}") def on_cancel_order_stock_async_response(self, response): """ :param response: XtCancelOrderResponse 对象 :return: """ print(datetime.datetime.now(), sys._getframe().f_code.co_name) def on_account_status(self, status): """ :param response: XtAccountStatus 对象 :return: """ print(datetime.datetime.now(), sys._getframe().f_code.co_name) if __name__ == '__main__': print("start") #指定客户端所在路径, # 注意:如果是连接投研端进行交易,文件目录需要指定到f"{安装目录}\userdata" path = r'D:\qmt\sp3\迅投极速交易终端 睿智融科版\userdata_mini' # 生成session id 整数类型 同时运行的策略不能重复 session_id = int(time.time()) xt_trader = XtQuantTrader(path, session_id) # 开启主动请求接口的专用线程 开启后在on_stock_xxx回调函数里调用XtQuantTrader.query_xxx函数不会卡住回调线程,但是查询和推送的数据在时序上会变得不确定 # 详见: http://docs.thinktrader.net/vip/pages/ee0e9b/#开启主动请求接口的专用线程 # xt_trader.set_relaxed_response_order_enabled(True) # 创建资金账号为 800068 的证券账号对象 acc = StockAccount('800068', 'STOCK') # 创建交易回调类对象,并声明接收回调 callback = MyXtQuantTraderCallback() xt_trader.register_callback(callback) # 启动交易线程 xt_trader.start() # 建立交易连接,返回0表示连接成功 connect_result = xt_trader.connect() print('建立交易连接,返回0表示连接成功', connect_result) # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功 subscribe_result = xt_trader.subscribe(acc) print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result) #这一行是注册全推回调函数 包括下单判断 安全起见处于注释状态 确认理解效果后再放开 # xtdata.subscribe_whole_quote(["SH", "SZ"], callback=f) # 阻塞主线程退出 xt_trader.run_forever() # 如果使用vscode pycharm等本地编辑器 可以进入交互模式 方便调试 (把上一行的run_forever注释掉 否则不会执行到这里) interact() ``` ### 定时判断实盘示例 ```python # coding:utf-8 import time, datetime, traceback, sys from xtquant import xtdata from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant # 定义一个类 创建类的实例 作为状态的容器 class _a(): pass A = _a() A.bought_list = [] A.hsa = xtdata.get_stock_list_in_sector('沪深A股') def interact(): """执行后进入repl模式""" import code code.InteractiveConsole(locals=globals()).interact() xtdata.download_sector_data() def f(data): now = datetime.datetime.now() # print(data) for stock in data: if stock not in A.hsa: continue cuurent_price = data[stock].iloc[-1, 0] pre_price = data[stock].iloc[-2, 0] ratio = cuurent_price / pre_price - 1 if pre_price > 0 else 0 if ratio > 0.09 and stock not in A.bought_list: print(f"{now} 最新价 买入 {stock} 100股") async_seq = xt_trader.order_stock_async(acc, stock, xtconstant.STOCK_BUY, 100, xtconstant.LATEST_PRICE, -1, 'strategy_name', stock) A.bought_list.append(stock) class MyXtQuantTraderCallback(XtQuantTraderCallback): def on_disconnected(self): """ 连接断开 :return: """ print(datetime.datetime.now(), '连接断开回调') def on_stock_order(self, order): """ 委托回报推送 :param order: XtOrder对象 :return: """ print(datetime.datetime.now(), '委托回调', order.order_remark) def on_stock_trade(self, trade): """ 成交变动推送 :param trade: XtTrade对象 :return: """ print(datetime.datetime.now(), '成交回调', trade.order_remark) def on_order_error(self, order_error): """ 委托失败推送 :param order_error:XtOrderError 对象 :return: """ # print("on order_error callback") # print(order_error.order_id, order_error.error_id, order_error.error_msg) print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}") def on_cancel_error(self, cancel_error): """ 撤单失败推送 :param cancel_error: XtCancelError 对象 :return: """ print(datetime.datetime.now(), sys._getframe().f_code.co_name) def on_order_stock_async_response(self, response): """ 异步下单回报推送 :param response: XtOrderResponse 对象 :return: """ print(f"异步委托回调 {response.order_remark}") def on_cancel_order_stock_async_response(self, response): """ :param response: XtCancelOrderResponse 对象 :return: """ print(datetime.datetime.now(), sys._getframe().f_code.co_name) def on_account_status(self, status): """ :param response: XtAccountStatus 对象 :return: """ print(datetime.datetime.now(), sys._getframe().f_code.co_name) if __name__ == '__main__': print("start") # 指定客户端所在路径, 券商端指定到 userdata_mini文件夹 # 注意:如果是连接投研端进行交易,文件目录需要指定到f"{安装目录}\userdata" path = r'D:\qmt\投研\迅投极速交易终端睿智融科版\userdata' # 生成session id 整数类型 同时运行的策略不能重复 session_id = int(time.time()) xt_trader = XtQuantTrader(path, session_id) # 开启主动请求接口的专用线程 开启后在on_stock_xxx回调函数里调用XtQuantTrader.query_xxx函数不会卡住回调线程,但是查询和推送的数据在时序上会变得不确定 # 详见: http://docs.thinktrader.net/vip/pages/ee0e9b/#开启主动请求接口的专用线程 # xt_trader.set_relaxed_response_order_enabled(True) # 创建资金账号为 800068 的证券账号对象 股票账号为STOCK 信用CREDIT 期货FUTURE acc = StockAccount('2000128', 'STOCK') # 创建交易回调类对象,并声明接收回调 callback = MyXtQuantTraderCallback() xt_trader.register_callback(callback) # 启动交易线程 xt_trader.start() # 建立交易连接,返回0表示连接成功 connect_result = xt_trader.connect() print('建立交易连接,返回0表示连接成功', connect_result) # 对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功 subscribe_result = xt_trader.subscribe(acc) print('对交易回调进行订阅,订阅后可以收到交易主推,返回0表示订阅成功', subscribe_result) #订阅的品种列表 code_list = ['600000.SH', '000001.SZ'] #遍历品种 下载历史k线 订阅当日行情 for code in code_list: xtdata.download_history_data(code, period='1d', start_time='20200101') xtdata.subscribe_quote(code, '1d', callback = None) while True: now = datetime.datetime.now() now_time = now.strftime('%H%M%S') if not '093000' <= now_time < '150000': print(f"{now} 非交易时间 循环退出") break #取k线数据 data = xtdata.get_market_data_ex(['close'], code_list, period= '1d', start_time= '20240101') #判断交易 f(data) #每次循环 睡眠三秒后继续 time.sleep(3) # 阻塞主线程退出 xt_trader.run_forever() # 如果使用vscode pycharm等本地编辑器 可以进入交互模式 方便调试 (把上一行的run_forever注释掉 否则不会执行到这里) interact() ``` ### 交易接口重连 该示例演示交易连接断开时重连的代码处理。 提示 1. 该示例 **不是线程安全** 的,仅演示断开连接时应该怎么处理重连代码,实际使用时请注意避免潜在的问题 2. 本策略只用于提供策略写法及参考,若您直接进行实盘下单,造成损失本网站不负担责任。 ```python #本文用一个均线策略演示交易连接断开时怎么处理交易接口重连 # 策略本身不严谨,不能作为实盘策略或者参考策略,本策略仅是演示重连用法 import time from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount from xtquant import xtconstant from xtquant import xtdata class MyXtQuantTraderCallback(XtQuantTraderCallback): # 更多说明见 http://dict.thinktrader.net/nativeApi/xttrader.html?id=I3DJ97#%E5%A7%94%E6%89%98xtorder def on_disconnected(self): """ 连接断开 :return: """ print("connection lost, 交易接口断开,即将重连") global xt_trader xt_trader = None def on_stock_order(self, order): print(f'委托回报: 股票代码:{order.stock_code} 账号:{order.account_id}, 订单编号:{order.order_id} 柜台合同编号:{order.order_sysid} \ 委托状态:{order.order_status} 成交数量:{order.order_status} 委托数量:{order.order_volume} 已成数量:{order.traded_volume}') def on_stock_trade(self, trade): print(f'成交回报: 股票代码:{trade.stock_code} 账号:{trade.account_id}, 订单编号:{trade.order_id} 柜台合同编号:{trade.order_sysid} \ 成交编号:{trade.traded_id} 成交数量:{trade.traded_volume} 委托数量:{trade.direction} ') def on_order_error(self, order_error): print(f"报单失败: 订单编号:{order_error.order_id} 下单失败具体信息:{order_error.error_msg} 委托备注:{order_error.order_remark}") def on_cancel_error(self, cancel_error): print(f"撤单失败: 订单编号:{cancel_error.order_id} 失败具体信息:{cancel_error.error_msg} 市场:{cancel_error.market}") def on_order_stock_async_response(self, response): print(f"异步下单的请求序号:{response.seq}, 订单编号:{response.order_id} ") def on_account_status(self, status): print(f"账号状态发生变化, 账号:{status.account_id} 最新状态:{status.status}") def create_trader(xt_acc,path, session_id): trader = XtQuantTrader(path, session_id,callback=MyXtQuantTraderCallback()) trader.start() connect_result = trader.connect() trader.subscribe(xt_acc) return trader if connect_result == 0 else None def try_connect(xt_acc,path): session_id_range = [i for i in range(100, 120)] import random random.shuffle(session_id_range) # 遍历尝试session_id列表尝试连接 for session_id in session_id_range: trader = create_trader(xt_acc,path, session_id) if trader: print('连接成功,session_id:{}', session_id) return trader else: print('连接失败,session_id:{},继续尝试下一个id', session_id) continue print('所有id都尝试后仍失败,放弃连接') return None def get_xttrader(xt_acc,path): global xt_trader if xt_trader is None: xt_trader = try_connect(xt_acc,path) return xt_trader if __name__ == "__main__": # 注意实际连接XtQuantTrader时不要写类似while True 这种无限循环的尝试,因为每次连接都会用session_id创建一个对接文件,这样就会占满硬盘导致电脑运行异常 # 要控制session_id在有限的范围内尝试,这里提供10个session_id供重连尝试 # 当所有session_id都尝试后,程序会抛出异常。实际使用过程中当session_id用完时,可以增加邮件等通知方式提醒人工处理 #指定客户端所在路径 path = 'E:\qmt\\userdata_mini' xt_trader = None xt_acc = StockAccount('2000204') xt_trader = get_xttrader(xt_acc,path) if not xt_trader: raise Exception('交易接口连接失败') print('交易接口连接成功, 策略开始') stock = '513050.SH' xtdata.subscribe_quote(stock, '5m','','',count=-1) time.sleep(1) order_record = [] while '093000'<=time.strftime('%H%M%S')<'150000': time.sleep(3) xt_trader = get_xttrader(xt_acc,path) price = xtdata.get_market_data_ex(['close'],[stock],period='5m',)[stock] #计算均线 ma5 = price['close'].rolling(5).mean() ma10 = price['close'].rolling(10).mean() if ma5.iloc[-1]>ma5.iloc[-10]: t = price.index[-1] order_flag = (t, '买') if order_flag not in order_record: #防止重复下单 print(f'发起买入 {stock} k线时间:{t}') # 用最新价买100股 xt_trader.order_stock_async(xt_acc, stock, xtconstant.STOCK_BUY,100,xtconstant.LATEST_PRICE,0) order_record.append(order_flag) elif ma5.iloc[-1]