This commit is contained in:
2025-11-12 10:15:14 +08:00
parent 0dbd8e8dde
commit ba9cd9a700
5 changed files with 94 additions and 210 deletions
-3
View File
@@ -180,9 +180,6 @@ class MainWindow:
# 更新Tab按钮样式(可选,用于视觉反馈)
self.update_tab_button_styles()
def update_tab_button_styles(self):
"""更新Tab按钮的样式以显示选中状态"""
+11 -1
View File
@@ -8,6 +8,7 @@ from core.logger import LogLevel, PrintLog
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant, xtdata
import core.eventbus as eBus
class QmtV(XtQuantTraderCallback):
def __init__(self) -> None:
@@ -41,6 +42,7 @@ class QmtV(XtQuantTraderCallback):
if subscribe_result != 0:
self.inited = False
return
self.startMarketDataSubscription()
def getStockPosition(self, stock_code: str):
@@ -126,11 +128,19 @@ class QmtV(XtQuantTraderCallback):
# ========================================#
def startMarketDataSubscription(self):
self.subscriptionId = xtdata.subscribe_whole_quote(['SH', 'SZ'], self.onDataUpdate)
def stopMarketDataSubscription(self):
PrintLog(LogLevel.INFO, '- 停止市场数据订阅')
if self.subscriptionId is not None and self.subscriptionId > 0:
xtdata.unsubscribe_quote(self.subscriptionId)
# ====== 市场回调方法 -- 以下方法由XtQuantData调用 ======
def onDataUpdate(self, data):
# 收集所有市场数据用于市场监控
print(f'market data update {len(data)}')
eBus.event_bus.publish(eBus.MarketDataUpdate, data)
# ====== 市场回调方法 -- 以下方法由XtQuantTrader调用 ======
-151
View File
@@ -1,151 +0,0 @@
# coding:utf-8
from typing import Any
from core.sfgrid.model import SFGridTradeTarget as TradeTarget
from .bus_events import ActionEventAddTradeTarget, ActionEventDeleteTradeTarget, ActionEventDisableTrade, ActionEventEnableTrade, EventTradeTargetUpdate, ResultEventTradeDisabled, ResultEventTradeEnabled, ResultEventTradeTargetAdded, ResultEventTradeTargetDeleted, ActionEventGridFix
from core.eventbus import event_bus, MarketDataEnabled, MarketDataDisabled, MarketDataUpdate
from xtquant import xttrader
from xtquant.xttrader import XtQuantTrader
import time
from peewee import ModelSelect
import core.sfgrid.model as model
import config
from core.sfgrid.sfgrid_strategy import SFGridStrategy
from core.util import getStockPosition, queryPendingOrder
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount, XtAsset, XtOrder, XtPosition, XtTrade
from xtquant import xtdata
from xtquant.xttrader import XtQuantTraderCallback
import datetime
import core.sfgrid.sfgrid_ui as sfgrid_ui
from core.logger import PrintLog, LogLevel
from core.qmt import qmtv
from core.sfgrid.objects import GridFixData
# 量化核心控制对象
class SFGridController:
def __init__(self):
super().__init__()
self.registerEventHandler()
self.seq = None
PrintLog(LogLevel.INFO, '- [成功]三疯交易系统初始化完成')
def registerEventHandler(self):
event_bus.subscribe(ActionEventAddTradeTarget, self.onAddTradeTarget)
event_bus.subscribe(ActionEventDeleteTradeTarget, self.onDeleteTradeTarget)
event_bus.subscribe(ActionEventGridFix, self.onGridFix)
def onDeleteTradeTarget(self, target: TradeTarget):
"""处理删除交易标的事件"""
id = target.get_id()
try:
# 从数据库中删除
target.delete_instance()
PrintLog(LogLevel.INFO, f"已删除交易标的: id{id} {target.stock_code} - {target.stock_name}")
# 发布删除完成事件
event_bus.publish(ResultEventTradeTargetDeleted, target)
except Exception as e:
PrintLog(LogLevel.ERROR, f"删除交易标的失败 ID {id}: {str(e)}")
def onAddTradeTarget(self, stock_code: str):
"""处理添加交易标的事件"""
try:
stock_name = qmtv.getInstrumentName(stock_code)
if not stock_name:
PrintLog(LogLevel.ERROR, f'无法获取股票代码 {stock_code} 的名称,请检查代码是否正确')
return
# 检查是否已存在该标的
existing_target = TradeTarget.get_or_none(TradeTarget.stock_code == stock_code)
if existing_target:
PrintLog(LogLevel.INFO, f'交易标的 {stock_code} {stock_name} 已存在')
return
# 刷新标的持仓
pos = qmtv.getStockPosition(stock_code) # type: ignore
new_target = TradeTarget.create(
stock_name=stock_name,
stock_code=stock_code,
market_price=0.0,
current_position=pos,
grid_index=0,
last_trade_price=0.0,
plan_buy_price=0.0,
plan_sell_price=0.0,
current_order_price=0.0,
current_order_no='',
current_order_type=''
)
new_target.save()
# 更新标的池
event_bus.publish(ResultEventTradeTargetAdded, new_target)
except Exception as e:
PrintLog(LogLevel.ERROR, f'新增交易标的失败 {stock_code} {e}')
def onGridFix(self, data: GridFixData):
"""处理网格修正事件"""
self.update_trade_target_grid(data)
def update_trade_target_grid(self, data: GridFixData):
"""更新交易标的网格信息"""
try:
target = data.tradeTarget
grid_index = data.grid_index
# 更新数据库中的网格索引
target.grid_index = grid_index
target.save()
PrintLog(LogLevel.INFO, f"网格修正已应用: {target.stock_code} - {target.stock_name}, 网格索引: {grid_index}")
except Exception as e:
PrintLog(LogLevel.ERROR, f"网格修正更新失败: {str(e)}")
def stopMarketData(self):
PrintLog(LogLevel.INFO, '- 停止市场数据订阅')
if self.seq is not None and self.seq > 0:
xtdata.unsubscribe_quote(self.seq)
event_bus.publish(MarketDataDisabled, False)
# # 初始化指定标的交易控制器
# def start_stock_trade(self, tradeTarget: TradeTarget):
# # check existing thread
# if tradeTarget.stock_code in self.stock_trade_ctrl:
# tradeController: SFGridStrategy = self.stock_trade_ctrl[tradeTarget.stock_code]
# tradeTarget = tradeController.enabledTrading(True)
# self.instrument_pool[id] = tradeTarget
# event_bus.publish(ResultEventTradeEnabled, tradeTarget)
# else:
# PrintLog(LogLevel.INFO, f"\t创建标的交易控制器 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}")
# ====== 市场回调方法 -- 以下方法由XtQuantData调用 ======
# def onDataUpdate(self, data):
# # 收集所有市场数据用于市场监控
# for stock_code, tickData in data.items():
# if stock_code in self.stock_trade_ctrl:
# stock_controller: SFGridStrategy = self.stock_trade_ctrl[stock_code]
# stock_controller.onDataUpdate(data)
# else:
# # 非目标交易,发布市场数据更新事件用于市场监控
# lastPrice = tickData['lastPrice']
# if lastPrice == 10 or stock_code in self.listening_stock:
# # 发布市场数据更新事件用于市场监控
# market_target = TradeTarget()
# market_target.stock_code = stock_code
# market_target.stock_name = getInstrumentName(stock_code) # type: ignore
# market_target.market_price = lastPrice # type: ignore
# event_bus.publish(MarketDataUpdate, market_target)
# if stock_code not in self.listening_stock:
# self.listening_stock.append(stock_code)
+83 -55
View File
@@ -13,19 +13,16 @@ import configparser
import config
from core.sfgrid.objects import GridFixData
from core.qmt import qmtv
from core.sfgrid.sfgrid_controller import SFGridController
from core.sfgrid.sfgrid_strategy import SFGridStrategy
class TradeTargetUI(ttk.Frame):
def __init__(self, parent):
super().__init__(parent)
self.controller = SFGridController()
self.tradeTargetData:dict[int, SFGridTradeTarget] = {} # id->trade_target
self.strategy_ctrl:dict[int, SFGridStrategy] = {} # stock_code->trade_target
self.init_trade_target_pool()
self.registerEventHandler()
eBus.event_bus.subscribe(eBus.MarketDataUpdate, self.onMarketDataUpdated)
# 创建刷新线程标志
self.refresh_thread_running = False # 默认不启动刷新线程
@@ -38,6 +35,21 @@ class TradeTargetUI(ttk.Frame):
self.start_ui_refresh()
def onMarketDataUpdated(self, data):
# 更新市场监控数据
for item in data:
if item['stock_code'] in self.tradeTargetData:
# 更新交易标准池信息
continue
else:
# 监控10元的票
current_time = datetime.now().strftime("%H:%M:%S")
self.marketData[str(item['stock_code'])] = {
'stock_name': item['stock_name'],
'last_price': item['market_price'] if item['market_price'] is not None else 0.0,
'time': current_time
}
def refresh_targets(self):
# 更新标的池
results = SFGridTradeTarget.select()
@@ -59,19 +71,9 @@ class TradeTargetUI(ttk.Frame):
stockTradeController = SFGridStrategy(tradeTarget) # type: ignore
self.strategy_ctrl[id] = stockTradeController # pyright: ignore[reportArgumentType]
# eBus.event_bus.publish(eBus.EventTradeTargetUpdate, tradeTarget)
self.updateTradeTarget(tradeTarget)
PrintLog(LogLevel.INFO, f'- [成功]交易标的信息初始化, 共 {len(self.tradeTargetData)} 个标的')
def registerEventHandler(self):
eBus.event_bus.subscribe(EventTradeTargetUpdate, self.onTradeTargetUpdated)
# eBus.event_bus.subscribe(eBus.MarketDataUpdate, self.onMarketDataUpdated)
# eBus.event_bus.subscribe(eBus.ResultEventTradeEnabled, self.onTradeEnabled)
# eBus.event_bus.subscribe(eBus.ResultEventTradeDisabled, self.onTradeDisabled)
# eBus.event_bus.subscribe(eBus.MarketDataEnabled, self.onMarketDataToggled)
# eBus.event_bus.subscribe(eBus.MarketDataDisabled, self.onMarketDataToggled)
eBus.event_bus.subscribe(ResultEventTradeTargetDeleted, self.onTradeTargetDeleted)
eBus.event_bus.subscribe(ResultEventTradeTargetAdded, self.onTradeTargetUpdated)
def start_refresh_thread(self):
"""启动刷新线程"""
@@ -89,17 +91,6 @@ class TradeTargetUI(ttk.Frame):
def stop_refresh_thread(self):
"""停止刷新线程"""
self.refresh_thread_running = False
def onTradeTargetDeleted(self, target: SFGridTradeTarget):
"""处理交易标的删除完成事件"""
# 从本地数据中删除
id = target.get_id()
if id in self.tradeTargetData:
PrintLog(LogLevel.DEBUG, f"删除交易标的,ID: {id}")
del self.tradeTargetData[id]
del self.strategy_ctrl[id]
# 添加日志
PrintLog(LogLevel.INFO, f"交易标的已删除,ID: {id}")
def onTradeEnabled(self, target:SFGridTradeTarget):
@@ -109,19 +100,9 @@ class TradeTargetUI(ttk.Frame):
PrintLog(LogLevel.INFO, f"交易禁用: {target.stock_code} - {target.stock_name}")
def onTradeTargetUpdated(self, target: SFGridTradeTarget):
def updateTradeTarget(self, target: SFGridTradeTarget):
# 更新或添加数据到本地缓存
self.tradeTargetData[target.get_id()] = target
def onMarketDataUpdated(self, target: SFGridTradeTarget):
# 更新市场监控数据
current_time = datetime.now().strftime("%H:%M:%S")
self.marketData[str(target.stock_code)] = {
'stock_name': target.stock_name,
'last_price': target.market_price if target.market_price is not None else 0.0,
'time': current_time
}
def create_ui(self):
"""创建UI界面"""
@@ -193,7 +174,7 @@ class TradeTargetUI(ttk.Frame):
columns = ("ID",
"股票代码", "股票名称", "市场价", "持仓数量", "网格索引",
"最新成交价", "计划买入价", "计划卖出价", "当前订单价", "当前订单号", "当前订单类型",
"启用状态", "交易状态"
"交易状态"
)
self.trade_table = ttk.Treeview(parent, columns=columns, show='headings', height=15)
@@ -212,7 +193,6 @@ class TradeTargetUI(ttk.Frame):
"当前订单价": (90, tk.CENTER),
"当前订单号": (90, tk.CENTER),
"当前订单类型": (90, tk.CENTER),
"启用状态": (80, tk.CENTER),
"交易状态": (80, tk.CENTER)
}
@@ -310,19 +290,7 @@ class TradeTargetUI(ttk.Frame):
if result:
# 发布事件通知主控制器添加标的
eBus.event_bus.publish(ActionEventAddTradeTarget, stock_code)
PrintLog(LogLevel.INFO, f"已发送添加请求: {stock_code} - {stock_name}")
def get_status_indicator(self, target: SFGridTradeTarget) -> str:
"""获取状态指示器(带颜色色块的文本)"""
if target.status == 1:
# 绿色圆点表示交易中
return "🟢 已建仓"
elif target.status == 0:
# 黄色圆点表示暂停
return "🟡 未建仓"
else:
return "🔴 错误状态"
self.onAddTradeTarget(stock_code)
def get_trade_enabled_indicator(self, enabled: bool) -> str:
"""获取交易状态指示器"""
@@ -347,7 +315,6 @@ class TradeTargetUI(ttk.Frame):
'-' if target.current_order_price is None else f"{target.current_order_price:.3f}",
target.current_order_no,
target.current_order_type,
self.get_status_indicator(target),
self.get_trade_enabled_indicator(target.enabled) # type: ignore
]
@@ -466,9 +433,19 @@ class TradeTargetUI(ttk.Frame):
icon='warning'
)
id = target.get_id()
if result:
# 通过事件总线发出删除动作
eBus.event_bus.publish(ActionEventDeleteTradeTarget, target)
try:
# 从数据库中删除
target.delete_instance()
del self.tradeTargetData[id]
del self.strategy_ctrl[id]
# 添加日志
PrintLog(LogLevel.INFO, f"交易标的已删除,ID: {id} {target.targetName()}")
except Exception as e:
PrintLog(LogLevel.ERROR, f"删除交易标的失败 ID {id}: {str(e)}")
PrintLog(LogLevel.INFO, f"已发送删除请求: {target.stock_code} - {target.stock_name}")
def add_trade_target(self):
@@ -1060,4 +1037,55 @@ class TradeTargetUI(ttk.Frame):
# 更新持仓量状态
current_position = getattr(target, 'current_position')
self.update_position_status(current_position, required_position, position_status_label)
self.update_position_status(current_position, required_position, position_status_label)
def onAddTradeTarget(self, stock_code: str):
"""处理添加交易标的事件"""
try:
stock_name = qmtv.getInstrumentName(stock_code)
if not stock_name:
PrintLog(LogLevel.ERROR, f'无法获取股票代码 {stock_code} 的名称,请检查代码是否正确')
return
# 检查是否已存在该标的
existing_target = SFGridTradeTarget.get_or_none(SFGridTradeTarget.stock_code == stock_code)
if existing_target:
PrintLog(LogLevel.INFO, f'交易标的 {stock_code} {stock_name} 已存在')
return
# 刷新标的持仓
pos = qmtv.getStockPosition(stock_code) # type: ignore
new_target = SFGridTradeTarget.create(
stock_name=stock_name,
stock_code=stock_code,
market_price=0.0,
current_position=pos,
grid_index=0,
last_trade_price=0.0,
plan_buy_price=0.0,
plan_sell_price=0.0,
current_order_price=0.0,
current_order_no='',
current_order_type=''
)
new_target.save()
# 更新标的池
self.updateTradeTarget(new_target)
except Exception as e:
PrintLog(LogLevel.ERROR, f'新增交易标的失败 {stock_code} {e}')
def update_trade_target_grid(self, data: GridFixData):
"""更新交易标的网格信息"""
try:
target = data.tradeTarget
grid_index = data.grid_index
# 更新数据库中的网格索引
target.grid_index = grid_index
target.save()
PrintLog(LogLevel.INFO, f"网格修正已应用: {target.stock_code} - {target.stock_name}, 网格索引: {grid_index}")
except Exception as e:
PrintLog(LogLevel.ERROR, f"网格修正更新失败: {str(e)}")
View File