""" QMT 真实交易实现 - 封装 xtquant SDK """ import datetime import threading import time import config import core.eventbus as eBus from core.logger import LogLevel, PrintLog class RealQmtV: """ 真实 QMT 交易器 封装 xtquant 的 XtQuantTrader,提供与模拟器一致的接口 """ @staticmethod def _to_plain_code(stock_code: str) -> str: """将 xtquant 格式 '600519.SH' 转换为数据库格式 '600519'""" return stock_code.split('.')[0] if '.' in stock_code else stock_code @staticmethod def _to_full_code(stock_code: str) -> str: """将数据库格式 '600519' 转换为 xtquant 格式 '600519.SH'""" if '.' in stock_code: return stock_code # already has suffix code = stock_code if code.startswith(('6', '5', '9')): return f'{code}.SH' elif code.startswith(('0', '3', '2')): return f'{code}.SZ' # fallback: try both, prefer SH return f'{code}.SH' @staticmethod def _strip_code_suffixes(datas: dict) -> dict: """批量去除 xtquant 数据中的代码后缀""" result = {} for code, tick in datas.items(): result[code] = tick if '.' in code: result[code.split('.')[0]] = tick return result def __init__(self) -> None: self.inited = False self.connected = False self.account = None self.xt_trader = None self.mini_qmt_path = "" self._positions = {} self._pending_orders = [] self._market_data_thread = None self.isMarketActive = False self.lastMarketDataUpdateTimestamp = time.time() self.details = {} def getTrader(self): return self def init_qmtv(self): """初始化 QMT 交易器""" try: from xtquant.xttrader import XtQuantTrader from xtquant.xttype import StockAccount self.mini_qmt_path = config.miniQMTPath self.account = StockAccount(config.account_no, 'STOCK') # 创建 XtQuantTrader 实例 session_id = int(time.time()) % 10000 self.xt_trader = XtQuantTrader(self.mini_qmt_path, session_id) # 注册回调 — xtquant 只接受一个回调对象,会在上面调用 on_xxx 方法 self.xt_trader.register_callback(self) self.inited = True PrintLog(LogLevel.INFO, f'- [真实] QMT 交易器初始化成功') except Exception as e: self.inited = False PrintLog(LogLevel.ERROR, f'- [失败] QMT 初始化: {e}') def connect(self) -> bool: """连接 MiniQMT""" if not self.inited: PrintLog(LogLevel.ERROR, '- [失败] QMT 未初始化') return False try: # 启动 trader 线程 self.xt_trader.start() # 建立连接 connect_result = self.xt_trader.connect() if connect_result == 0: # 订阅账户 (传入 StockAccount 对象而不是 account_id 字符串) self.xt_trader.subscribe(self.account) # 等待回调 time.sleep(1) self.connected = True self.startMarketDataSubscription() PrintLog(LogLevel.INFO, f'- [成功] 真实交易连接成功 (账号: {config.account_no})') return True else: PrintLog(LogLevel.ERROR, f'- [失败] 连接失败, 返回码: {connect_result}') return False except Exception as e: PrintLog(LogLevel.ERROR, f'- [失败] 连接异常: {e}') return False def getAllPositions(self) -> dict: """获取全部持仓,返回 {plain_code: position_object}""" if not self.connected: return {} try: positions = self.xt_trader.query_stock_positions(self.account) result = {} for pos in positions: code = self._to_plain_code(getattr(pos, 'stock_code', '')) result[code] = pos # 缓存以供 getStockPosition 使用 self._position_cache = result return result except Exception as e: PrintLog(LogLevel.ERROR, f'- [获取全部持仓失败]: {e}') return {} def getStockPosition(self, stock_code: str): """获取单只股票持仓(优先使用缓存)""" if not self.connected: return None try: # 优先查缓存 if hasattr(self, '_position_cache') and stock_code in self._position_cache: return self._position_cache[stock_code] # 回退查询 positions = self.xt_trader.query_stock_positions(self.account) for pos in positions: pos_code = self._to_plain_code(getattr(pos, 'stock_code', '')) if pos_code == stock_code: return pos return None except Exception as e: PrintLog(LogLevel.ERROR, f'- [持仓查询失败] {stock_code}: {e}') return None def queryPendingOrder(self, stock_code: str, tag: str) -> list: """查询挂单""" if not self.connected: return [] try: orders = self.xt_trader.query_stock_orders(self.account) return [o for o in orders if self._to_plain_code(getattr(o, 'stock_code', '')) == stock_code and (tag is None or getattr(o, 'strategy_name', None) == tag)] except Exception as e: PrintLog(LogLevel.ERROR, f'- [查询挂单失败] {e}') return [] def orderAsync(self, stock_code, orderVolume, orderType, orderPrice, priceType, orderRemark, strategy_name): """异步下单""" if not self.connected: PrintLog(LogLevel.ERROR, '- [下单失败] 未连接') return -1 try: seq = self.xt_trader.order_stock_async( account=self.account, stock_code=stock_code, order_volume=orderVolume, order_type=orderType, price=orderPrice, price_type=priceType, order_remark=orderRemark, strategy_name=strategy_name ) PrintLog(LogLevel.INFO, f'- [下单] {stock_code} 数量:{orderVolume} 价格:{orderPrice} 类型:{orderType} seq:{seq}') return 0 except Exception as e: PrintLog(LogLevel.ERROR, f'- [下单失败] {stock_code}: {e}') return -1 def cacheStockDetail(self, stock_code: str): """获取股票详情""" if stock_code not in self.details: try: from xtquant import xtdata # xtquant 需要带后缀的完整代码 full_code = self._to_full_code(stock_code) detail = xtdata.get_instrument_detail(full_code) if detail: # xtquant 返回 dict,使用 .get() 读取 self.details[stock_code] = { 'InstrumentName': detail.get('InstrumentName', stock_code) if isinstance(detail, dict) else getattr(detail, 'InstrumentName', stock_code), 'UpStopPrice': detail.get('UpStopPrice', 0) if isinstance(detail, dict) else getattr(detail, 'UpStopPrice', 0), 'DownStopPrice': detail.get('DownStopPrice', 0) if isinstance(detail, dict) else getattr(detail, 'DownStopPrice', 0) } else: self.details[stock_code] = { 'InstrumentName': stock_code, 'UpStopPrice': 0, 'DownStopPrice': 0 } except Exception: self.details[stock_code] = { 'InstrumentName': stock_code, 'UpStopPrice': 0, 'DownStopPrice': 0 } return self.details[stock_code] def getInstrumentName(self, stock_code: str) -> str: """获取股票名称""" return self.cacheStockDetail(stock_code)['InstrumentName'] def dailyUpStop(self, stock_code: str): """获取涨停价""" detail = self.cacheStockDetail(stock_code) up_stop = detail.get('UpStopPrice', 0) PrintLog(LogLevel.DEBUG, f'- [详情] {stock_code} {detail["InstrumentName"]} 涨停价: {up_stop}') return up_stop or 0.0 def dailyDownStop(self, stock_code: str): """获取跌停价""" detail = self.cacheStockDetail(stock_code) down_stop = detail.get('DownStopPrice', 0) return down_stop or 0.0 def getLastPrice(self, stock_code: str) -> float: """主动获取最新市价(拉取模式,作为推送的兜底)""" try: from xtquant import xtdata import json full_code = self._to_full_code(stock_code) # 方式1: 尝试 get_full_tick(参数是 list[str],返回 dict {code: {...}}) raw = xtdata.get_full_tick([full_code]) if raw: tick = json.loads(raw) if isinstance(raw, str) else raw if isinstance(tick, dict): # 格式: {'600519.SH': {'lastPrice': 8.97, ...}} for code, info in tick.items(): if isinstance(info, dict) and info.get('lastPrice', 0) > 0: PrintLog(LogLevel.DEBUG, f'[getLastPrice] {stock_code} → tick: {info["lastPrice"]:.3f}') return float(info['lastPrice']) # 方式2: get_market_data 取最新1分钟K线收盘价 data = xtdata.get_market_data( field_list=['close'], stock_list=[full_code], period='1m', count=1 ) if data: vals = None if full_code in data: row = data[full_code] if hasattr(row, '__iter__') and not isinstance(row, str): row = list(row) if row: vals = row if not vals and 'close' in data: field_data = data['close'] if full_code in field_data: vals = list(field_data[full_code]) if vals and len(vals) > 0 and float(vals[0]) > 0: PrintLog(LogLevel.DEBUG, f'[getLastPrice] {stock_code} → kline: {float(vals[0]):.3f}') return float(vals[0]) # 方式3: 下载历史数据后再试 xtdata.download_history_data(full_code, '1m', '') data = xtdata.get_market_data( field_list=['close'], stock_list=[full_code], period='1m', count=1 ) if data: vals = None if full_code in data: row = data[full_code] if hasattr(row, '__iter__') and not isinstance(row, str): row = list(row) if row: vals = row if not vals and 'close' in data: field_data = data['close'] if full_code in field_data: vals = list(field_data[full_code]) if vals and len(vals) > 0 and float(vals[0]) > 0: PrintLog(LogLevel.DEBUG, f'[getLastPrice] {stock_code} → download+kline: {float(vals[0]):.3f}') return float(vals[0]) PrintLog(LogLevel.DEBUG, f'[getLastPrice] {stock_code} → 失败: 所有方式均无数据, raw={raw}') except Exception as e: PrintLog(LogLevel.DEBUG, f'[getLastPrice] {stock_code} → 异常: {e}') return 0.0 def startMarketDataSubscription(self): """启动市场数据订阅""" try: from xtquant import xtdata # 订阅沪深全市场实时行情 seq = xtdata.subscribe_whole_quote(['SH', 'SZ'], self._on_market_data) PrintLog(LogLevel.INFO, f'- [市场数据订阅成功-真实] seq={seq}') # 启动行情活跃监控线程 self._market_data_thread = threading.Thread( target=self._market_data_watchdog, daemon=True ) self._market_data_thread.start() except Exception as e: PrintLog(LogLevel.ERROR, f'- [市场数据订阅失败-{e}]') def _on_market_data(self, datas: dict): """xtquant 行情回调 — 将数据转换为事件总线格式""" self.lastMarketDataUpdateTimestamp = time.time() if not self.isMarketActive: self.isMarketActive = True eBus.event_bus.publish(eBus.EventMarketActiveSwitch, True) # xtquant 返回 "600519.SH" 格式 key,UI 使用纯代码 "600519" # 构建同时包含两种 key 的数据确保匹配 eBus.event_bus.publish(eBus.MarketDataUpdate, self._strip_code_suffixes(datas)) def _market_data_watchdog(self): """行情活跃监控 — 超过 30 秒无数据则标记市场不活跃""" while True: time.sleep(10) if self.isMarketActive: elapsed = time.time() - self.lastMarketDataUpdateTimestamp if elapsed > 30: self.isMarketActive = False eBus.event_bus.publish(eBus.EventMarketActiveSwitch, False) PrintLog(LogLevel.WARNING, f'- [行情] 超过 {elapsed:.0f} 秒无更新,市场标记为不活跃') def stopMarketDataSubscription(self): """停止市场数据订阅""" self.isMarketActive = False PrintLog(LogLevel.INFO, '- [市场数据订阅已停止]') # ---- xtquant 回调处理 (xtquant 通过回调对象调用 on_xxx 方法) ---- def on_connected(self): print(datetime.datetime.now(), '真实 QMT 连接成功') def on_disconnected(self): print(datetime.datetime.now(), '真实 QMT 连接断开') def on_stock_order(self, order): self._pending_orders.append(order) def on_stock_trade(self, trade): eBus.event_bus.publish(eBus.MarketOrderTraded, trade) def on_order_stock_async_response(self, response): eBus.event_bus.publish(eBus.MarketOrderCreated, response) def on_order_error(self, order_error): print(f"\n真实委托报错回调 {order_error}") def on_account_status(self, status): print(datetime.datetime.now(), status) qmtv = RealQmtV()