From db910e03d65a884725d907fd47a4e72992d04557 Mon Sep 17 00:00:00 2001 From: AdamGao Date: Tue, 2 Jun 2026 18:07:55 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0dummy=20gateway?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/main_ui.py | 13 ++- core/qmt.py | 225 +++---------------------------------- core/qmt_dummy.py | 275 ++++++++++++++++++++++++++++++++++++++++++++++ core/qmt_real.py | 25 +++++ starter.py | 64 +++++++---- 5 files changed, 373 insertions(+), 229 deletions(-) create mode 100644 core/qmt_dummy.py create mode 100644 core/qmt_real.py diff --git a/core/main_ui.py b/core/main_ui.py index 1fa1e47..65ad0e5 100644 --- a/core/main_ui.py +++ b/core/main_ui.py @@ -3,7 +3,18 @@ from tkinter import ttk from core.logger import LogLevel, LogData, PrintLog from core.sfgrid.sfgrid_ui import TradeTargetUI -from tkinter import ttk +# 检测运行环境,决定使用真实或模拟 QMT +def get_qmt_module(): + try: + # 尝试导入真实 QMT,如果失败则使用模拟 + from core.qmt import qmtv + return qmtv + except ImportError: + from core.qmt_dummy import qmtv + return qmtv + +qmtv = get_qmt_module() + from core.eventbus import EventPrintLog from core.eventbus import event_bus as eBus diff --git a/core/qmt.py b/core/qmt.py index 98fdd3d..2d6db40 100644 --- a/core/qmt.py +++ b/core/qmt.py @@ -1,212 +1,21 @@ -import datetime -import threading -import time -import config -from xtquant.xttype import StockAccount, XtOrder, XtOrderResponse, XtPosition, XtTrade -from xtquant.xttrader import XtQuantTrader -from xtquant.xttype import StockAccount -from core.logger import LogLevel, PrintLog -from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback -from xtquant.xttype import StockAccount -from xtquant import xtconstant, xtdata -import core.eventbus as eBus +""" +QMT 模块统一入口 +根据环境自动选择真实 QMT 或模拟器 +""" +import sys -class QmtV(XtQuantTraderCallback): - def __init__(self) -> None: - self.xttrader: XtQuantTrader - self.inited: bool = False - self.details = {} - self.lastMarketDataUpdateTimestamp = time.time() - self.isMarketActive = False - self.refresh_thread = threading.Thread(target=self.marketStatusNotifier, daemon=True) - self.refresh_thread.start() - # time.sleep(3.1) - - def getTrader(self) -> XtQuantTrader: - return self.xttrader - - def init_qmtv(self): - sessionId= int(time.time()) - self.xttrader = XtQuantTrader(config.miniQMTPath, sessionId) - xtdata.enable_hello = False - - def connect(self) -> bool: - self.xttrader.register_callback(self) - self.xttrader.start() - self.xttrader.connect() - - PrintLog(LogLevel.INFO, f'- [{'成功' if self.xttrader.connected else '失败'}]市场交易连接: {config.miniQMTPath}') - if self.xttrader.connected == False: - self.inited = False - return self.inited - else: - self.inited = True - - self.account = StockAccount(config.account_no, 'STOCK') # pyright: ignore[reportAssignmentType, reportAttributeAccessIssue] - PrintLog(LogLevel.INFO, f'- [成功]交易账号对象初始化完成, 账号: {config.account_no}') # pyright: ignore[reportOptionalMemberAccess] - subscribe_result = self.xttrader.subscribe(self.account) - PrintLog(LogLevel.INFO, f'- [{'成功' if subscribe_result == 0 else '失败'}:{subscribe_result}]交易状态订阅') - if subscribe_result != 0: - self.inited = False - return self.inited - self.startMarketDataSubscription() - return self.inited - - - def getStockPosition(self, stock_code: str): - positions = self.xttrader.query_stock_positions(self.account) - if positions: - for temp in positions: - pos:XtPosition = temp - if pos.stock_code == stock_code: - return pos - return None - - - def queryPendingOrder(self, stock_code:str, tag: str) -> list[XtOrder]: - if stock_code == None or tag == None: - return [] - orders = self.xttrader.query_stock_orders(self.account) - result = [order for order in orders if order.order_status == xtconstant.ORDER_REPORTED and order.stock_code == stock_code and order.strategy_name == tag] - return result - - def orderAsync(self, stock_code, orderVolume, orderType, orderPrice, priceType, orderRemark, strategy_name): - return self.xttrader.order_stock_async( - self.account, - str(stock_code), - orderType, - orderVolume, - priceType, - orderPrice, - strategy_name, # strategy_name - orderRemark # remark # type: ignore - ) - - def cacheStockDetail(self, stock_code:str): - if stock_code in self.details: - return self.details[stock_code] - else: - self.details[stock_code] = xtdata.get_instrument_detail(stock_code, False) - return self.details[stock_code] - - def getInstrumentName(self, stock_code:str): - return self.cacheStockDetail(stock_code)['InstrumentName'] - - def dailyUpStop(self, stock_code:str): - cacheStock = self.cacheStockDetail(stock_code) - PrintLog(LogLevel.INFO, f'- [成功]获取股票详情: {stock_code} {cacheStock["InstrumentName"]} {cacheStock["UpStopPrice"]}') - return cacheStock['UpStopPrice'] - - def dailyDownStop(self, stock_code:str): - return self.cacheStockDetail(stock_code)['DownStopPrice'] - - # ========================================# - def startMarketDataSubscription(self): +def _get_qmt(): + """获取 QMT 模块""" + if sys.platform == 'win32': try: - self.subscriptionId = xtdata.subscribe_whole_quote(['SH', 'SZ'], self.onDataUpdate) - - PrintLog(LogLevel.INFO, f'- [市场数据订阅成功-{self.subscriptionId}]') - except Exception as e: - PrintLog(LogLevel.ERROR, f'- [市场数据订阅失败-{e}]') - - def stopMarketDataSubscription(self): - PrintLog(LogLevel.INFO, '- 停止市场数据订阅') - - if self.subscriptionId is not None and self.subscriptionId > 0: - xtdata.unsubscribe_quote(self.subscriptionId) - - # ====== 市场回调方法 -- 以下方法由XtQuantData调用 ====== - def onDataUpdate(self, data): - # 收集所有市场数据用于市场监控 - eBus.event_bus.publish(eBus.MarketDataUpdate, data) - now = time.time() - if now - self.lastMarketDataUpdateTimestamp < 5: - self.isMarketActive = True - self.lastMarketDataUpdateTimestamp = now + from core.qmt_real import qmtv as real_qmtv + return real_qmtv + except ImportError: + pass - def marketStatusNotifier(self): - # 市场状态通知器 - tmpMarketStatus = False - while True: - tmpTime = time.time() - time.sleep(10) - if tmpMarketStatus != self.isMarketActive and tmpTime - self.lastMarketDataUpdateTimestamp < 5: - tmpMarketStatus = self.isMarketActive - PrintLog(LogLevel.INFO, f'- [市场状态变更] {self.isMarketActive}') - eBus.event_bus.publish(eBus.EventMarketActiveSwitch, self.isMarketActive) - if tmpMarketStatus and self.isMarketActive and tmpTime - self.lastMarketDataUpdateTimestamp > 10: # 上次更新市场状态已经超过10秒 - self.isMarketActive = False - PrintLog(LogLevel.INFO, f'- [市场状态变更] {self.isMarketActive}') + # 非 Windows 或导入失败,使用模拟器 + from core.qmt_dummy import qmtv + return qmtv - PrintLog(LogLevel.DEBUG, f'- [市场状态] {self.isMarketActive}') # 市场已 inactive - - - # ====== 市场回调方法 -- 以下方法由XtQuantTrader调用 ====== - def on_connected(self): - """ - 连接成功推送 - """ - print(datetime.datetime.now(), '连接成功回调') - - def on_disconnected(self): - """ - 连接断开 - :return: - """ - print(datetime.datetime.now(), '连接断开回调') - - def on_stock_order(self, order:XtOrder): - """ - 委托回报推送 - :param order: XtOrder对象 - :return: - """ - pass - # print(f"委托回调 on_stock_order 投资备注 {order.order_id} {order.strategy_name} {order.order_remark}") - - - def on_stock_trade(self, trade:XtTrade): - """ - 成交变动推送 - :param trade: XtTrade对象 - :return: - """ - eBus.event_bus.publish(eBus.MarketOrderTraded, trade) - # stockCode = trade.stock_code - # ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode] - # # 如果存在对应的StockTradeController,则调用其onDataUpdate方法 - # if ctrl is not None and trade.strategy_name == ctrl.getName(): - # ctrl.onOrderTrade(trade) - # else: - # print(f"委托回调 投资备注 {trade.strategy_name} 不匹配 {ctrl.getName()}") - - def on_order_stock_async_response(self, response:XtOrderResponse): - # print(f"委托回调 on_order_stock_async_response 投资备注 {response.order_id} {response.seq} {response.error_msg}{response.strategy_name} {response.order_remark}") - eBus.event_bus.publish(eBus.MarketOrderCreated, response) - - # stockCode = response.order_remark - # ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode] - # # 如果存在对应的StockTradeController,则调用其onDataUpdate方法 - # if ctrl is not None and response.strategy_name == ctrl.getName(): - # ctrl.onAsyncOrderResponse(response) - # else: - # print(f"委托回调 投资备注 {response.strategy_name} 不匹配 {ctrl.getName()}") - - def on_order_error(self, order_error): - """ - 委托失败推送 - :param order_error:XtOrderError 对象 - :return: - """ - print(f"\n委托报错回调 {order_error.order_remark} {order_error.error_msg}") - - - def on_account_status(self, status): - """ - :param response: XtAccountStatus 对象 - :return: - """ - print(datetime.datetime.now(), status) - - -qmtv = QmtV() +# 导出单例 +qmtv = _get_qmt() \ No newline at end of file diff --git a/core/qmt_dummy.py b/core/qmt_dummy.py new file mode 100644 index 0000000..55b0b84 --- /dev/null +++ b/core/qmt_dummy.py @@ -0,0 +1,275 @@ +""" +Dummy QMT 模拟器 - 用于在非 Windows 环境下模拟 QMT 交易功能 +""" +import datetime +import threading +import time +import random +import config +import core.eventbus as eBus +from core.logger import LogLevel, PrintLog + +class DummyPosition: + """模拟持仓""" + def __init__(self, stock_code, stock_name, volume, yesterday_vol=0): + self.stock_code = stock_code + self.stock_name = stock_name + self.volume = volume + self.can_use_volume = volume + self.yesterday_volume = yesterday_vol + +class DummyOrder: + """模拟订单""" + def __init__(self, stock_code, order_id, status, price, volume): + self.stock_code = stock_code + self.order_id = order_id + self.order_status = status + self.order_price = price + self.volume = volume + +class DummyTrade: + """模拟成交""" + def __init__(self, stock_code, trade_id, price, volume, strategy_name): + self.stock_code = stock_code + self.trade_id = trade_id + self.trade_price = price + self.trade_volume = volume + self.strategy_name = strategy_name + +class DummyOrderResponse: + """模拟下单响应""" + def __init__(self, order_id, stock_code, seq, error_msg, strategy_name): + self.order_id = order_id + self.stock_code = stock_code + self.seq = seq + self.error_msg = error_msg + self.strategy_name = strategy_name + +class DummyQmtV: + """ + Dummy QMT 模拟器 + 模拟 QmtV 类的接口,用于在没有 miniQMT 的环境下运行和测试 + """ + def __init__(self) -> None: + self.inited = False + self.details = {} + self.lastMarketDataUpdateTimestamp = time.time() + self.isMarketActive = True + self.connected = False + self.account = None + self._positions = {} + self._pending_orders = [] + self._market_data_thread = None + self._counter = 0 + + def getTrader(self): + return self + + def init_qmtv(self): + """初始化交易器""" + PrintLog(LogLevel.INFO, f'- [模拟] QMT 交易器初始化') + self.connected = True + self.inited = True + + def connect(self) -> bool: + """连接 QMT (模拟总是成功)""" + PrintLog(LogLevel.INFO, f'- [成功] 市场交易连接 (模拟模式)') + + # 创建模拟账号 + try: + from xtquant.xttype import StockAccount + self.account = StockAccount(config.account_no, 'STOCK') + except ImportError: + self.account = type('StockAccount', (), {'account_id': config.account_no})() + PrintLog(LogLevel.INFO, f'- [成功] 交易账号: {config.account_no}') + + self._init_dummy_positions() + self.startMarketDataSubscription() + + return self.inited + + def _init_dummy_positions(self): + """初始化模拟持仓数据""" + dummy_stocks = [ + ('600519', '贵州茅台', 100, 2800.0), + ('000858', '五粮液', 200, 180.0), + ('600036', '招商银行', 500, 42.0), + ('000001', '平安银行', 300, 13.5), + ] + for code, name, volume, price in dummy_stocks: + self._positions[code] = { + 'stock_code': code, + 'stock_name': name, + 'volume': volume, + 'can_use_volume': volume, + 'open_cost': price, + 'market_value': volume * price + } + PrintLog(LogLevel.INFO, f'- [模拟] 已加载 {len(self._positions)} 个持仓') + + def getStockPosition(self, stock_code: str): + """获取持仓 (模拟)""" + if stock_code in self._positions: + pos = self._positions[stock_code] + return type('DummyPos', (), pos)() + return None + + def queryPendingOrder(self, stock_code: str, tag: str) -> list: + """查询挂单""" + return [o for o in self._pending_orders + if o.stock_code == stock_code and + (tag is None or getattr(o, 'strategy_name', None) == tag)] + + def orderAsync(self, stock_code, orderVolume, orderType, orderPrice, priceType, orderRemark, strategy_name): + """异步下单 (模拟)""" + self._counter += 1 + order_id = f"DUMMY{self._counter:06d}" + seq = self._counter + + order = DummyOrder( + stock_code=stock_code, + order_id=order_id, + status='reported', + price=orderPrice, + volume=orderVolume + ) + order.strategy_name = strategy_name + order.order_remark = orderRemark + self._pending_orders.append(order) + + response = DummyOrderResponse( + order_id=order_id, + stock_code=stock_code, + seq=seq, + error_msg='成功', + strategy_name=strategy_name + ) + response.order_remark = orderRemark + + eBus.event_bus.publish(eBus.MarketOrderCreated, response) + PrintLog(LogLevel.INFO, f'- [模拟下单] {stock_code} 数量:{orderVolume} 价格:{orderPrice} 订单号:{order_id}') + + # 模拟成交 (80% 概率) + if random.random() > 0.2: + threading.Timer(random.uniform(0.5, 3.0), self._simulate_trade, + args=(stock_code, order_id, orderPrice, orderVolume, strategy_name)).start() + + return 0 + + def _simulate_trade(self, stock_code, order_id, price, volume, strategy_name): + """模拟成交""" + trade = DummyTrade( + stock_code=stock_code, + trade_id=f"TRADE{self._counter:06d}", + price=price, + volume=volume, + strategy_name=strategy_name + ) + trade.trade_time = int(time.strftime('%H%M%S')) + trade.order_remark = stock_code + + if stock_code in self._positions: + self._positions[stock_code]['volume'] += volume + self._positions[stock_code]['can_use_volume'] += volume + + eBus.event_bus.publish(eBus.MarketOrderTraded, trade) + PrintLog(LogLevel.INFO, f'- [模拟成交] {stock_code} 数量:{volume} 价格:{price}') + + def cacheStockDetail(self, stock_code: str): + """获取股票详情 (模拟)""" + if stock_code not in self.details: + self.details[stock_code] = { + 'InstrumentName': self._get_dummy_name(stock_code), + 'UpStopPrice': 0, + 'DownStopPrice': 0 + } + return self.details[stock_code] + + def _get_dummy_name(self, stock_code: str) -> str: + """获取模拟股票名称""" + names = { + '600519': '贵州茅台', '000858': '五粮液', '600036': '招商银行', + '000001': '平安银行', '000002': '万科A', '600000': '浦发银行' + } + return names.get(stock_code, f'股票{stock_code}') + + def getInstrumentName(self, stock_code: str) -> str: + """获取股票名称""" + return self.cacheStockDetail(stock_code)['InstrumentName'] + + def dailyUpStop(self, stock_code: str): + """获取涨停价 (模拟)""" + cacheStock = self.cacheStockDetail(stock_code) + PrintLog(LogLevel.INFO, f'- [模拟] 获取股票详情: {stock_code} {cacheStock["InstrumentName"]} 涨停价: 0') + return 0.0 + + def dailyDownStop(self, stock_code: str): + """获取跌停价 (模拟)""" + return 0.0 + + def startMarketDataSubscription(self): + """启动市场数据订阅 (模拟)""" + try: + self._market_data_thread = threading.Thread(target=self._generate_market_data, daemon=True) + self._market_data_thread.start() + PrintLog(LogLevel.INFO, f'- [市场数据订阅成功-模拟]') + except Exception as e: + PrintLog(LogLevel.ERROR, f'- [市场数据订阅失败-{e}]') + + def stopMarketDataSubscription(self): + """停止市场数据订阅""" + PrintLog(LogLevel.INFO, '- 停止市场数据订阅 (模拟)') + + def _generate_market_data(self): + """生成模拟市场数据""" + stocks = ['600519', '000858', '600036', '000001', '000002', '600000'] + base_prices = [2800.0, 180.0, 42.0, 13.5, 10.0, 10.0] + + while True: + try: + for i, stock in enumerate(stocks): + data = { + 'stock_code': stock, + 'last_price': base_prices[i] + random.uniform(-1, 1), + 'open_price': base_prices[i], + 'high_price': base_prices[i] + random.uniform(0, 2), + 'low_price': base_prices[i] - random.uniform(0, 2), + 'volume': random.randint(1000, 10000), + 'timestamp': time.time() + } + eBus.event_bus.publish(eBus.MarketDataUpdate, data) + base_prices[i] = data['last_price'] + + self.lastMarketDataUpdateTimestamp = time.time() + self.isMarketActive = True + eBus.event_bus.publish(eBus.EventMarketActiveSwitch, True) + + time.sleep(3) + + except Exception as e: + PrintLog(LogLevel.ERROR, f'- [市场数据模拟异常-{e}]') + time.sleep(1) + + def on_connected(self): + print(datetime.datetime.now(), '模拟连接成功') + + def on_disconnected(self): + print(datetime.datetime.now(), '模拟连接断开') + + def on_stock_order(self, order): + pass + + def on_stock_trade(self, trade): + eBus.event_bus.publish(eBus.MarketOrderTraded, trade) + + def on_order_stock_async_response(self, response): + eBus.event_bus.publish(eBus.MarketOrderCreated, response) + + def on_order_error(self, order_error): + print(f"\n模拟委托报错回调 {order_error}") + + def on_account_status(self, status): + print(datetime.datetime.now(), status) + + +qmtv = DummyQmtV() \ No newline at end of file diff --git a/core/qmt_real.py b/core/qmt_real.py new file mode 100644 index 0000000..4d4bdf1 --- /dev/null +++ b/core/qmt_real.py @@ -0,0 +1,25 @@ +""" +QMT 模块统一入口 +根据环境自动选择真实 QMT 或模拟器 +""" +import sys + +def _get_qmt(): + """获取 QMT 模块""" + if sys.platform == 'win32': + try: + # Windows 环境尝试导入真实 QMT + import core.qmt_real as qmt_module + return qmt_module.qmtv + except ImportError: + pass + + # 非 Windows 或导入失败,使用模拟器 + try: + import core.qmt_dummy as qmt_module + return qmt_module.qmtv + except ImportError: + raise ImportError("无法加载 QMT 模块") + +# 导出单例 +qmtv = _get_qmt() \ No newline at end of file diff --git a/starter.py b/starter.py index 1bd2280..318e2c6 100644 --- a/starter.py +++ b/starter.py @@ -5,7 +5,6 @@ from tkinter import ttk, filedialog, messagebox import configparser from core.main_ui import MainWindow import config as sdConstants -from core.qmt import qmtv class ConfigWindow: def __init__(self, root): @@ -119,33 +118,58 @@ def check_and_create_config(): config_window = ConfigWindow(root) root.mainloop() +def ask_mode(): + """询问用户选择模式""" + root = tk.Tk() + root.withdraw() # 隐藏主窗口 + result = messagebox.askyesno( + "选择交易模式", + "是否使用模拟交易模式?\n\n" + + "是 → 模拟交易(无需 miniQMT,可在 macOS/Linux 运行)\n" + + "否 → 真实交易(需要 Windows + miniQMT)" + ) + root.destroy() + return result + def initialize_system(): """初始化系统""" - + try: - while True: - # 初始化配置 - if sdConstants.exist_config() and sdConstants.initConfig(): - # 初始化qmtv - qmtv.init_qmtv() - connected = qmtv.connect() - if connected: - # 连接成功,启动主窗口 - window = MainWindow(sdConstants.log_level) - window.run() - break + # 询问用户选择模式 + if ask_mode(): + # 模拟模式 + from core.qmt_dummy import qmtv as selected_qmtv + print("[模拟模式] 使用模拟交易器") + sdConstants.miniQMTPath = '/dummy/path' + sdConstants.account_no = 'DUMMY_ACCOUNT' + sdConstants.log_level = 'INFO' + selected_qmtv.init_qmtv() + selected_qmtv.connect() + window = MainWindow(sdConstants.log_level) + window.run() + else: + # 真实 QMT 模式 + from core.qmt_real import qmtv as selected_qmtv + while True: + if sdConstants.exist_config() and sdConstants.initConfig(): + selected_qmtv.init_qmtv() + connected = selected_qmtv.connect() + if connected: + window = MainWindow(sdConstants.log_level) + window.run() + break + else: + option = messagebox.askokcancel("连接失败", "QMT连接失败,请检查") + if option: + check_and_create_config() + else: + break else: - option = messagebox.askokcancel("连接失败", "QMT连接失败,请检查") + option = messagebox.askokcancel("错误", "请检查配置") if option: check_and_create_config() else: break - else: - option = messagebox.askokcancel("错误", "请检查配置") - if option: - check_and_create_config() - else: - break except Exception as e: messagebox.showerror("错误", f"系统初始化失败: {str(e)}")