This commit is contained in:
2025-11-18 18:06:15 +08:00
parent fcadcb86d2
commit d5fef7c0c1
5 changed files with 126 additions and 199 deletions
+31 -53
View File
@@ -1,4 +1,5 @@
import datetime
import threading
import time
import config
from xtquant.xttype import StockAccount, XtOrder, XtOrderResponse, XtPosition, XtTrade
@@ -15,6 +16,11 @@ class QmtV(XtQuantTraderCallback):
self.xttrader: XtQuantTrader
self.inited: bool = False
self.details = {}
self.lastMarketDataUpdateTimestamp = time.time()
self.isMarketActive = False
self.refresh_thread = threading.Thread(target=self.marketStatusNotifier, daemon=True)
self.refresh_thread.start()
# time.sleep(3.1)
def getTrader(self) -> XtQuantTrader:
return self.xttrader
@@ -94,48 +100,6 @@ class QmtV(XtQuantTraderCallback):
return self.cacheStockDetail(stock_code)['UpStopPrice']
def dailyDownStop(self, stock_code:str):
return self.cacheStockDetail(stock_code)['DownStopPrice']
# def print_position_info(self):
# positions:list[XtPosition] = self.xt_trader.query_stock_positions(self.account)
# if positions:
# PrintLog(LogLevel.INFO, "\n- 持仓信息")
# for temp in positions:
# pos : XtPosition = temp
# if pos.volume <=0:
# continue
# PrintLog(LogLevel.INFO, f"股票代码: {pos.stock_code}-{getInstrumentName(pos.stock_code)}")
# PrintLog(LogLevel.INFO, f"总持仓: {pos.volume}")
# PrintLog(LogLevel.INFO, f"可用持仓: {pos.can_use_volume}")
# PrintLog(LogLevel.INFO, f"持仓成本: {pos.avg_price}")
# PrintLog(LogLevel.INFO, "---")
# else:
# PrintLog(LogLevel.INFO, "\n当前无持仓")
# def print_account_info(self):
# temp = self.xt_trader.query_stock_asset(self.account)
# asset: XtAsset = temp # type: ignore
# PrintLog(LogLevel.INFO, f"=== 账户信息 {self.account.account_id} ===") # type: ignore
# PrintLog(LogLevel.INFO, f"可用资金: {asset.cash}")
# PrintLog(LogLevel.INFO, f"总资产: {asset.total_asset}")
# PrintLog(LogLevel.INFO, f"证券市值: {asset.market_value}")
# def print_stock_orders(self):
# orders = self.xt_trader.query_stock_orders(self.account, cancelable_only=True)
# if orders:
# PrintLog(LogLevel.INFO, "\n=== 委托信息 ===")
# for order in orders:
# PrintLog(LogLevel.INFO, f"委托编号: {order.order_id}")
# PrintLog(LogLevel.INFO, f"股票代码: {order.stock_code} {getInstrumentName(order.stock_code)}")
# PrintLog(LogLevel.INFO, f"委托方向: {order.offset_flag} ")
# PrintLog(LogLevel.INFO, f"委托价格: {order.price}")
# PrintLog(LogLevel.INFO, f"委托数量: {order.order_volume}")
# PrintLog(LogLevel.INFO, f"已成交数量: {order.traded_volume}")
# PrintLog(LogLevel.INFO, f"委托状态: {order.order_status} ")
# PrintLog(LogLevel.INFO, "---")
# else:
# PrintLog(LogLevel.INFO, "\n当前无委托记录")
# ========================================#
def startMarketDataSubscription(self):
@@ -155,7 +119,29 @@ class QmtV(XtQuantTraderCallback):
# ====== 市场回调方法 -- 以下方法由XtQuantData调用 ======
def onDataUpdate(self, data):
# 收集所有市场数据用于市场监控
PrintLog(LogLevel.INFO, f'- [市场数据更新] {len(data)}')
eBus.event_bus.publish(eBus.MarketDataUpdate, data)
now = time.time()
if now - self.lastMarketDataUpdateTimestamp < 5:
self.isMarketActive = True
PrintLog(LogLevel.INFO, f'- [市场状态变更] 市场已 Active') # 市场已 inactive
self.lastMarketDataUpdateTimestamp = now
def marketStatusNotifier(self):
# 市场状态通知器
tmpMarketStatus = False
while True:
PrintLog(LogLevel.INFO, f'- [市场状态检查器] {self.isMarketActive}')
tmpTime = time.time()
time.sleep(10)
if tmpMarketStatus != self.isMarketActive and tmpTime - self.lastMarketDataUpdateTimestamp < 5:
tmpMarketStatus = self.isMarketActive
PrintLog(LogLevel.INFO, f'- [市场状态变更] {self.isMarketActive}')
eBus.event_bus.publish(eBus.EventMarketActiveSwitch, self.isMarketActive)
if tmpMarketStatus and self.isMarketActive:
if tmpTime - self.lastMarketDataUpdateTimestamp > 10: # 上次更新市场状态已经超过10秒
self.isMarketActive = False
PrintLog(LogLevel.INFO, f'- [市场状态变更] 市场已 inactive') # 市场已 inactive
# ====== 市场回调方法 -- 以下方法由XtQuantTrader调用 ======
@@ -178,16 +164,8 @@ class QmtV(XtQuantTraderCallback):
:param order: XtOrder对象
:return:
"""
print(f"委托回调 on_stock_order 投资备注 {order.order_id} {order.strategy_name} {order.order_remark}")
# print(f'orderd {order.strategy_name}-{order.stock_code} {order.order_id} {order.order_volume}-{order.order_status}')
# stockCode = order.stock_code
# ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode]
# # 如果存在对应的StockTradeController,则调用其onDataUpdate方法
# if ctrl is not None and order.strategy_name == ctrl.getName():
# print(f'controller info {ctrl.getName()}')
# ctrl.onAsyncOrderResponse(order) # type: ignore
# else:
# print(f"委托下单回调 投资备注 orderId: {order.order_sysid} [{order.stock_code}-{order.instrument_name}] volume: {order.order_volume} 订单策略: '{order.strategy_name}'<-->'{ctrl.getName()}'")
pass
# print(f"委托回调 on_stock_order 投资备注 {order.order_id} {order.strategy_name} {order.order_remark}")
def on_stock_trade(self, trade:XtTrade):
@@ -206,7 +184,7 @@ class QmtV(XtQuantTraderCallback):
# print(f"委托回调 投资备注 {trade.strategy_name} 不匹配 {ctrl.getName()}")
def on_order_stock_async_response(self, response:XtOrderResponse):
print(f"委托回调 on_order_stock_async_response 投资备注 {response.order_id} {response.seq} {response.error_msg}{response.strategy_name} {response.order_remark}")
# print(f"委托回调 on_order_stock_async_response 投资备注 {response.order_id} {response.seq} {response.error_msg}{response.strategy_name} {response.order_remark}")
eBus.event_bus.publish(eBus.MarketOrderCreated, response)
# stockCode = response.order_remark