""" QMT 真实交易实现 - 封装 xtquant SDK """ import datetime import os import subprocess import threading import time import config import core.eventbus as eBus from core.logger import LogLevel, PrintLog class RealQmtV: """ 真实 QMT 交易器 封装 xtquant 的 XtQuantTrader,提供与模拟器一致的接口 """ # miniQMT 进程名关键字(GUI 壳: XtMiniQmt.exe,交易引擎: miniquote.exe) _QMT_PROCESS_KEYWORDS = ['Qmt', 'qmt', 'QMT', 'miniquote', 'MiniQuote'] @staticmethod def _discover_qmt_port() -> int: """ 自动探测 miniQMT 监听端口。 方法1: SDK 内部扫描 (读取配置) 方法2: netstat 找 LISTENING 端口 → 反查所属进程名 → 匹配 QMT 关键字 返回端口号,未找到返回 0。 """ # ---- 方法1: SDK 内部扫描 ---- try: from xtquant import xtconn addrs = xtconn.scan_available_server_addr() for addr in addrs: try: port = int(addr.split(':')[1]) if port: PrintLog(LogLevel.INFO, f'[端口探测] SDK 扫描发现端口: {port}') return port except (ValueError, IndexError): continue except Exception as e: PrintLog(LogLevel.DEBUG, f'[端口探测] SDK 扫描异常: {e}') # ---- 方法2: netstat → 反向查进程名 ---- try: # 2a. netstat 找出所有 LISTENING 端口的 PID pid_ports = {} # pid -> [port, ...] netstat = subprocess.run( ['netstat', '-ano'], capture_output=True, text=True, timeout=10 ) for line in netstat.stdout.splitlines(): if 'LISTENING' not in line and 'LISTEN' not in line: continue parts = line.split() if len(parts) < 5: continue try: local_addr = parts[1] port = int(local_addr.rsplit(':', 1)[-1]) pid = int(parts[-1]) if port > 0: pid_ports.setdefault(pid, []).append(port) except (ValueError, IndexError): continue if not pid_ports: PrintLog(LogLevel.DEBUG, '[端口探测] netstat 未找到任何 LISTENING 端口') return 0 # 2b. 对每个有监听端口的 PID,查进程名是否匹配 QMT for pid, ports in pid_ports.items(): name = RealQmtV._get_process_name(pid) if name and any(kw in name for kw in RealQmtV._QMT_PROCESS_KEYWORDS): port = ports[0] PrintLog(LogLevel.INFO, f'[端口探测] 发现 QMT 进程: {name} (PID={pid}), 端口: {port}') # 同时探测 userdata_mini 路径 exe_path = RealQmtV._get_process_exe_path(pid) if exe_path: PrintLog(LogLevel.INFO, f'[路径探测] 进程路径: {exe_path}') found_path = RealQmtV._find_userdata_mini(exe_path) if found_path: PrintLog(LogLevel.INFO, f'[路径探测] 发现 userdata_mini: {found_path}') if found_path != config.miniQMTPath: PrintLog(LogLevel.INFO, f'[路径探测] 自动修正 miniQMTPath: {config.miniQMTPath} -> {found_path}') config.miniQMTPath = found_path # 同时从窗口标题提取资金账号 account = RealQmtV._discover_account() if account: if account != config.account_no: PrintLog(LogLevel.INFO, f'[账号探测] 自动修正 account_no: {config.account_no[-4:]}**** -> {account[-4:]}****') config.account_no = account else: PrintLog(LogLevel.INFO, f'[账号探测] 确认账号: {account[-4:]}****') return port except Exception as e: PrintLog(LogLevel.INFO, f'[端口探测] 进程扫描异常: {e}') PrintLog(LogLevel.WARNING, '[端口探测] 未能自动发现 miniQMT 端口') return 0 @staticmethod def _get_process_name(pid: int) -> str: """通过 PID 获取进程名(单个查询,不用扫全量 tasklist)""" try: result = subprocess.run( ['tasklist', '/fi', f'PID eq {pid}', '/fo', 'csv', '/nh'], capture_output=True, text=True, timeout=5 ) for line in result.stdout.splitlines(): line = line.strip() if not line or line.startswith('INFO:'): continue parts = [p.strip('"').strip() for p in line.split('","')] if len(parts) >= 2: return parts[0] except Exception: pass return '' @staticmethod def _get_process_exe_path(pid: int) -> str: """通过 PID 获取进程的可执行文件完整路径""" try: result = subprocess.run( ['powershell', '-NoProfile', '-Command', f'(Get-Process -Id {pid}).Path'], capture_output=True, text=True, encoding='utf-8', errors='replace', timeout=5 ) path = result.stdout.strip() if path and os.path.isfile(path): return path except Exception: pass return '' @staticmethod def _find_userdata_mini(exe_path: str) -> str: """从 QMT 可执行文件路径向上查找 userdata_mini 目录""" exe_dir = os.path.dirname(exe_path) # 从 exe 所在目录开始,向上最多 3 层 for _ in range(4): candidate = os.path.join(exe_dir, 'userdata_mini') if os.path.isdir(candidate): return candidate parent = os.path.dirname(exe_dir) if parent == exe_dir: break exe_dir = parent return '' @staticmethod def _discover_account() -> str: """ 从 XtMiniQmt.exe 的窗口标题中提取资金账号。 标题格式: "8882874667 - 国金证券QMT交易端 2.0.8.300" 返回账号字符串,失败返回空字符串。 """ try: # 找到 XtMiniQmt.exe 的 PID tasklist = subprocess.run( ['tasklist', '/fo', 'csv', '/nh'], capture_output=True, text=True, timeout=10 ) gui_pid = 0 for line in tasklist.stdout.splitlines(): line = line.strip() if not line: continue parts = [p.strip('"').strip() for p in line.split('","')] if len(parts) >= 2 and 'XtMiniQmt' in parts[0]: gui_pid = int(parts[1]) break if not gui_pid: return '' # 获取窗口标题(PowerShell 输出可能含中文,用 utf-8) result = subprocess.run( ['powershell', '-NoProfile', '-Command', f'(Get-Process -Id {gui_pid}).MainWindowTitle'], capture_output=True, text=True, encoding='utf-8', errors='replace', timeout=5 ) title = result.stdout.strip() if title and ' - ' in title: account = title.split(' - ')[0].strip() if account.isdigit(): return account except Exception: pass return '' @staticmethod def _to_plain_code(stock_code: str) -> str: """将 xtquant 格式 '600519.SH' 转换为数据库格式 '600519'""" return stock_code.split('.')[0] if '.' in stock_code else stock_code @staticmethod def _to_full_code(stock_code: str) -> str: """将数据库格式 '600519' 转换为 xtquant 格式 '600519.SH'""" if '.' in stock_code: return stock_code # already has suffix code = stock_code if code.startswith(('6', '5', '9')): return f'{code}.SH' elif code.startswith(('0', '3', '2')): return f'{code}.SZ' # fallback: try both, prefer SH return f'{code}.SH' def __init__(self) -> None: self.inited = False self.connected = False self.account = None self.xt_trader = None self.mini_qmt_path = "" self._positions = {} self._pending_orders = [] self._market_data_thread = None self.isMarketActive = False self.lastMarketDataUpdateTimestamp = time.time() self.details = {} def getTrader(self): return self def init_qmtv(self): """初始化 QMT 交易器""" try: from xtquant.xttrader import XtQuantTrader from xtquant.xttype import StockAccount self.mini_qmt_path = config.miniQMTPath self.account = StockAccount(config.account_no, 'STOCK') PrintLog(LogLevel.INFO, f'[QMT] 初始化: path={self.mini_qmt_path}, account={config.account_no[-4:]}****') # 创建 XtQuantTrader 实例 session_id = int(time.time()) % 10000 PrintLog(LogLevel.INFO, f'[QMT] 创建 XtQuantTrader, session={session_id}') self.xt_trader = XtQuantTrader(self.mini_qmt_path, session_id) # 注册回调 — xtquant 只接受一个回调对象,会在上面调用 on_xxx 方法 self.xt_trader.register_callback(self) self.inited = True PrintLog(LogLevel.INFO, f'- [真实] QMT 交易器初始化成功') except Exception as e: self.inited = False PrintLog(LogLevel.ERROR, f'- [失败] QMT 初始化: {e}') def connect(self) -> bool: """连接 MiniQMT,失败自动探测端口并重试""" if not self.inited: PrintLog(LogLevel.ERROR, '[QMT] 连接失败: 未初始化') return False _connect_errors = { 0: '成功', -1: '一般错误(miniQMT 可能未启动)', -2: 'miniQMT 未运行(请先启动极简QMT)', -3: '连接超时', } def _do_connect() -> int: self.xt_trader.start() PrintLog(LogLevel.INFO, '[QMT] xt_trader.start() 完成') PrintLog(LogLevel.INFO, '[QMT] 正在连接 miniQMT...') return self.xt_trader.connect() try: # 尝试默认连接 PrintLog(LogLevel.INFO, '[QMT] 尝试默认方式连接...') connect_result = _do_connect() # 失败则自动探测端口并重试 if connect_result != 0: PrintLog(LogLevel.INFO, '[QMT] 默认连接失败,启动端口自动探测...') discovered_port = self._discover_qmt_port() if discovered_port > 0: PrintLog(LogLevel.INFO, f'[QMT] 探测到端口 {discovered_port},尝试连接...') try: from xtquant import xtdata xtdata.connect(ip='127.0.0.1', port=discovered_port) PrintLog(LogLevel.INFO, f'[QMT] xtdata 连接成功 (端口: {discovered_port})') except Exception as e: PrintLog(LogLevel.ERROR, f'[QMT] xtdata 连接失败 (端口: {discovered_port}): {e}') return False connect_result = _do_connect() else: PrintLog(LogLevel.WARNING, '[QMT] 端口自动探测未找到 miniQMT 进程') result_desc = _connect_errors.get(connect_result, f'未知({connect_result})') PrintLog(LogLevel.INFO, f'[QMT] connect() 返回: {connect_result} ({result_desc})') if connect_result == 0: PrintLog(LogLevel.INFO, f'[QMT] 订阅账户...') self.xt_trader.subscribe(self.account) PrintLog(LogLevel.INFO, '[QMT] 订阅完成') self.connected = True self.startMarketDataSubscription() PrintLog(LogLevel.INFO, f'[QMT] 连接成功 (账号: {config.account_no[-4:]}****)') return True else: PrintLog(LogLevel.ERROR, f'[QMT] 连接失败: {result_desc}') return False except Exception as e: PrintLog(LogLevel.ERROR, f'[QMT] 连接异常: {e}') return False def getAllPositions(self) -> dict: """获取全部持仓,返回 {plain_code: position_object}""" if not self.connected: return {} try: positions = self.xt_trader.query_stock_positions(self.account) result = {} for pos in positions: code = self._to_plain_code(getattr(pos, 'stock_code', '')) result[code] = pos # 缓存以供 getStockPosition 使用 self._position_cache = result return result except Exception as e: PrintLog(LogLevel.ERROR, f'- [获取全部持仓失败]: {e}') return {} def getStockPosition(self, stock_code: str): """获取单只股票持仓(优先使用缓存)""" if not self.connected: return None try: # 优先查缓存 if hasattr(self, '_position_cache') and stock_code in self._position_cache: return self._position_cache[stock_code] # 回退查询 positions = self.xt_trader.query_stock_positions(self.account) for pos in positions: pos_code = self._to_plain_code(getattr(pos, 'stock_code', '')) if pos_code == stock_code: return pos return None except Exception as e: PrintLog(LogLevel.ERROR, f'- [持仓查询失败] {stock_code}: {e}') return None def queryPendingOrder(self, stock_code: str, tag: str) -> list: """查询挂单(过滤已撤/废单)""" if not self.connected: return [] try: orders = self.xt_trader.query_stock_orders(self.account) # 过滤已撤(54)和废单(57),避免策略误判"已有挂单"跳过下单 _CANCELED = {54, 57} return [o for o in orders if self._to_plain_code(getattr(o, 'stock_code', '')) == stock_code and (tag is None or getattr(o, 'strategy_name', None) == tag) and getattr(o, 'order_status', 0) not in _CANCELED] except Exception as e: PrintLog(LogLevel.ERROR, f'- [查询挂单失败] {e}') return [] def queryTodayOrders(self) -> list: """查询当日所有委托""" if not self.connected: return [] try: return list(self.xt_trader.query_stock_orders(self.account)) except Exception as e: PrintLog(LogLevel.ERROR, f'- [查询委托失败] {e}') return [] def queryTodayTrades(self) -> list: """查询当日所有成交""" if not self.connected: return [] try: return list(self.xt_trader.query_stock_trades(self.account)) except Exception as e: PrintLog(LogLevel.ERROR, f'- [查询成交失败] {e}') return [] def orderAsync(self, stock_code, orderVolume, orderType, orderPrice, priceType, orderRemark, strategy_name): """异步下单""" if not self.connected: PrintLog(LogLevel.ERROR, '- [下单失败] 未连接') return -1 try: full_code = self._to_full_code(stock_code) seq = self.xt_trader.order_stock_async( account=self.account, stock_code=full_code, order_volume=orderVolume, order_type=orderType, price=orderPrice, price_type=priceType, order_remark=orderRemark, strategy_name=strategy_name ) PrintLog(LogLevel.INFO, f'- [下单] {stock_code} 数量:{orderVolume} 价格:{orderPrice} 类型:{orderType} seq:{seq}') return 0 except Exception as e: PrintLog(LogLevel.ERROR, f'- [下单失败] {stock_code}: {e}') return -1 def cacheStockDetail(self, stock_code: str): """获取股票详情""" if stock_code not in self.details: try: from xtquant import xtdata # xtquant 需要带后缀的完整代码 full_code = self._to_full_code(stock_code) detail = xtdata.get_instrument_detail(full_code) if detail: # xtquant 返回 dict,使用 .get() 读取 self.details[stock_code] = { 'InstrumentName': detail.get('InstrumentName', stock_code) if isinstance(detail, dict) else getattr(detail, 'InstrumentName', stock_code), 'UpStopPrice': detail.get('UpStopPrice', 0) if isinstance(detail, dict) else getattr(detail, 'UpStopPrice', 0), 'DownStopPrice': detail.get('DownStopPrice', 0) if isinstance(detail, dict) else getattr(detail, 'DownStopPrice', 0) } else: self.details[stock_code] = { 'InstrumentName': stock_code, 'UpStopPrice': 0, 'DownStopPrice': 0 } except Exception: self.details[stock_code] = { 'InstrumentName': stock_code, 'UpStopPrice': 0, 'DownStopPrice': 0 } return self.details[stock_code] def getInstrumentName(self, stock_code: str) -> str: """获取股票名称""" return self.cacheStockDetail(stock_code)['InstrumentName'] def dailyUpStop(self, stock_code: str): """获取涨停价""" detail = self.cacheStockDetail(stock_code) up_stop = detail.get('UpStopPrice', 0) PrintLog(LogLevel.DEBUG, f'- [详情] {stock_code} {detail["InstrumentName"]} 涨停价: {up_stop}') return up_stop or 0.0 def dailyDownStop(self, stock_code: str): """获取跌停价""" detail = self.cacheStockDetail(stock_code) down_stop = detail.get('DownStopPrice', 0) return down_stop or 0.0 def getLastPrice(self, stock_code: str) -> float: """主动获取最新市价(拉取模式,作为推送的兜底)""" try: from xtquant import xtdata import json full_code = self._to_full_code(stock_code) # 方式1: 尝试 get_full_tick(参数是 list[str],返回 dict {code: {...}}) raw = xtdata.get_full_tick([full_code]) if raw: tick = json.loads(raw) if isinstance(raw, str) else raw if isinstance(tick, dict): # 格式: {'600519.SH': {'lastPrice': 8.97, ...}} for code, info in tick.items(): if isinstance(info, dict) and info.get('lastPrice', 0) > 0: PrintLog(LogLevel.DEBUG, f'[getLastPrice] {stock_code} → tick: {info["lastPrice"]:.3f}') return float(info['lastPrice']) # 方式2: get_market_data 取最新1分钟K线收盘价 data = xtdata.get_market_data( field_list=['close'], stock_list=[full_code], period='1m', count=1 ) if data: vals = None if full_code in data: row = data[full_code] if hasattr(row, '__iter__') and not isinstance(row, str): row = list(row) if row: vals = row if not vals and 'close' in data: field_data = data['close'] if full_code in field_data: vals = list(field_data[full_code]) if vals and len(vals) > 0 and float(vals[0]) > 0: PrintLog(LogLevel.DEBUG, f'[getLastPrice] {stock_code} → kline: {float(vals[0]):.3f}') return float(vals[0]) # 方式3: 下载历史数据后再试 xtdata.download_history_data(full_code, '1m', '') data = xtdata.get_market_data( field_list=['close'], stock_list=[full_code], period='1m', count=1 ) if data: vals = None if full_code in data: row = data[full_code] if hasattr(row, '__iter__') and not isinstance(row, str): row = list(row) if row: vals = row if not vals and 'close' in data: field_data = data['close'] if full_code in field_data: vals = list(field_data[full_code]) if vals and len(vals) > 0 and float(vals[0]) > 0: PrintLog(LogLevel.DEBUG, f'[getLastPrice] {stock_code} → download+kline: {float(vals[0]):.3f}') return float(vals[0]) PrintLog(LogLevel.DEBUG, f'[getLastPrice] {stock_code} → 失败: 所有方式均无数据, raw={raw}') except Exception as e: PrintLog(LogLevel.DEBUG, f'[getLastPrice] {stock_code} → 异常: {e}') return 0.0 def startMarketDataSubscription(self): """启动市场数据订阅""" try: from xtquant import xtdata # 订阅沪深全市场实时行情 seq = xtdata.subscribe_whole_quote(['SH', 'SZ'], self._on_market_data) PrintLog(LogLevel.INFO, f'- [市场数据订阅成功-真实] seq={seq}') # 启动行情活跃监控线程(默认不活跃,收到行情后激活) self._market_data_thread = threading.Thread( target=self._market_data_watchdog, daemon=True ) self._market_data_thread.start() except Exception as e: PrintLog(LogLevel.ERROR, f'- [市场数据订阅失败-{e}]') def _on_market_data(self, datas: dict): """xtquant 行情回调 — 收到行情即标记市场活跃""" self.lastMarketDataUpdateTimestamp = time.time() if not self.isMarketActive: self.isMarketActive = True eBus.event_bus.publish(eBus.EventMarketActiveSwitch, True) eBus.event_bus.publish(eBus.MarketDataUpdate, datas) def _market_data_watchdog(self): """行情活跃监控 — 超过 120 秒无行情则标记市场不活跃""" while True: time.sleep(15) if self.isMarketActive: elapsed = time.time() - self.lastMarketDataUpdateTimestamp if elapsed > 120: self.isMarketActive = False eBus.event_bus.publish(eBus.EventMarketActiveSwitch, False) PrintLog(LogLevel.WARNING, f'- [行情] 超过 {elapsed:.0f} 秒无更新,市场标记为不活跃') def stopMarketDataSubscription(self): """停止市场数据订阅""" self.isMarketActive = False PrintLog(LogLevel.INFO, '- [市场数据订阅已停止]') # ---- xtquant 回调处理 (xtquant 通过回调对象调用 on_xxx 方法) ---- def on_connected(self): PrintLog(LogLevel.INFO, f'[QMT] on_connected: 真实 QMT 连接成功 {datetime.datetime.now()}') def on_disconnected(self): PrintLog(LogLevel.WARNING, f'[QMT] on_disconnected: 真实 QMT 连接断开 {datetime.datetime.now()}') def on_stock_order(self, order): self._pending_orders.append(order) def on_stock_trade(self, trade): eBus.event_bus.publish(eBus.MarketOrderTraded, trade) def on_order_stock_async_response(self, response): eBus.event_bus.publish(eBus.MarketOrderCreated, response) def on_order_error(self, order_error): PrintLog(LogLevel.ERROR, f'[QMT] 委托报错: order_id={order_error.order_id}, error_id={order_error.error_id}, ' f'error_msg={order_error.error_msg}, remark={order_error.order_remark}') eBus.event_bus.publish(eBus.MarketOrderError, order_error) def on_account_status(self, status): PrintLog(LogLevel.INFO, f'[QMT] on_account_status: {datetime.datetime.now()} {status}') qmtv = RealQmtV()