添加dummy gateway

This commit is contained in:
2026-06-02 18:07:55 +08:00
parent 6b3b1a1f76
commit db910e03d6
5 changed files with 373 additions and 229 deletions
+17 -208
View File
@@ -1,212 +1,21 @@
import datetime
import threading
import time
import config
from xtquant.xttype import StockAccount, XtOrder, XtOrderResponse, XtPosition, XtTrade
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount
from core.logger import LogLevel, PrintLog
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant, xtdata
import core.eventbus as eBus
"""
QMT 模块统一入口
根据环境自动选择真实 QMT 或模拟器
"""
import sys
class QmtV(XtQuantTraderCallback):
def __init__(self) -> None:
self.xttrader: XtQuantTrader
self.inited: bool = False
self.details = {}
self.lastMarketDataUpdateTimestamp = time.time()
self.isMarketActive = False
self.refresh_thread = threading.Thread(target=self.marketStatusNotifier, daemon=True)
self.refresh_thread.start()
# time.sleep(3.1)
def getTrader(self) -> XtQuantTrader:
return self.xttrader
def init_qmtv(self):
sessionId= int(time.time())
self.xttrader = XtQuantTrader(config.miniQMTPath, sessionId)
xtdata.enable_hello = False
def connect(self) -> bool:
self.xttrader.register_callback(self)
self.xttrader.start()
self.xttrader.connect()
PrintLog(LogLevel.INFO, f'- [{'成功' if self.xttrader.connected else '失败'}]市场交易连接: {config.miniQMTPath}')
if self.xttrader.connected == False:
self.inited = False
return self.inited
else:
self.inited = True
self.account = StockAccount(config.account_no, 'STOCK') # pyright: ignore[reportAssignmentType, reportAttributeAccessIssue]
PrintLog(LogLevel.INFO, f'- [成功]交易账号对象初始化完成, 账号: {config.account_no}') # pyright: ignore[reportOptionalMemberAccess]
subscribe_result = self.xttrader.subscribe(self.account)
PrintLog(LogLevel.INFO, f'- [{'成功' if subscribe_result == 0 else '失败'}:{subscribe_result}]交易状态订阅')
if subscribe_result != 0:
self.inited = False
return self.inited
self.startMarketDataSubscription()
return self.inited
def getStockPosition(self, stock_code: str):
positions = self.xttrader.query_stock_positions(self.account)
if positions:
for temp in positions:
pos:XtPosition = temp
if pos.stock_code == stock_code:
return pos
return None
def queryPendingOrder(self, stock_code:str, tag: str) -> list[XtOrder]:
if stock_code == None or tag == None:
return []
orders = self.xttrader.query_stock_orders(self.account)
result = [order for order in orders if order.order_status == xtconstant.ORDER_REPORTED and order.stock_code == stock_code and order.strategy_name == tag]
return result
def orderAsync(self, stock_code, orderVolume, orderType, orderPrice, priceType, orderRemark, strategy_name):
return self.xttrader.order_stock_async(
self.account,
str(stock_code),
orderType,
orderVolume,
priceType,
orderPrice,
strategy_name, # strategy_name
orderRemark # remark # type: ignore
)
def cacheStockDetail(self, stock_code:str):
if stock_code in self.details:
return self.details[stock_code]
else:
self.details[stock_code] = xtdata.get_instrument_detail(stock_code, False)
return self.details[stock_code]
def getInstrumentName(self, stock_code:str):
return self.cacheStockDetail(stock_code)['InstrumentName']
def dailyUpStop(self, stock_code:str):
cacheStock = self.cacheStockDetail(stock_code)
PrintLog(LogLevel.INFO, f'- [成功]获取股票详情: {stock_code} {cacheStock["InstrumentName"]} {cacheStock["UpStopPrice"]}')
return cacheStock['UpStopPrice']
def dailyDownStop(self, stock_code:str):
return self.cacheStockDetail(stock_code)['DownStopPrice']
# ========================================#
def startMarketDataSubscription(self):
def _get_qmt():
"""获取 QMT 模块"""
if sys.platform == 'win32':
try:
self.subscriptionId = xtdata.subscribe_whole_quote(['SH', 'SZ'], self.onDataUpdate)
PrintLog(LogLevel.INFO, f'- [市场数据订阅成功-{self.subscriptionId}]')
except Exception as e:
PrintLog(LogLevel.ERROR, f'- [市场数据订阅失败-{e}]')
def stopMarketDataSubscription(self):
PrintLog(LogLevel.INFO, '- 停止市场数据订阅')
if self.subscriptionId is not None and self.subscriptionId > 0:
xtdata.unsubscribe_quote(self.subscriptionId)
# ====== 市场回调方法 -- 以下方法由XtQuantData调用 ======
def onDataUpdate(self, data):
# 收集所有市场数据用于市场监控
eBus.event_bus.publish(eBus.MarketDataUpdate, data)
now = time.time()
if now - self.lastMarketDataUpdateTimestamp < 5:
self.isMarketActive = True
self.lastMarketDataUpdateTimestamp = now
from core.qmt_real import qmtv as real_qmtv
return real_qmtv
except ImportError:
pass
def marketStatusNotifier(self):
# 市场状态通知器
tmpMarketStatus = False
while True:
tmpTime = time.time()
time.sleep(10)
if tmpMarketStatus != self.isMarketActive and tmpTime - self.lastMarketDataUpdateTimestamp < 5:
tmpMarketStatus = self.isMarketActive
PrintLog(LogLevel.INFO, f'- [市场状态变更] {self.isMarketActive}')
eBus.event_bus.publish(eBus.EventMarketActiveSwitch, self.isMarketActive)
if tmpMarketStatus and self.isMarketActive and tmpTime - self.lastMarketDataUpdateTimestamp > 10: # 上次更新市场状态已经超过10秒
self.isMarketActive = False
PrintLog(LogLevel.INFO, f'- [市场状态变更] {self.isMarketActive}')
# 非 Windows 或导入失败,使用模拟器
from core.qmt_dummy import qmtv
return qmtv
PrintLog(LogLevel.DEBUG, f'- [市场状态] {self.isMarketActive}') # 市场已 inactive
# ====== 市场回调方法 -- 以下方法由XtQuantTrader调用 ======
def on_connected(self):
"""
连接成功推送
"""
print(datetime.datetime.now(), '连接成功回调')
def on_disconnected(self):
"""
连接断开
:return:
"""
print(datetime.datetime.now(), '连接断开回调')
def on_stock_order(self, order:XtOrder):
"""
委托回报推送
:param order: XtOrder对象
:return:
"""
pass
# print(f"委托回调 on_stock_order 投资备注 {order.order_id} {order.strategy_name} {order.order_remark}")
def on_stock_trade(self, trade:XtTrade):
"""
成交变动推送
:param trade: XtTrade对象
:return:
"""
eBus.event_bus.publish(eBus.MarketOrderTraded, trade)
# stockCode = trade.stock_code
# ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode]
# # 如果存在对应的StockTradeController,则调用其onDataUpdate方法
# if ctrl is not None and trade.strategy_name == ctrl.getName():
# ctrl.onOrderTrade(trade)
# else:
# print(f"委托回调 投资备注 {trade.strategy_name} 不匹配 {ctrl.getName()}")
def on_order_stock_async_response(self, response:XtOrderResponse):
# print(f"委托回调 on_order_stock_async_response 投资备注 {response.order_id} {response.seq} {response.error_msg}{response.strategy_name} {response.order_remark}")
eBus.event_bus.publish(eBus.MarketOrderCreated, response)
# stockCode = response.order_remark
# ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode]
# # 如果存在对应的StockTradeController,则调用其onDataUpdate方法
# if ctrl is not None and response.strategy_name == ctrl.getName():
# ctrl.onAsyncOrderResponse(response)
# else:
# print(f"委托回调 投资备注 {response.strategy_name} 不匹配 {ctrl.getName()}")
def on_order_error(self, order_error):
"""
委托失败推送
:param order_error:XtOrderError 对象
:return:
"""
print(f"\n委托报错回调 {order_error.order_remark} {order_error.error_msg}")
def on_account_status(self, status):
"""
:param response: XtAccountStatus 对象
:return:
"""
print(datetime.datetime.now(), status)
qmtv = QmtV()
# 导出单例
qmtv = _get_qmt()