市场数据跟踪

This commit is contained in:
2025-11-10 14:43:14 +08:00
parent 7733d6df32
commit 3a137b6aee
4 changed files with 192 additions and 41 deletions
+48 -17
View File
@@ -1,6 +1,6 @@
# coding:utf-8
from core.strategy_db import TradeTarget
from core.eventbus import ActionDisableMarketData, ActionEnableMarketData, ActionEventAddTradeTarget, ActionEventDeleteTradeTarget, ActionEventDisableTrade, ActionEventEnableTrade, MarketDataUpdate, MarketDataEnabled, MarketDataDisabled, ResultEventTradeDisabled, ResultEventTradeEnabled, ResultEventTradeTargetDeleted, event_bus
from core.eventbus import ActionDisableMarketData, ActionEnableMarketData, ActionEventAddTradeTarget, ActionEventDeleteTradeTarget, ActionEventDisableTrade, ActionEventEnableTrade, EventTradeTargetUpdate, MarketDataUpdate, MarketDataEnabled, MarketDataDisabled, ResultEventTradeDisabled, ResultEventTradeEnabled, ResultEventTradeTargetDeleted, ActionEventGridFix, event_bus
from xtquant.xttrader import XtQuantTrader
import time
from peewee import ModelSelect
@@ -16,6 +16,7 @@ from xtquant.xttrader import XtQuantTraderCallback
import datetime
import core.ui as ui
from core.logger import PrintLog, LogLevel
from core.objects import GridFixData
# 量化核心控制对象
class SFGridController(XtQuantTraderCallback):
@@ -49,6 +50,7 @@ class SFGridController(XtQuantTraderCallback):
else:
self.inited = False
return
self.listening_stock = []
self.stock_trade_ctrl = {}
self.init_instrument_pool(self.xt_trader, self.account) # type: ignore
@@ -63,6 +65,7 @@ class SFGridController(XtQuantTraderCallback):
event_bus.subscribe(ActionDisableMarketData, self.onMarketDataDisabled)
event_bus.subscribe(ActionEventAddTradeTarget, self.onAddTradeTarget)
event_bus.subscribe(ActionEventDeleteTradeTarget, self.onDeleteTradeTarget)
event_bus.subscribe(ActionEventGridFix, self.onGridFix)
def onDeleteTradeTarget(self, id: int):
"""处理删除交易标的事件"""
@@ -88,6 +91,33 @@ class SFGridController(XtQuantTraderCallback):
def onDisableTrade(self, id: int):
self.pause_stock_trade(id)
def onGridFix(self, data: GridFixData):
"""处理网格修正事件"""
self.update_trade_target_grid(data)
def update_trade_target_grid(self, data: GridFixData):
"""更新交易标的网格信息"""
try:
target = data.tradeTarget
grid_index = data.grid_index
# 更新数据库中的网格索引
target.grid_index = grid_index
target.save()
# 更新内存中的交易标的
if target.get_id() in self.instrument_pool:
self.instrument_pool[target.get_id()] = target
# 更新交易控制器中的网格信息
if target.stock_code in self.stock_trade_ctrl:
trade_controller: SFGridStrategy = self.stock_trade_ctrl[target.stock_code]
trade_controller.updateGridIndex(grid_index) # type: ignore
PrintLog(LogLevel.INFO, f"网格修正已应用: {target.stock_code} - {target.stock_name}, 网格索引: {grid_index}")
except Exception as e:
PrintLog(LogLevel.ERROR, f"网格修正更新失败: {str(e)}")
def hold(self):
self.appUi.run()
@@ -193,7 +223,7 @@ class SFGridController(XtQuantTraderCallback):
PrintLog(LogLevel.INFO, f' |- 同步[{target.stock_code}-{target.stock_name}]持仓信息[{'成功' if result == 1 else '失败'}]')
stockTradeController = SFGridStrategy(tradeTarget, self.xt_trader, self.account) # type: ignore
self.stock_trade_ctrl[tradeTarget.stock_code] = stockTradeController
event_bus.publish(MarketDataUpdate, tradeTarget)
event_bus.publish(EventTradeTargetUpdate, tradeTarget)
PrintLog(LogLevel.INFO, f'- [成功]交易标的信息初始化, 共 {len(self.instrument_pool)} 个标的')
@@ -280,22 +310,23 @@ class SFGridController(XtQuantTraderCallback):
# ====== 市场回调方法 -- 以下方法由XtQuantData调用 ======
def onDataUpdate(self, data):
if sfgrid_constants.max_enabled_targets <= 0: # 全推
for stock_code, tickData in data.items():
lastPrice = tickData['lastPrice']
if lastPrice == 10.0 and stock_code not in self.stock_trade_ctrl:
print(f'New trade target = {stock_code} - {getInstrumentName(stock_code)} {tickData['lastPrice']}')
self.add_trade_target(stock_code)
self.stock_trade_ctrl[stock_code].enabledTrading(True)
else: # 指定目标 当前主要使用这种模式
for id in self.instrument_pool:
stock_code = self.instrument_pool[id].stock_code
# 如果存在对应的StockTradeController,则调用其onDataUpdate方法
if stock_code not in self.stock_trade_ctrl or stock_code not in data:
# print(f"股票代码 {stock_code} 未在交易控制器中找到,跳过处理。\n")
continue
# 收集所有市场数据用于市场监控
for stock_code, tickData in data.items():
if stock_code in self.stock_trade_ctrl:
stock_controller: SFGridStrategy = self.stock_trade_ctrl[stock_code]
stock_controller.onDataUpdate(data)
else:
# 非目标交易,发布市场数据更新事件用于市场监控
lastPrice = tickData['lastPrice']
if lastPrice == 10 or stock_code in self.listening_stock:
# 发布市场数据更新事件用于市场监控
market_target = TradeTarget()
market_target.stock_code = stock_code
market_target.stock_name = getInstrumentName(stock_code) # type: ignore
market_target.market_price = lastPrice # type: ignore
event_bus.publish(MarketDataUpdate, market_target)
if stock_code not in self.listening_stock:
self.listening_stock.append(stock_code)
# ====== 市场回调方法 -- 以下方法由XtQuantTrader调用 ======
@@ -368,4 +399,4 @@ class SFGridController(XtQuantTraderCallback):
:param response: XtAccountStatus 对象
:return:
"""
print(datetime.datetime.now(), status)
print(datetime.datetime.now(), status)