# coding:utf-8
import datetime
import sys
import time

from peewee import ModelSelect
import xtquant.xtconstant as xtconstant

sys.stdout.reconfigure(encoding='utf-8')  # Set the standard output encoding to UTF-8

import core.strategy_db as strategy_db
import sfgrid_constants
from core.sfgrid_trade_controller import StockTradeController
from core.util import getInstrumentName, getStockPosition
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount, XtAsset, XtOrder, XtPosition, XtTrade
from xtquant import xtdata
from xtquant.xttrader import XtQuantTraderCallback


# Core quant controller object
class SFGridController(XtQuantTraderCallback):
    """Top-level controller that wires the database, the trader and market data.

    It owns one ``StockTradeController`` per tracked stock code (kept in
    ``self.stock_trade_ctrl``) and forwards market-data ticks and trader
    callbacks to the matching controller.
    """

    def __init__(self, account_no: str, miniQmtPath: str):
        """Connect the database and the trader, then build the target pool.

        :param account_no: account id used to build the ``StockAccount``.
        :param miniQmtPath: path handed to ``XtQuantTrader``.
        """
        super().__init__()
        xtdata.enable_hello = False

        strategy_db.db.connect()
        strategy_db.db.create_tables([strategy_db.TradeTarget])
        print('- 数据库模块初始化完成')

        # The session id is derived from the current time.
        session_id = int(time.time())
        self.xt_trader = XtQuantTrader(miniQmtPath, session_id)
        self.xt_trader.register_callback(self)
        self.xt_trader.start()
        connect_result = self.xt_trader.connect()
        print(f"- 市场交易连接{connect_result}: {'成功' if self.xt_trader.connected else '失败'}")

        self.account = StockAccount(account_no, 'STOCK')
        print(f'- 交易账号对象初始化完成, 账号: {self.account.account_id}')

        # A result of 0 is treated as a successful subscription.
        subscribe_result = self.xt_trader.subscribe(self.account)
        print(f"- 交易状态订阅{'成功' if subscribe_result == 0 else '失败'}")

        # stock_code -> StockTradeController
        self.stock_trade_ctrl = {}
        # Market-data subscription id; None while not subscribed.
        self.seq = None
        self.init_instrument_pool(self.xt_trader, self.account)
        print('- 三疯交易系统初始化完成')

    def startMarketData(self):
        """Subscribe to whole-market quotes for SH and SZ."""
        print('- 启动市场数据订阅')
        self.seq = xtdata.subscribe_whole_quote(['SH', 'SZ'], callback=self.onDataUpdate)
        if self.seq == -1:
            print('- 市场数据订阅失败')
        else:
            print(f'- 市场数据订阅成功, 订阅号={self.seq}')

    def stopMarketData(self):
        """Cancel the market-data subscription if one is active."""
        print('- 停止市场数据订阅')
        if self.seq is not None and self.seq > 0:
            xtdata.unsubscribe_quote(self.seq)
            # Forget the id so a repeated stop does not unsubscribe twice.
            self.seq = None

    def add_trade_target(self, stock_code: str):
        """Create a new trade target row and its trade controller.

        Any exception is caught and reported; on failure no controller is
        registered for ``stock_code``.

        :param stock_code: code of the stock to start tracking.
        """
        try:
            stock_name = getInstrumentName(stock_code)
            # ``create`` already persists the row, so no extra save is needed.
            new_target = strategy_db.TradeTarget.create(
                stock_name=stock_name,
                stock_code=stock_code,
                current_position=0,
                grid_index=0,
                last_trade_price=0.0,
                current_buy_price=0.0,
                current_buy_order_no='',
                current_sell_price=0.0,
                current_sell_order_no='',
            )
            print(f'新增交易标的 {stock_code} {stock_name}, {new_target.id}')

            # Refresh the position of the target, both in the database and on
            # the in-memory instance handed to the controller below.
            pos = getStockPosition(stock_code, self.xt_trader, self.account)
            strategy_db.TradeTarget.update(current_position=pos).where(
                strategy_db.TradeTarget.stock_code == stock_code
            ).execute()
            new_target.current_position = pos

            # Refresh the target pool
            self.refresh_targets()

            # Register the trade controller
            stockTradeController = StockTradeController(
                new_target, self.xt_trader, self.account, new_target.enabled
            )
            self.stock_trade_ctrl[stock_code] = stockTradeController
        except Exception as e:
            print(f'新增交易标的失败 {stock_code} {e}')

    def del_trade_target(self, index: int):
        """Delete the target at ``index`` of the pool and drop its controller.

        :param index: position of the target in ``self.instrument_pool``.
        """
        target: strategy_db.TradeTarget = self.instrument_pool[index]
        # The controller may be missing; removal of the row must still happen.
        self.stock_trade_ctrl.pop(target.stock_code, None)
        target.delete_instance()
        self.refresh_targets()

    def init_instrument_pool(self, xtTrader: XtQuantTrader, account: StockAccount):
        """Load all targets, sync their positions and create their controllers.

        :param xtTrader: trader used to query the current positions.
        :param account: account whose positions are queried.
        """
        self.refresh_targets()
        for tradeTarget in self.instrument_pool:
            tradeTarget.current_position = getStockPosition(
                tradeTarget.stock_code, xtTrader, account
            )
            result = tradeTarget.save()
            print(f' |- 同步当前持仓信息 {tradeTarget.stock_code}, {tradeTarget.current_position}, result = {result}')
            stockTradeController = StockTradeController(
                tradeTarget, self.xt_trader, self.account, tradeTarget.enabled
            )
            self.stock_trade_ctrl[tradeTarget.stock_code] = stockTradeController
        print(f'- 初始化标的池初始化完成 , 共 {len(self.instrument_pool)} 个标的')

    def refresh_targets(self):
        """Reload the target pool from the database and print it."""
        # Refresh the target pool
        self.instrument_pool: ModelSelect = strategy_db.TradeTarget.select()
        self.print_pool()

    def print_pool(self):
        """Print one line per target in the pool, including its live position."""
        print("- 标的池信息")
        for i, target in enumerate(self.instrument_pool):
            status = "新建" if target.status == 0 else "已建初始仓"
            position = getStockPosition(target.stock_code, self.xt_trader, self.account)
            grid_price = sfgrid_constants.grid_price[target.grid_index]
            print(f' [序号-{i}] 股票代码: {target.stock_code}-{target.stock_name} 当前持仓: {position} 网格索引: {target.grid_index} Price {grid_price} 状态: {status} 启用交易线程: {target.enabled}')

    def print_position_info(self):
        """Print every position of the account that has a positive volume."""
        positions = self.xt_trader.query_stock_positions(self.account)
        if not positions:
            print("\n当前无持仓")
            return

        print("\n- 持仓信息")
        for temp in positions:
            pos: XtPosition = temp
            # Use the same ``volume`` attribute that is printed below.
            if pos.volume <= 0:
                continue
            print(f"股票代码: {pos.stock_code}-{getInstrumentName(pos.stock_code)}")
            print(f"总持仓: {pos.volume}")
            print(f"可用持仓: {pos.can_use_volume}")
            print(f"持仓成本: {pos.avg_price}")
            print("---")

    def print_account_info(self):
        """Print cash, total asset and market value of the account."""
        asset: XtAsset = self.xt_trader.query_stock_asset(self.account)
        if asset is None:
            # Nothing to print when the asset query yields no result.
            print(f"=== 账户信息 {self.account.account_id} 查询失败 ===")
            return
        print(f"=== 账户信息 {self.account.account_id} ===")
        print(f"可用资金: {asset.cash}")
        print(f"总资产: {asset.total_asset}")
        print(f"证券市值: {asset.market_value}")

    def print_stock_orders(self):
        """Print the orders returned by a ``cancelable_only=True`` query."""
        orders = self.xt_trader.query_stock_orders(self.account, cancelable_only=True)
        if not orders:
            print("\n当前无委托记录")
            return

        print("\n=== 委托信息 ===")
        for order in orders:
            print(f"委托编号: {order.order_id}")
            print(f"股票代码: {order.stock_code} {getInstrumentName(order.stock_code)}")
            print(f"委托方向: {order.offset_flag} ")
            print(f"委托价格: {order.price}")
            print(f"委托数量: {order.order_volume}")
            print(f"已成交数量: {order.traded_volume}")
            print(f"委托状态: {order.order_status} ")
            print("---")

    # Initialise the trade controller of the given target
    def start_stock_trade(self, index: int):
        """Enable trading for the target at ``index``.

        Creates the controller when it does not exist yet; otherwise enables
        it if it is currently disabled.

        :param index: position of the target in ``self.instrument_pool``.
        """
        tradeTarget = self.instrument_pool[index]
        stock_code = tradeTarget.stock_code
        # Check for an existing controller
        if stock_code in self.stock_trade_ctrl:
            tradeController: StockTradeController = self.stock_trade_ctrl[stock_code]
            if tradeController.isEnabled():
                print(f"标的交易控制器已存在且正在运行 {stock_code} {getInstrumentName(stock_code)}\n")
            else:
                print(f"标的交易控制器已存在但未运行,重新启动 {stock_code} {getInstrumentName(stock_code)}\n")
                tradeController.enabledTrading(True)
        else:
            stockTradeController = StockTradeController(
                tradeTarget, self.xt_trader, self.account, tradeTarget.enabled
            )
            self.stock_trade_ctrl[stock_code] = stockTradeController
            print(f"\t创建标的交易控制器 {stock_code} {getInstrumentName(stock_code)}")

    def pause_stock_trade(self, index: int):
        """Disable trading for the target at ``index`` if it is enabled.

        :param index: position of the target in ``self.instrument_pool``.
        """
        tradeTarget = self.instrument_pool[index]
        stock_code = tradeTarget.stock_code
        if stock_code not in self.stock_trade_ctrl:
            print(f"标的交易控制器不存在 {stock_code} {getInstrumentName(stock_code)}\n")
            return

        tradeController: StockTradeController = self.stock_trade_ctrl[stock_code]
        if tradeController.isEnabled():
            print(f"暂停标的交易 {stock_code} {getInstrumentName(stock_code)}\n")
            tradeController.enabledTrading(False)
        else:
            print(f"标的交易已暂停 {stock_code} {getInstrumentName(stock_code)}\n")

    # ====== Market callbacks -- the methods below are called by xtdata ======
    def onDataUpdate(self, data):
        """Dispatch a market-data push to the trade controllers.

        :param data: mapping of stock code to tick data; each tick is indexed
            with ``'lastPrice'`` in full-push mode.
        """
        if sfgrid_constants.max_enabled_targets <= 0:
            # Full push: pick up new targets straight from the feed.
            for stock_code, tickData in data.items():
                lastPrice = tickData['lastPrice']
                if lastPrice == 10.0 and stock_code not in self.stock_trade_ctrl:
                    print(f'New trade target = {stock_code} - {getInstrumentName(stock_code)} {lastPrice}')
                    self.add_trade_target(stock_code)
                    # add_trade_target reports failures instead of raising, so
                    # the controller may be missing here.
                    new_ctrl = self.stock_trade_ctrl.get(stock_code)
                    if new_ctrl is not None:
                        new_ctrl.enabledTrading(True)
        else:
            # Explicit targets: this is the mode mainly used at the moment.
            for target in self.instrument_pool:
                stock_code = target.stock_code
                # Forward only when a controller exists and the push carries
                # data for this code.
                if stock_code not in self.stock_trade_ctrl or stock_code not in data:
                    continue
                stock_controller: StockTradeController = self.stock_trade_ctrl[stock_code]
                stock_controller.onDataUpdate(data)

    # ====== Trader callbacks -- the methods below are called by XtQuantTrader ======
    def on_connected(self):
        """Connection-established notification."""
        print(datetime.datetime.now(), '连接成功回调')

    def on_disconnected(self):
        """Connection-lost notification."""
        print(datetime.datetime.now(), '连接断开回调')

    def on_stock_order(self, order: XtOrder):
        """Order report push.

        Forwards the order to the controller of its stock code when the
        order's ``strategy_name`` equals the controller's name.

        :param order: XtOrder object
        """
        stockCode = order.stock_code
        # Orders for codes without a controller must not raise in the callback.
        ctrl: StockTradeController = self.stock_trade_ctrl.get(stockCode)
        if ctrl is None:
            print(f"委托下单回调 未找到标的交易控制器 {stockCode}")
        elif order.strategy_name == ctrl.getName():
            ctrl.onOrderTrade(order)
        else:
            print(f"委托下单回调 投资备注 {order.order_remark} 不匹配 {ctrl.getName()}")

    def test_sim_trade(self, index: int, orderType: int):
        """Feed a simulated trade for the target at ``index`` into ``on_stock_trade``.

        :param index: position of the target in ``self.instrument_pool``.
        :param orderType: ``xtconstant.STOCK_BUY`` simulates a buy; any other
            value simulates a sell.
        """
        tradeTarget: strategy_db.TradeTarget = self.instrument_pool[index]
        ctrl: StockTradeController = self.stock_trade_ctrl[tradeTarget.stock_code]

        if orderType == xtconstant.STOCK_BUY:
            side = xtconstant.STOCK_BUY
            price = tradeTarget.current_buy_price
            order_no = tradeTarget.current_buy_order_no
        else:
            side = xtconstant.STOCK_SELL
            price = tradeTarget.current_sell_price
            order_no = tradeTarget.current_sell_order_no

        # The trade carries the target's own stock code so that on_stock_trade
        # routes it to the controller looked up above.
        trade: XtTrade = XtTrade(
            sfgrid_constants.account_no,
            tradeTarget.stock_code,
            side,
            1,
            1,
            price,
            sfgrid_constants.grid_volume,
            1000,
            order_no,
            None,
            ctrl.getName(),
            None,
            None,
            None,
            None,
            None,
            tradeTarget.stock_name,
        )
        self.on_stock_trade(trade)

    def on_stock_trade(self, trade: XtTrade):
        """Trade report push.

        Forwards the trade to the controller of its stock code when the
        trade's ``strategy_name`` equals the controller's name.

        :param trade: XtTrade object
        """
        stockCode = trade.stock_code
        # Trades for codes without a controller must not raise in the callback.
        ctrl: StockTradeController = self.stock_trade_ctrl.get(stockCode)
        if ctrl is None:
            print(f"委托回调 未找到标的交易控制器 {stockCode}")
        elif trade.strategy_name == ctrl.getName():
            ctrl.onOrderTrade(trade)
        else:
            print(f"委托回调 投资备注 {trade.strategy_name} 不匹配 {ctrl.getName()}")

    def on_order_error(self, order_error):
        """Order failure push.

        :param order_error: XtOrderError object
        """
        print(f"\n委托报错回调 {order_error.order_remark} {order_error.error_msg}")

    def on_account_status(self, status):
        """Account status push; prints a timestamp and this method's name.

        :param status: XtAccountStatus object
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)


ctrl = SFGridController(sfgrid_constants.account_no, sfgrid_constants.miniQMTPath)