# coding:utf-8
"""Top-level controller of the grid trading system.

Wires together the strategy database, the XtQuant trader connection, the
whole-market quote subscription and one ``StockTradeController`` per trade
target stored in ``strategy_db.TradeTarget``.
"""
import time
import sys

# Force UTF-8 on stdout before the remaining imports run, so that the
# Chinese console messages printed by this program are not mangled.
sys.stdout.reconfigure(encoding='utf-8')

import strategy_db
import sfgrid_constants
from sfgrid_trade_controller import StockTradeController
from util import getInstrumentName, getStockPosition
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount
from xtquant import xtdata
from xtquant.xttrader import XtQuantTraderCallback
import datetime


class SFGridController(XtQuantTraderCallback):
    """Owns the trader session, the instrument pool and per-stock controllers.

    The instance registers itself as the callback object of the
    ``XtQuantTrader`` it creates, so the ``on_*`` methods at the bottom of the
    class are invoked by the trader.
    """

    def __init__(self, account_no: str, miniQmtPath: str):
        """Connect the database and the trader, then load the instrument pool.

        :param account_no: account id passed to ``StockAccount``.
        :param miniQmtPath: path handed to ``XtQuantTrader``.
        """
        super().__init__()
        xtdata.enable_hello = False

        strategy_db.db.connect()
        strategy_db.db.create_tables([strategy_db.TradeTarget])
        print('- 数据库模块初始化完成')

        self.init_trader(miniQmtPath)
        self.init_trade_account(account_no, 'STOCK')

        # stock_code -> StockTradeController
        self.stock_trade_ctrl = {}
        self.init_instrument_pool()

        # Quote subscription handle; None while not subscribed.
        self.seq = None
        print('- 三疯交易系统初始化完成')

    def startMarketData(self) -> None:
        """Subscribe to whole-market quotes for the 'SH' and 'SZ' markets."""
        print('- 启动市场数据订阅')
        self.seq = xtdata.subscribe_whole_quote(['SH', 'SZ'], callback=self.onDataUpdate)
        if self.seq == -1:
            print('- 市场数据订阅失败')
        else:
            print(f'- 市场数据订阅成功, 订阅号={self.seq}')

    def stopMarketData(self) -> None:
        """Cancel the quote subscription, if one is active."""
        print('- 停止市场数据订阅')
        if self.seq is not None and self.seq > 0:
            xtdata.unsubscribe_quote(self.seq)
            # Forget the handle so a repeated call does not try to
            # unsubscribe the same subscription id again.
            self.seq = None

    def onDataUpdate(self, data) -> None:
        """Forward a quote push to the controller of every pooled target.

        :param data: quote payload; only membership by stock code
            (``stock_code in data``) is used here, the payload itself is
            passed through to ``StockTradeController.onDataUpdate``.
        """
        # Walk the instrument pool.
        for target in self.instrument_pool:
            stock_code = target.stock_code
            # Dispatch only when a controller exists for this code ...
            if stock_code not in self.stock_trade_ctrl:
                print(f"股票代码 {stock_code} 未在交易控制器中找到,跳过处理。\n")
                continue
            # ... and the push actually carries data for it.
            if stock_code not in data:
                print(f"股票代码 {stock_code} 未在行情数据中找到,跳过处理。\n")
                continue
            stock_controller: StockTradeController = self.stock_trade_ctrl[stock_code]
            stock_controller.onDataUpdate(data)

    def add_trade_target(self, stock_code) -> None:
        """Create a new trade target row and its trade controller.

        Any exception raised along the way is caught and reported on stdout;
        nothing is re-raised.

        :param stock_code: code of the instrument to add.
        """
        try:
            stock_name = getInstrumentName(stock_code)
            # create() already persists the row, so no extra save() is needed.
            new_target = strategy_db.TradeTarget.create(
                stock_name=stock_name,
                stock_code=stock_code,
                current_position=0,
                grid_index=0,
                last_trade_price=0.0,
                current_buy_price=0.0,
                current_sell_price=0.0
            )
            print(f'新增交易标的 {stock_code} {stock_name}, {new_target.id}')

            # Refresh the target's position from the account.
            pos = getStockPosition(stock_code, self.xt_trader, self.account)
            strategy_db.TradeTarget.update(current_position=pos).where(
                strategy_db.TradeTarget.stock_code == stock_code
            ).execute()
            # Mirror the stored value on the in-memory instance as well;
            # otherwise the controller below would be built from a target
            # that still reports current_position == 0.
            new_target.current_position = pos

            # Reload the instrument pool.
            self.instrument_pool = strategy_db.TradeTarget.select()

            # Register the trade controller.
            stockTradeController = StockTradeController(
                new_target, self.xt_trader, self.account, new_target.enabled
            )
            self.stock_trade_ctrl[stock_code] = stockTradeController
        except Exception as e:
            print(f'新增交易标的失败 {stock_code} {e}')

    def init_trader(self, path) -> None:
        """Create, start and connect the ``XtQuantTrader``.

        :param path: path handed to ``XtQuantTrader``.
        """
        # The current Unix time (whole seconds) is used as the session id.
        session_id = int(time.time())
        self.xt_trader = XtQuantTrader(path, session_id)
        self.xt_trader.register_callback(self)
        self.xt_trader.start()
        self.xt_trader.connect()
        print(f'- 交易对象初始化完成, {self.xt_trader.connected}')

    def init_trade_account(self, account_id, account_type) -> None:
        """Build the ``StockAccount`` used for all trader calls.

        :param account_id: account id.
        :param account_type: account type string (``__init__`` passes 'STOCK').
        """
        self.account = StockAccount(account_id, account_type)
        print(f'- 交易账号对象初始化完成, 账号: {self.account.account_id}')

    def init_instrument_pool(self) -> None:
        """Load all trade targets and create one controller per target."""
        self.instrument_pool = strategy_db.TradeTarget.select()
        for tradeTarget in self.instrument_pool:
            stockTradeController = StockTradeController(
                tradeTarget, self.xt_trader, self.account, tradeTarget.enabled
            )
            self.stock_trade_ctrl[tradeTarget.stock_code] = stockTradeController
        print(f'- 初始化标的池初始化完成 , 共 {len(self.instrument_pool)} 个标的')
        self.print_pool()

    def print_pool(self) -> None:
        """Print one summary line per target in the instrument pool."""
        print("- 标的池信息")
        for i, target in enumerate(self.instrument_pool):
            # status == 0 is shown as "new"; any other value as
            # "initial position built".
            status = "新建" if target.status == 0 else "已建初始仓"
            print(f' [序号-{i}] 股票代码: {target.stock_code}-{target.stock_name} 当前持仓: {target.current_position} 网格索引: {target.grid_index+1} Price {sfgrid_constants.grid_price[target.grid_index]} 状态: {status} 启用交易线程: {target.enabled}')

    def print_position_info(self) -> None:
        """Print every position of the account that has a positive volume."""
        positions = self.xt_trader.query_stock_positions(self.account)
        if positions:
            print("\n- 持仓信息")
            for pos in positions:
                # Skip entries with no remaining volume.
                if pos.m_nVolume <= 0:
                    continue
                print(f"股票代码: {pos.stock_code}-{getInstrumentName(pos.stock_code)}")
                print(f"总持仓: {pos.m_nVolume}")
                print(f"可用持仓: {pos.m_nCanUseVolume}")
                print(f"持仓成本: {pos.avg_price}")
                print("---")
        else:
            print("\n当前无持仓")

    def print_account_info(self) -> None:
        """Print cash, total asset and market value of the account."""
        account_info = self.xt_trader.query_stock_asset(self.account)
        # NOTE(review): the xtquant documentation describes a None result
        # when the asset query fails - guard it like the sibling print_*
        # methods guard their query results, instead of raising
        # AttributeError below.
        if account_info is None:
            print("\n账户信息查询失败")
            return
        print(f"\n=== 账户信息 {self.account.account_id} ===")
        print(f"可用资金: {account_info.m_dCash}")
        print(f"总资产: {account_info.m_dTotalAsset}")
        print(f"证券市值: {account_info.m_dMarketValue}")

    def print_stock_orders(self) -> None:
        """Print the orders returned by a ``cancelable_only=True`` query."""
        orders = self.xt_trader.query_stock_orders(self.account, cancelable_only=True)
        if orders:
            print("\n=== 委托信息 ===")
            for order in orders:
                print(f"委托编号: {order.order_id}")
                print(f"股票代码: {order.stock_code} {getInstrumentName(order.stock_code)}")
                print(f"委托方向: {order.offset_flag} ")
                print(f"委托价格: {order.price}")
                print(f"委托数量: {order.order_volume}")
                print(f"已成交数量: {order.traded_volume}")
                print(f"委托状态: {order.order_status} ")
                print("---")
        else:
            print("\n当前无委托记录")

    # Start (or create) the trade controller of the selected target.
    def start_stock_trade(self, index: int) -> None:
        """Enable trading for the pool entry at ``index``.

        If a controller already exists it is re-enabled when it is not
        running; otherwise a new controller is created and registered.

        :param index: position of the target inside the instrument pool.
        """
        tradeTarget = self.instrument_pool[index]
        # Check for an existing controller first.
        if tradeTarget.stock_code in self.stock_trade_ctrl:
            tradeController: StockTradeController = self.stock_trade_ctrl[tradeTarget.stock_code]
            if tradeController.isEnabled():
                print(f"标的交易控制器已存在且正在运行 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")
            else:
                print(f"标的交易控制器已存在但未运行,重新启动 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")
                tradeController.enabledTrading(True)
        else:
            stockTradeController = StockTradeController(
                tradeTarget, self.xt_trader, self.account, tradeTarget.enabled
            )
            self.stock_trade_ctrl[tradeTarget.stock_code] = stockTradeController
            print(f"\t创建标的交易控制器 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}")

    def pause_stock_trade(self, index: int) -> None:
        """Disable trading for the pool entry at ``index``.

        :param index: position of the target inside the instrument pool.
        """
        tradeTarget = self.instrument_pool[index]
        if tradeTarget.stock_code in self.stock_trade_ctrl:
            tradeController: StockTradeController = self.stock_trade_ctrl[tradeTarget.stock_code]
            if tradeController.isEnabled():
                print(f"暂停标的交易 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")
                tradeController.enabledTrading(False)
            else:
                print(f"标的交易已暂停 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")
        else:
            print(f"标的交易控制器不存在 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")

    # ====== Trader callbacks -- the methods below are called by XtQuantTrader ======

    def on_connected(self):
        """Connection-established notification."""
        print(datetime.datetime.now(), '连接成功回调')

    def on_disconnected(self):
        """Connection-lost notification."""
        print(datetime.datetime.now(), '连接断开回调')

    def on_stock_order(self, order):
        """Order report push.

        :param order: XtOrder object.
        """
        print(datetime.datetime.now(), '委托回调 投资备注', order.order_remark)

    def on_stock_trade(self, trade):
        """Trade (fill) report push.

        :param trade: XtTrade object.
        """
        print(datetime.datetime.now(), '成交回调', trade.order_remark, f"委托方向(48买 49卖) {trade.offset_flag} 成交价格 {trade.traded_price} 成交数量 {trade.traded_volume}")

    def on_order_error(self, order_error):
        """Order-failure push.

        :param order_error: XtOrderError object.
        """
        print(f"委托报错回调 {order_error.order_remark} {order_error.error_msg}")

    def on_cancel_error(self, cancel_error):
        """Cancel-failure push; only logs the callback name.

        :param cancel_error: XtCancelError object.
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)

    def on_order_stock_async_response(self, response):
        """Asynchronous order-submission response push.

        :param response: XtOrderResponse object.
        """
        print(f"异步委托回调 投资备注: {response.order_remark}")

    def on_cancel_order_stock_async_response(self, response):
        """Asynchronous cancel response push; only logs the callback name.

        :param response: XtCancelOrderResponse object.
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)

    def on_account_status(self, status):
        """Account-status push; only logs the callback name.

        :param status: XtAccountStatus object.
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)