This commit is contained in:
2025-11-11 16:47:55 +08:00
parent c42648d1b4
commit 662a1ea7c1
13 changed files with 361 additions and 638 deletions
View File
+4 -6
View File
@@ -1,14 +1,12 @@
from peewee import SqliteDatabase, Model, CharField, IntegerField, FloatField, BooleanField
from peewee import SqliteDatabase, Model
from core.logger import LogLevel, PrintLog
from xtquant import xtconstant
# 连接到SQLite数据库
db = SqliteDatabase('example.db')
db: SqliteDatabase = SqliteDatabase('example.db')
db.connect()
PrintLog(LogLevel.INFO, '- [成功]数据库连接')
# 定义基础模型类
class BaseModel(Model):
class Meta:
database = db
database: SqliteDatabase = db
+2 -2
View File
@@ -1,7 +1,7 @@
from enum import Enum
from core.eventbus import EventPrintLog, event_bus
import sfgrid_config
import config
class LogLevel(Enum):
DEBUG = "DEBUG"
@@ -18,5 +18,5 @@ class LogData:
def PrintLog(level:LogLevel, message:str):
data = LogData(level, message)
event_bus.publish(EventPrintLog, data)
if sfgrid_config.console_log:
if config.console_log:
print(f'{level.value} {message}')
-306
View File
@@ -1,306 +0,0 @@
# coding:utf-8
from core.sfgrid.model import TradeTarget
from core.eventbus import ActionDisableMarketData, ActionEnableMarketData, ActionEventAddTradeTarget, ActionEventDeleteTradeTarget, ActionEventDisableTrade, ActionEventEnableTrade, EventTradeTargetUpdate, MarketDataUpdate, MarketDataEnabled, MarketDataDisabled, ResultEventTradeDisabled, ResultEventTradeEnabled, ResultEventTradeTargetDeleted, ActionEventGridFix, event_bus
from xtquant.xttrader import XtQuantTrader
import time
from peewee import ModelSelect
import core.sfgrid.model as model
import sfgrid_config
from core.sfgrid.sfgrid_strategy import SFGridStrategy
from core.util import getInstrumentName, getStockPosition, queryPendingOrder
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount, XtAsset, XtOrder, XtPosition, XtTrade
from xtquant import xtdata
from xtquant.xttrader import XtQuantTraderCallback
import datetime
import core.sfgrid.ui as ui
from core.logger import PrintLog, LogLevel
from core.sfgrid.objects import GridFixData
# 量化核心控制对象
class MainController(XtQuantTraderCallback):
def __init__(self, account_no: str, miniQmtPath: str):
super().__init__()
self.registerEventHandler()
# self.appUi = ui.TradeTargetUI()
xtdata.enable_hello = False
session_id = int(time.time())
self.xt_trader: XtQuantTrader = XtQuantTrader(miniQmtPath, session_id)
self.xt_trader.register_callback(self)
self.xt_trader.start()
self.xt_trader.connect()
PrintLog(LogLevel.INFO, f'- [{'成功' if self.xt_trader.connected else '失败'}]市场交易连接: {miniQmtPath}')
if self.xt_trader.connected == False:
self.inited: bool = False
return
else:
self.inited = True
self.account= StockAccount(account_no, 'STOCK')
PrintLog(LogLevel.INFO, f'- [成功]交易账号对象初始化完成, 账号: {self.account.account_id}') # pyright: ignore[reportAttributeAccessIssue]
subscribe_result = self.xt_trader.subscribe(self.account)
PrintLog(LogLevel.INFO, f'- [{'成功' if subscribe_result == 0 else '失败'}:{subscribe_result}]交易状态订阅')
if subscribe_result == 0:
self.inited = True
else:
self.inited = False
return
self.listening_stock = []
self.stock_trade_ctrl = {}
self.init_instrument_pool(self.xt_trader, self.account) # type: ignore
self.seq = None
PrintLog(LogLevel.INFO, '- [成功]三疯交易系统初始化完成')
self.startMarketData()
def registerEventHandler(self):
event_bus.subscribe(ActionEventEnableTrade, self.onEnableTrade)
event_bus.subscribe(ActionEventDisableTrade, self.onDisableTrade)
event_bus.subscribe(ActionEnableMarketData, self.onMarketDataEnabled)
event_bus.subscribe(ActionDisableMarketData, self.onMarketDataDisabled)
event_bus.subscribe(ActionEventAddTradeTarget, self.onAddTradeTarget)
event_bus.subscribe(ActionEventDeleteTradeTarget, self.onDeleteTradeTarget)
event_bus.subscribe(ActionEventGridFix, self.onGridFix)
def onDeleteTradeTarget(self, id: int):
"""处理删除交易标的事件"""
self.del_trade_target(id)
# 发布删除完成事件
event_bus.publish(ResultEventTradeTargetDeleted, id)
def onAddTradeTarget(self, stock_code: str):
"""处理添加交易标的事件"""
self.add_trade_target(stock_code)
def onMarketDataEnabled(self, data):
"""处理市场数据监听启用事件"""
self.startMarketData()
def onMarketDataDisabled(self, data):
"""处理市场数据监听禁用事件"""
self.stopMarketData()
def onEnableTrade(self, id: int):
self.start_stock_trade(id)
def onDisableTrade(self, id: int):
self.pause_stock_trade(id)
def onGridFix(self, data: GridFixData):
"""处理网格修正事件"""
self.update_trade_target_grid(data)
def update_trade_target_grid(self, data: GridFixData):
"""更新交易标的网格信息"""
try:
target = data.tradeTarget
grid_index = data.grid_index
# 更新数据库中的网格索引
target.grid_index = grid_index
target.save()
# 更新内存中的交易标的
if target.get_id() in self.instrument_pool:
self.instrument_pool[target.get_id()] = target
# 更新交易控制器中的网格信息
if target.stock_code in self.stock_trade_ctrl:
trade_controller: SFGridStrategy = self.stock_trade_ctrl[target.stock_code]
trade_controller.updateGridIndex(grid_index) # type: ignore
PrintLog(LogLevel.INFO, f"网格修正已应用: {target.stock_code} - {target.stock_name}, 网格索引: {grid_index}")
except Exception as e:
PrintLog(LogLevel.ERROR, f"网格修正更新失败: {str(e)}")
def startMarketData(self):
PrintLog(LogLevel.INFO, '- 启动市场数据订阅')
# self.seq = xtdata.subscribe_whole_quote(['SH', 'SZ'], callback=self.onDataUpdate)
if self.seq == -1:
PrintLog(LogLevel.ERROR, '- 市场数据订阅失败')
else:
event_bus.publish(MarketDataEnabled, True)
PrintLog(LogLevel.INFO, f'- 市场数据订阅成功, 订阅号={self.seq}')
def stopMarketData(self):
PrintLog(LogLevel.INFO, '- 停止市场数据订阅')
if self.seq is not None and self.seq > 0:
xtdata.unsubscribe_quote(self.seq)
event_bus.publish(MarketDataDisabled, False)
def add_trade_target(self, stock_code: str):
try:
stock_name = getInstrumentName(stock_code)
if not stock_name:
PrintLog(LogLevel.ERROR, f'无法获取股票代码 {stock_code} 的名称,请检查代码是否正确')
return
# 检查是否已存在该标的
existing_target = model.TradeTarget.get_or_none(model.TradeTarget.stock_code == stock_code)
if existing_target:
PrintLog(LogLevel.INFO, f'交易标的 {stock_code} {stock_name} 已存在')
return
new_target = model.TradeTarget.create(
stock_name=stock_name,
stock_code=stock_code,
market_price=0.0,
current_position=0,
grid_index=0,
last_trade_price=0.0,
plan_buy_price=0.0,
plan_sell_price=0.0,
current_order_price=0.0,
current_order_no='',
current_order_type=''
)
new_target.save()
PrintLog(LogLevel.INFO, f'新增交易标的 {stock_code} {stock_name}, {new_target.id}')
# 刷新标的持仓
pos = getStockPosition(stock_code, self.xt_trader, self.account) # type: ignore
model.TradeTarget.update(current_position=pos).where(model.TradeTarget.stock_code == stock_code).execute()
# 更新标的池
self.refresh_targets()
# 添加交易控制器
stockTradeController = SFGridStrategy(new_target, self.xt_trader, self.account) # type: ignore
self.stock_trade_ctrl[stock_code] = stockTradeController
except Exception as e:
PrintLog(LogLevel.ERROR, f'新增交易标的失败 {stock_code} {e}')
def del_trade_target(self, id:int):
try:
# 检查标的是否存在
if id not in self.instrument_pool:
PrintLog(LogLevel.ERROR, f"交易标的 ID {id} 不存在")
return
target: model.TradeTarget = self.instrument_pool[id]
# 如果存在交易控制器,先停止交易
if target.stock_code in self.stock_trade_ctrl:
# 停止交易控制器
del self.stock_trade_ctrl[target.stock_code]
# 从数据库中删除
target.delete_instance()
# 从内存中删除
del self.instrument_pool[id]
# 刷新标的池
self.refresh_targets()
PrintLog(LogLevel.INFO, f"已删除交易标的: {target.stock_code} - {target.stock_name}")
except Exception as e:
PrintLog(LogLevel.ERROR, f"删除交易标的失败 ID {id}: {str(e)}")
def init_instrument_pool(self, xtTrader:XtQuantTrader, account:StockAccount):
self.refresh_targets()
for id in self.instrument_pool:
target:TradeTarget = self.instrument_pool[id]
status = "新建" if target.status == 0 else "已建初始仓"
PrintLog(LogLevel.INFO, f' [序号-{id}] 股票代码: {target.stock_code}-{target.stock_name} 当前持仓: {getStockPosition(target.stock_code, self.xt_trader, self.account)} 网格索引: {target.grid_index} 基准价格 {sfgrid_config.grid_price[target.grid_index]} 状态: {status} 启用交易线程: {'自动交易中' if target.enabled else '交易已停止'}') # type: ignore
tradeTarget:model.TradeTarget = self.instrument_pool[id]
tradeTarget.current_position = getStockPosition(tradeTarget.stock_code, xtTrader, account) # type: ignore
result = tradeTarget.save()
PrintLog(LogLevel.INFO, f' |- 同步[{target.stock_code}-{target.stock_name}]持仓信息[{'成功' if result == 1 else '失败'}]')
stockTradeController = SFGridStrategy(tradeTarget, self.xt_trader, self.account) # type: ignore
self.stock_trade_ctrl[tradeTarget.stock_code] = stockTradeController
event_bus.publish(EventTradeTargetUpdate, tradeTarget)
PrintLog(LogLevel.INFO, f'- [成功]交易标的信息初始化, 共 {len(self.instrument_pool)} 个标的')
def refresh_targets(self):
# 更新标的池
results:ModelSelect = model.TradeTarget.select()
self.instrument_pool: dict[int, model.TradeTarget] = {}
for temp in results:
result :model.TradeTarget = temp
self.instrument_pool[result.get_id()] = result
def print_position_info(self):
positions:list[XtPosition] = self.xt_trader.query_stock_positions(self.account)
if positions:
PrintLog(LogLevel.INFO, "\n- 持仓信息")
for temp in positions:
pos : XtPosition = temp
if pos.volume <=0:
continue
PrintLog(LogLevel.INFO, f"股票代码: {pos.stock_code}-{getInstrumentName(pos.stock_code)}")
PrintLog(LogLevel.INFO, f"总持仓: {pos.volume}")
PrintLog(LogLevel.INFO, f"可用持仓: {pos.can_use_volume}")
PrintLog(LogLevel.INFO, f"持仓成本: {pos.avg_price}")
PrintLog(LogLevel.INFO, "---")
else:
PrintLog(LogLevel.INFO, "\n当前无持仓")
def print_account_info(self):
temp = self.xt_trader.query_stock_asset(self.account)
asset: XtAsset = temp # type: ignore
PrintLog(LogLevel.INFO, f"=== 账户信息 {self.account.account_id} ===") # type: ignore
PrintLog(LogLevel.INFO, f"可用资金: {asset.cash}")
PrintLog(LogLevel.INFO, f"总资产: {asset.total_asset}")
PrintLog(LogLevel.INFO, f"证券市值: {asset.market_value}")
def print_stock_orders(self):
orders = self.xt_trader.query_stock_orders(self.account, cancelable_only=True)
if orders:
PrintLog(LogLevel.INFO, "\n=== 委托信息 ===")
for order in orders:
PrintLog(LogLevel.INFO, f"委托编号: {order.order_id}")
PrintLog(LogLevel.INFO, f"股票代码: {order.stock_code} {getInstrumentName(order.stock_code)}")
PrintLog(LogLevel.INFO, f"委托方向: {order.offset_flag} ")
PrintLog(LogLevel.INFO, f"委托价格: {order.price}")
PrintLog(LogLevel.INFO, f"委托数量: {order.order_volume}")
PrintLog(LogLevel.INFO, f"已成交数量: {order.traded_volume}")
PrintLog(LogLevel.INFO, f"委托状态: {order.order_status} ")
PrintLog(LogLevel.INFO, "---")
else:
PrintLog(LogLevel.INFO, "\n当前无委托记录")
# 初始化指定标的交易控制器
def start_stock_trade(self, id: int):
tradeTarget: TradeTarget = self.instrument_pool[id]
# check existing thread
if tradeTarget.stock_code in self.stock_trade_ctrl:
tradeController: SFGridStrategy = self.stock_trade_ctrl[tradeTarget.stock_code]
tradeTarget = tradeController.enabledTrading(True)
self.instrument_pool[id] = tradeTarget
event_bus.publish(ResultEventTradeEnabled, tradeTarget)
else:
PrintLog(LogLevel.INFO, f"\t创建标的交易控制器 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}")
def pause_stock_trade(self, id: int):
localTarget: model.TradeTarget = self.instrument_pool[id]
print(f'暂停标的交易 {localTarget.stock_code} - enabled {localTarget.enabled}')
if localTarget.stock_code in self.stock_trade_ctrl:
tradeController: SFGridStrategy = self.stock_trade_ctrl[localTarget.stock_code]
tradeTarget = tradeController.enabledTrading(False)
orders = queryPendingOrder(localTarget.stock_code, tradeController.getName(), self.xt_trader, self.account) # type: ignore
for order in orders:
self.xt_trader.cancel_order_stock_async(self.account, order.order_id)
print(f'取消未成交订单 {len(orders)}')
self.instrument_pool[id] = tradeTarget
event_bus.publish(ResultEventTradeDisabled, tradeTarget)
else:
print(f"标的交易控制器不存在 {localTarget.stock_code} {localTarget.stock_name}\n")
+4 -110
View File
@@ -2,14 +2,14 @@ import time
import tkinter as tk
from tkinter import ttk
from core.logger import LogLevel, PrintLog
from core.sfgrid.ui import TradeTargetUI
import sfgrid_config
from core.sfgrid.sfgrid_ui import TradeTargetUI
import config
from xtquant import xtdata
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
import datetime
from xtquant.xttype import StockAccount, XtAsset, XtOrder, XtOrderResponse, XtPosition, XtTrade
class MainWindow(XtQuantTraderCallback):
class MainWindow:
def __init__(self):
self.root = tk.Tk()
self.root.title("三疯交易系统")
@@ -21,36 +21,9 @@ class MainWindow(XtQuantTraderCallback):
self.strategy_frames = {}
# 日志面板可见性标志
self.log_visible = False
self.initQmt()
# 创建界面
self.create_ui()
def initQmt(self):
xtdata.enable_hello = False
session_id = int(time.time())
self.xt_trader: XtQuantTrader = XtQuantTrader(sfgrid_config.miniQMTPath, session_id)
self.xt_trader.register_callback(self)
self.xt_trader.start()
self.xt_trader.connect()
PrintLog(LogLevel.INFO, f'- [{'成功' if self.xt_trader.connected else '失败'}]市场交易连接: {sfgrid_config.miniQMTPath}')
if self.xt_trader.connected == False:
self.inited: bool = False
return
else:
self.inited = True
self.account= StockAccount(sfgrid_config.account_no, 'STOCK')
PrintLog(LogLevel.INFO, f'- [成功]交易账号对象初始化完成, 账号: {self.account.account_id}') # pyright: ignore[reportAttributeAccessIssue]
subscribe_result = self.xt_trader.subscribe(self.account)
PrintLog(LogLevel.INFO, f'- [{'成功' if subscribe_result == 0 else '失败'}:{subscribe_result}]交易状态订阅')
if subscribe_result == 0:
self.inited = True
else:
self.inited = False
return
def create_ui(self):
"""创建UI界面"""
@@ -181,7 +154,7 @@ class MainWindow(XtQuantTraderCallback):
for idx, name in enumerate(strategy_names):
if idx == 0:
# 第一个Tab使用TradeTargetUI,传入main_window引用
frame = TradeTargetUI(self.content_container, self)
frame = TradeTargetUI(self.content_container)
self.strategy_frames[idx] = frame
else:
# 其他策略使用占位Frame
@@ -246,82 +219,3 @@ class MainWindow(XtQuantTraderCallback):
"""运行程序"""
self.root.mainloop()
# ====== 市场回调方法 -- 以下方法由XtQuantData调用 ======
def onDataUpdate(self, data):
# 收集所有市场数据用于市场监控
print(f'market data update {len(data)}')
# ====== 市场回调方法 -- 以下方法由XtQuantTrader调用 ======
def on_connected(self):
"""
连接成功推送
"""
print(datetime.datetime.now(), '连接成功回调')
def on_disconnected(self):
"""
连接断开
:return:
"""
print(datetime.datetime.now(), '连接断开回调')
def on_stock_order(self, order:XtOrder):
"""
委托回报推送
:param order: XtOrder对象
:return:
"""
print(f'orderd {order.strategy_name}-{order.stock_code} {order.order_id} {order.order_volume}-{order.order_status}')
# stockCode = order.stock_code
# ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode]
# # 如果存在对应的StockTradeController,则调用其onDataUpdate方法
# if ctrl is not None and order.strategy_name == ctrl.getName():
# print(f'controller info {ctrl.getName()}')
# ctrl.onAsyncOrderResponse(order) # type: ignore
# else:
# print(f"委托下单回调 投资备注 orderId: {order.order_sysid} [{order.stock_code}-{order.instrument_name}] volume: {order.order_volume} 订单策略: '{order.strategy_name}'<-->'{ctrl.getName()}'")
def on_stock_trade(self, trade:XtTrade):
"""
成交变动推送
:param trade: XtTrade对象
:return:
"""
print(f"委托回调 投资备注 {trade.stock_code}-{trade.instrument_name} {trade.strategy_name} 不匹配 {trade.order_remark}")
# stockCode = trade.stock_code
# ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode]
# # 如果存在对应的StockTradeController,则调用其onDataUpdate方法
# if ctrl is not None and trade.strategy_name == ctrl.getName():
# ctrl.onOrderTrade(trade)
# else:
# print(f"委托回调 投资备注 {trade.strategy_name} 不匹配 {ctrl.getName()}")
def on_order_stock_async_response(self, response:XtOrderResponse):
print(f"委托回调 投资备注 {response.error_msg}{response.strategy_name} {response.order_remark}")
# stockCode = response.order_remark
# ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode]
# # 如果存在对应的StockTradeController,则调用其onDataUpdate方法
# if ctrl is not None and response.strategy_name == ctrl.getName():
# ctrl.onAsyncOrderResponse(response)
# else:
# print(f"委托回调 投资备注 {response.strategy_name} 不匹配 {ctrl.getName()}")
def on_order_error(self, order_error):
"""
委托失败推送
:param order_error:XtOrderError 对象
:return:
"""
print(f"\n委托报错回调 {order_error.order_remark} {order_error.error_msg}")
def on_account_status(self, status):
"""
:param response: XtAccountStatus 对象
:return:
"""
print(datetime.datetime.now(), status)
+203
View File
@@ -0,0 +1,203 @@
import datetime
import time
import config
from xtquant.xttype import StockAccount, XtOrder, XtOrderResponse, XtPosition, XtTrade
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount
from core.logger import LogLevel, PrintLog
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount
from xtquant import xtconstant, xtdata
class QmtV(XtQuantTraderCallback):
def __init__(self) -> None:
self.xttrader: XtQuantTrader
self.inited: bool = False
def getTrader(self) -> XtQuantTrader:
return self.xttrader
def init_qmtv(self):
sessionId= int(time.time())
self.xttrader = XtQuantTrader(config.miniQMTPath, sessionId)
xtdata.enable_hello = False
def connect(self):
self.xttrader.register_callback(self)
self.xttrader.start()
self.xttrader.connect()
PrintLog(LogLevel.INFO, f'- [{'成功' if self.xttrader.connected else '失败'}]市场交易连接: {config.miniQMTPath}')
if self.xttrader.connected == False:
self.inited = False
return
else:
self.inited = True
self.account = StockAccount(config.account_no, 'STOCK') # pyright: ignore[reportAssignmentType, reportAttributeAccessIssue]
PrintLog(LogLevel.INFO, f'- [成功]交易账号对象初始化完成, 账号: {config.account_no}') # pyright: ignore[reportOptionalMemberAccess]
subscribe_result = self.xttrader.subscribe(self.account)
PrintLog(LogLevel.INFO, f'- [{'成功' if subscribe_result == 0 else '失败'}:{subscribe_result}]交易状态订阅')
if subscribe_result != 0:
self.inited = False
return
def getStockPosition(self, stock_code: str):
volume = 0
print(f'获取股票持仓: {stock_code}, {self.xttrader.connected}, {self.account.account_id if self.account else None}') # pyright: ignore[reportAttributeAccessIssue]
positions = self.xttrader.query_stock_positions(self.account)
if positions:
for temp in positions:
pos:XtPosition = temp
if pos.stock_code == stock_code:
volume = pos.volume
break
return volume
def queryPendingOrder(self, stock_code:str, tag: str) -> list[XtOrder]:
if stock_code == None or tag == None:
return []
orders = self.xttrader.query_stock_orders(self.account)
result = [order for order in orders if order.order_status == xtconstant.ORDER_REPORTED and order.stock_code == stock_code and order.strategy_name == tag]
return result
def orderAsync(self, stock_code, orderVolume, orderType, orderPrice, priceType, orderRemark, strategy_name):
return self.xttrader.order_stock_async(
self.account,
str(stock_code),
orderType,
orderVolume,
priceType,
orderPrice,
strategy_name, # strategy_name
orderRemark # remark # type: ignore
)
# def print_position_info(self):
# positions:list[XtPosition] = self.xt_trader.query_stock_positions(self.account)
# if positions:
# PrintLog(LogLevel.INFO, "\n- 持仓信息")
# for temp in positions:
# pos : XtPosition = temp
# if pos.volume <=0:
# continue
# PrintLog(LogLevel.INFO, f"股票代码: {pos.stock_code}-{getInstrumentName(pos.stock_code)}")
# PrintLog(LogLevel.INFO, f"总持仓: {pos.volume}")
# PrintLog(LogLevel.INFO, f"可用持仓: {pos.can_use_volume}")
# PrintLog(LogLevel.INFO, f"持仓成本: {pos.avg_price}")
# PrintLog(LogLevel.INFO, "---")
# else:
# PrintLog(LogLevel.INFO, "\n当前无持仓")
# def print_account_info(self):
# temp = self.xt_trader.query_stock_asset(self.account)
# asset: XtAsset = temp # type: ignore
# PrintLog(LogLevel.INFO, f"=== 账户信息 {self.account.account_id} ===") # type: ignore
# PrintLog(LogLevel.INFO, f"可用资金: {asset.cash}")
# PrintLog(LogLevel.INFO, f"总资产: {asset.total_asset}")
# PrintLog(LogLevel.INFO, f"证券市值: {asset.market_value}")
# def print_stock_orders(self):
# orders = self.xt_trader.query_stock_orders(self.account, cancelable_only=True)
# if orders:
# PrintLog(LogLevel.INFO, "\n=== 委托信息 ===")
# for order in orders:
# PrintLog(LogLevel.INFO, f"委托编号: {order.order_id}")
# PrintLog(LogLevel.INFO, f"股票代码: {order.stock_code} {getInstrumentName(order.stock_code)}")
# PrintLog(LogLevel.INFO, f"委托方向: {order.offset_flag} ")
# PrintLog(LogLevel.INFO, f"委托价格: {order.price}")
# PrintLog(LogLevel.INFO, f"委托数量: {order.order_volume}")
# PrintLog(LogLevel.INFO, f"已成交数量: {order.traded_volume}")
# PrintLog(LogLevel.INFO, f"委托状态: {order.order_status} ")
# PrintLog(LogLevel.INFO, "---")
# else:
# PrintLog(LogLevel.INFO, "\n当前无委托记录")
# ========================================#
# ====== 市场回调方法 -- 以下方法由XtQuantData调用 ======
def onDataUpdate(self, data):
# 收集所有市场数据用于市场监控
print(f'market data update {len(data)}')
# ====== 市场回调方法 -- 以下方法由XtQuantTrader调用 ======
def on_connected(self):
"""
连接成功推送
"""
print(datetime.datetime.now(), '连接成功回调')
def on_disconnected(self):
"""
连接断开
:return:
"""
print(datetime.datetime.now(), '连接断开回调')
def on_stock_order(self, order:XtOrder):
"""
委托回报推送
:param order: XtOrder对象
:return:
"""
print(f'orderd {order.strategy_name}-{order.stock_code} {order.order_id} {order.order_volume}-{order.order_status}')
# stockCode = order.stock_code
# ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode]
# # 如果存在对应的StockTradeController,则调用其onDataUpdate方法
# if ctrl is not None and order.strategy_name == ctrl.getName():
# print(f'controller info {ctrl.getName()}')
# ctrl.onAsyncOrderResponse(order) # type: ignore
# else:
# print(f"委托下单回调 投资备注 orderId: {order.order_sysid} [{order.stock_code}-{order.instrument_name}] volume: {order.order_volume} 订单策略: '{order.strategy_name}'<-->'{ctrl.getName()}'")
def on_stock_trade(self, trade:XtTrade):
"""
成交变动推送
:param trade: XtTrade对象
:return:
"""
print(f"委托回调 投资备注 {trade.stock_code}-{trade.instrument_name} {trade.strategy_name} 不匹配 {trade.order_remark}")
# stockCode = trade.stock_code
# ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode]
# # 如果存在对应的StockTradeController,则调用其onDataUpdate方法
# if ctrl is not None and trade.strategy_name == ctrl.getName():
# ctrl.onOrderTrade(trade)
# else:
# print(f"委托回调 投资备注 {trade.strategy_name} 不匹配 {ctrl.getName()}")
def on_order_stock_async_response(self, response:XtOrderResponse):
print(f"委托回调 投资备注 {response.error_msg}{response.strategy_name} {response.order_remark}")
# stockCode = response.order_remark
# ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode]
# # 如果存在对应的StockTradeController,则调用其onDataUpdate方法
# if ctrl is not None and response.strategy_name == ctrl.getName():
# ctrl.onAsyncOrderResponse(response)
# else:
# print(f"委托回调 投资备注 {response.strategy_name} 不匹配 {ctrl.getName()}")
def on_order_error(self, order_error):
"""
委托失败推送
:param order_error:XtOrderError 对象
:return:
"""
print(f"\n委托报错回调 {order_error.order_remark} {order_error.error_msg}")
def on_account_status(self, status):
"""
:param response: XtAccountStatus 对象
:return:
"""
print(datetime.datetime.now(), status)
qmtv = QmtV()
+2 -2
View File
@@ -4,7 +4,7 @@ from core.database import BaseModel, db
# 定义Target类,对应targets表
class TradeTarget(BaseModel):
class SFGridTradeTarget(BaseModel):
stock_code = CharField(unique=True)
stock_name = CharField()
market_price = FloatField()
@@ -23,4 +23,4 @@ class TradeTarget(BaseModel):
return f'{self.stock_name}[{self.stock_code}]'
db.create_tables([TradeTarget])
db.create_tables([SFGridTradeTarget])
+1 -1
View File
@@ -1,4 +1,4 @@
from core.sfgrid.model import TradeTarget
from core.sfgrid.model import SFGridTradeTarget as TradeTarget
class GridFixData:
@@ -1,12 +1,13 @@
# coding:utf-8
from core.sfgrid.model import TradeTarget
from core.sfgrid.model import SFGridTradeTarget as TradeTarget
from core.eventbus import ActionDisableMarketData, ActionEnableMarketData, ActionEventAddTradeTarget, ActionEventDeleteTradeTarget, ActionEventDisableTrade, ActionEventEnableTrade, EventTradeTargetUpdate, MarketDataUpdate, MarketDataEnabled, MarketDataDisabled, ResultEventTradeDisabled, ResultEventTradeEnabled, ResultEventTradeTargetDeleted, ActionEventGridFix, event_bus
from xtquant import xttrader
from xtquant.xttrader import XtQuantTrader
import time
from peewee import ModelSelect
import core.sfgrid.model as model
import sfgrid_config
import config
from core.sfgrid.sfgrid_strategy import SFGridStrategy
from core.util import getInstrumentName, getStockPosition, queryPendingOrder
from xtquant.xttrader import XtQuantTrader
@@ -14,45 +15,21 @@ from xtquant.xttype import StockAccount, XtAsset, XtOrder, XtPosition, XtTrade
from xtquant import xtdata
from xtquant.xttrader import XtQuantTraderCallback
import datetime
import core.sfgrid.ui as ui
import core.sfgrid.sfgrid_ui as sfgrid_ui
from core.logger import PrintLog, LogLevel
from core.qmt import qmtv
from core.sfgrid.objects import GridFixData
# 量化核心控制对象
class SFGridController(XtQuantTraderCallback):
def __init__(self, account_no: str, miniQmtPath: str):
def __init__(self):
super().__init__()
self.registerEventHandler()
self.appUi = ui.TradeTargetUI()
xtdata.enable_hello = False
session_id = int(time.time())
self.xt_trader: XtQuantTrader = XtQuantTrader(miniQmtPath, session_id)
self.xt_trader.register_callback(self)
self.xt_trader.start()
self.xt_trader.connect()
PrintLog(LogLevel.INFO, f'- [{'成功' if self.xt_trader.connected else '失败'}]市场交易连接: {miniQmtPath}')
if self.xt_trader.connected == False:
self.inited: bool = False
return
else:
self.inited = True
self.account= StockAccount(account_no, 'STOCK')
PrintLog(LogLevel.INFO, f'- [成功]交易账号对象初始化完成, 账号: {self.account.account_id}') # pyright: ignore[reportAttributeAccessIssue]
subscribe_result = self.xt_trader.subscribe(self.account)
PrintLog(LogLevel.INFO, f'- [{'成功' if subscribe_result == 0 else '失败'}:{subscribe_result}]交易状态订阅')
if subscribe_result == 0:
self.inited = True
else:
self.inited = False
return
self.listening_stock = []
self.stock_trade_ctrl = {}
self.init_instrument_pool(self.xt_trader, self.account) # type: ignore
self.init_instrument_pool(qmtv.xttrader, qmtv.account) # type: ignore
self.seq = None
PrintLog(LogLevel.INFO, '- [成功]三疯交易系统初始化完成')
@@ -118,9 +95,6 @@ class SFGridController(XtQuantTraderCallback):
except Exception as e:
PrintLog(LogLevel.ERROR, f"网格修正更新失败: {str(e)}")
def hold(self):
self.appUi.run()
def startMarketData(self):
PrintLog(LogLevel.INFO, '- 启动市场数据订阅')
@@ -149,12 +123,12 @@ class SFGridController(XtQuantTraderCallback):
return
# 检查是否已存在该标的
existing_target = model.TradeTarget.get_or_none(model.TradeTarget.stock_code == stock_code)
existing_target = TradeTarget.get_or_none(TradeTarget.stock_code == stock_code)
if existing_target:
PrintLog(LogLevel.INFO, f'交易标的 {stock_code} {stock_name} 已存在')
return
new_target = model.TradeTarget.create(
new_target = TradeTarget.create(
stock_name=stock_name,
stock_code=stock_code,
market_price=0.0,
@@ -171,7 +145,7 @@ class SFGridController(XtQuantTraderCallback):
PrintLog(LogLevel.INFO, f'新增交易标的 {stock_code} {stock_name}, {new_target.id}')
# 刷新标的持仓
pos = getStockPosition(stock_code, self.xt_trader, self.account) # type: ignore
model.TradeTarget.update(current_position=pos).where(model.TradeTarget.stock_code == stock_code).execute()
TradeTarget.update(current_position=pos).where(TradeTarget.stock_code == stock_code).execute()
# 更新标的池
self.refresh_targets()
# 添加交易控制器
@@ -189,7 +163,7 @@ class SFGridController(XtQuantTraderCallback):
PrintLog(LogLevel.ERROR, f"交易标的 ID {id} 不存在")
return
target: model.TradeTarget = self.instrument_pool[id]
target: TradeTarget = self.instrument_pool[id]
# 如果存在交易控制器,先停止交易
if target.stock_code in self.stock_trade_ctrl:
@@ -215,9 +189,9 @@ class SFGridController(XtQuantTraderCallback):
for id in self.instrument_pool:
target:TradeTarget = self.instrument_pool[id]
status = "新建" if target.status == 0 else "已建初始仓"
PrintLog(LogLevel.INFO, f' [序号-{id}] 股票代码: {target.stock_code}-{target.stock_name} 当前持仓: {getStockPosition(target.stock_code, self.xt_trader, self.account)} 网格索引: {target.grid_index} 基准价格 {sfgrid_config.grid_price[target.grid_index]} 状态: {status} 启用交易线程: {'自动交易中' if target.enabled else '交易已停止'}') # type: ignore
PrintLog(LogLevel.INFO, f' [序号-{id}] 股票代码: {target.stock_code}-{target.stock_name} 当前持仓: {getStockPosition(target.stock_code, self.xt_trader, self.account)} 网格索引: {target.grid_index} 基准价格 {config.grid_price[target.grid_index]} 状态: {status} 启用交易线程: {'自动交易中' if target.enabled else '交易已停止'}') # type: ignore
tradeTarget:model.TradeTarget = self.instrument_pool[id]
tradeTarget:TradeTarget = self.instrument_pool[id]
tradeTarget.current_position = getStockPosition(tradeTarget.stock_code, xtTrader, account) # type: ignore
result = tradeTarget.save()
PrintLog(LogLevel.INFO, f' |- 同步[{target.stock_code}-{target.stock_name}]持仓信息[{'成功' if result == 1 else '失败'}]')
@@ -230,53 +204,12 @@ class SFGridController(XtQuantTraderCallback):
def refresh_targets(self):
# 更新标的池
results:ModelSelect = model.TradeTarget.select()
self.instrument_pool: dict[int, model.TradeTarget] = {}
results:ModelSelect = TradeTarget.select()
self.instrument_pool: dict[int, TradeTarget] = {}
for temp in results:
result :model.TradeTarget = temp
result :TradeTarget = temp
self.instrument_pool[result.get_id()] = result
def print_position_info(self):
positions:list[XtPosition] = self.xt_trader.query_stock_positions(self.account)
if positions:
PrintLog(LogLevel.INFO, "\n- 持仓信息")
for temp in positions:
pos : XtPosition = temp
if pos.volume <=0:
continue
PrintLog(LogLevel.INFO, f"股票代码: {pos.stock_code}-{getInstrumentName(pos.stock_code)}")
PrintLog(LogLevel.INFO, f"总持仓: {pos.volume}")
PrintLog(LogLevel.INFO, f"可用持仓: {pos.can_use_volume}")
PrintLog(LogLevel.INFO, f"持仓成本: {pos.avg_price}")
PrintLog(LogLevel.INFO, "---")
else:
PrintLog(LogLevel.INFO, "\n当前无持仓")
def print_account_info(self):
temp = self.xt_trader.query_stock_asset(self.account)
asset: XtAsset = temp # type: ignore
PrintLog(LogLevel.INFO, f"=== 账户信息 {self.account.account_id} ===") # type: ignore
PrintLog(LogLevel.INFO, f"可用资金: {asset.cash}")
PrintLog(LogLevel.INFO, f"总资产: {asset.total_asset}")
PrintLog(LogLevel.INFO, f"证券市值: {asset.market_value}")
def print_stock_orders(self):
orders = self.xt_trader.query_stock_orders(self.account, cancelable_only=True)
if orders:
PrintLog(LogLevel.INFO, "\n=== 委托信息 ===")
for order in orders:
PrintLog(LogLevel.INFO, f"委托编号: {order.order_id}")
PrintLog(LogLevel.INFO, f"股票代码: {order.stock_code} {getInstrumentName(order.stock_code)}")
PrintLog(LogLevel.INFO, f"委托方向: {order.offset_flag} ")
PrintLog(LogLevel.INFO, f"委托价格: {order.price}")
PrintLog(LogLevel.INFO, f"委托数量: {order.order_volume}")
PrintLog(LogLevel.INFO, f"已成交数量: {order.traded_volume}")
PrintLog(LogLevel.INFO, f"委托状态: {order.order_status} ")
PrintLog(LogLevel.INFO, "---")
else:
PrintLog(LogLevel.INFO, "\n当前无委托记录")
# 初始化指定标的交易控制器
def start_stock_trade(self, id: int):
@@ -293,14 +226,14 @@ class SFGridController(XtQuantTraderCallback):
def pause_stock_trade(self, id: int):
localTarget: model.TradeTarget = self.instrument_pool[id]
localTarget: TradeTarget = self.instrument_pool[id]
print(f'暂停标的交易 {localTarget.stock_code} - enabled {localTarget.enabled}')
if localTarget.stock_code in self.stock_trade_ctrl:
tradeController: SFGridStrategy = self.stock_trade_ctrl[localTarget.stock_code]
tradeTarget = tradeController.enabledTrading(False)
orders = queryPendingOrder(localTarget.stock_code, tradeController.getName(), self.xt_trader, self.account) # type: ignore
for order in orders:
self.xt_trader.cancel_order_stock_async(self.account, order.order_id)
qmtv.xttrader.cancel_order_stock_async(qmtv.account, order.order_id)
print(f'取消未成交订单 {len(orders)}')
self.instrument_pool[id] = tradeTarget
event_bus.publish(ResultEventTradeDisabled, tradeTarget)
+24 -26
View File
@@ -1,3 +1,4 @@
from core.qmt import qmtv
import core.sfgrid.model as model
from core import constants
from core.eventbus import EventTradeTargetUpdate, event_bus
@@ -6,16 +7,14 @@ from core.util import queryPendingOrder, is_trading_time
from xtquant import xttrader, xtconstant
from xtquant.xttype import StockAccount, XtOrder, XtTrade
import sfgrid_config
import config
import threading
class SFGridStrategy:
def __init__(self, tradeTarget: model.TradeTarget, xt_trader: xttrader.XtQuantTrader, account: StockAccount):
self.tradeTarget:model.TradeTarget = tradeTarget
self.xt_trader: xttrader.XtQuantTrader = xt_trader
self.account:StockAccount = account
def __init__(self, tradeTarget: model.SFGridTradeTarget):
self.tradeTarget:model.SFGridTradeTarget = tradeTarget
self.enabledTrading(bool(tradeTarget.enabled)) # 修复类型兼容性问题
event_bus.publish(EventTradeTargetUpdate, self.tradeTarget)
@@ -27,7 +26,7 @@ class SFGridStrategy:
self.refreshPlanPrice()
self.saveProxy()
def enabledTrading(self, enabled: bool) -> model.TradeTarget:
def enabledTrading(self, enabled: bool) -> model.SFGridTradeTarget:
self.tradeTarget.enabled = enabled # type: ignore
self.saveProxy()
@@ -40,7 +39,7 @@ class SFGridStrategy:
else: # 已建仓
# 交易阶段,检查仓位,检查现有订单
print(f" |- 标的{self.tradeTarget.targetName()}已有仓位或非初始状态 无需建初始仓 当前仓位: {self.tradeTarget.current_position} 状态: {self.tradeTarget.status}")
minRequirePosition:int = sfgrid_config.grid_volume * int(self.tradeTarget.grid_index) # type: ignore
minRequirePosition:int = config.grid_volume * int(self.tradeTarget.grid_index) # type: ignore
if minRequirePosition <= int(self.tradeTarget.current_position): # type: ignore
print(f' |- 仓位检查: 持仓需求充足, (gridVolume*gridIndex)={minRequirePosition}, 当前持仓:{self.tradeTarget.current_position}')
else:
@@ -79,16 +78,16 @@ class SFGridStrategy:
index: int = self.tradeTarget.grid_index # pyright: ignore[reportAssignmentType]
orderRemark= ""
gridBasePrice = -1 if index>=len(sfgrid_config.grid_price) or index < 0 else sfgrid_config.grid_price[int(index)] # pyright: ignore[reportArgumentType]
gridBasePrice = -1 if index>=len(config.grid_price) or index < 0 else config.grid_price[int(index)] # pyright: ignore[reportArgumentType]
if self.tradeTarget.enabled and self.tradeTarget.status == 0 and lastPrice <= sfgrid_config.grid_price[1]: # 已启用,未建仓,建仓
orderPrice = sfgrid_config.grid_price[index]
if self.tradeTarget.enabled and self.tradeTarget.status == 0 and lastPrice <= config.grid_price[1]: # 已启用,未建仓,建仓
orderPrice = config.grid_price[index]
orderType = xtconstant.STOCK_BUY
orderRemark = OrderTypeInit
if self.tradeTarget.enabled and self.tradeTarget.status == 1: # 已启用,已建仓,网格单
lowPrice = -1 if index+1>=len(sfgrid_config.grid_price) else sfgrid_config.grid_price[int(index) + 1] # pyright: ignore[reportArgumentType]
highPrice = sfgrid_config.grid_price[index - 1]
lowPrice = -1 if index+1>=len(config.grid_price) else config.grid_price[int(index) + 1] # pyright: ignore[reportArgumentType]
highPrice = config.grid_price[index - 1]
if lastPrice <= lowPrice: # 下下方多单
orderPrice = lowPrice
@@ -101,21 +100,20 @@ class SFGridStrategy:
orderRemark = OrderTypeSell
if orderType != -1:
orders = queryPendingOrder(str(self.tradeTarget.stock_code), self.getName(), self.xt_trader, self.account)
orders = qmtv.queryPendingOrder(str(self.tradeTarget.stock_code), self.getName())
if len([order for order in orders if order.order_type == orderType and order.price == orderPrice]) > 0:
# 已存在未交易的多单
print(f' |- [{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}]已存在未交易的{"多单" if orderType == xtconstant.STOCK_BUY else "空单"},不重复下单')
else:
print(f' |- 下网格{"多单" if orderType == xtconstant.STOCK_BUY else "空单"}')
self.tradeTarget.current_order_no = self.xt_trader.order_stock_async(
self.account,
self.tradeTarget.current_order_no = qmtv.orderAsync(
str(self.tradeTarget.stock_code),
config.grid_volume,
orderType,
sfgrid_config.grid_volume,
xtconstant.FIX_PRICE,
orderPrice,
xtconstant.FIX_PRICE,
orderRemark, # remark # type: ignore
self.getName(), # strategy_name
orderRemark # remark # type: ignore
)
orderTypeName = ""
if orderRemark == OrderTypeBuy:
@@ -124,7 +122,7 @@ class SFGridStrategy:
orderTypeName = "空单"
elif orderRemark == OrderTypeInit:
orderTypeName = "建仓单"
print(f' |- {orderTypeName}委托, 单号 {self.tradeTarget.current_order_no}, 网格基准价 {gridBasePrice}, 下单价 {orderPrice}, 下单量 {sfgrid_config.grid_volume}')
print(f' |- {orderTypeName}委托, 单号 {self.tradeTarget.current_order_no}, 网格基准价 {gridBasePrice}, 下单价 {orderPrice}, 下单量 {config.grid_volume}')
finally:
print(f'|- 市价更新[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - release lock')
self.saveProxy()
@@ -162,8 +160,8 @@ class SFGridStrategy:
self.tradeTarget.current_position = int(self.tradeTarget.current_position) + trade.traded_volume # 当前持仓数,账户原有持仓不在策略范围内 # type: ignore
self.tradeTarget.last_trade_price = float(trade.traded_price) # type: ignore
self.tradeTarget.grid_index = 1 # type: ignore
self.tradeTarget.plan_buy_price = float(sfgrid_config.grid_price[2]) # type: ignore
self.tradeTarget.plan_sell_price = float(sfgrid_config.grid_price[0]) # type: ignore
self.tradeTarget.plan_buy_price = float(config.grid_price[2]) # type: ignore
self.tradeTarget.plan_sell_price = float(config.grid_price[0]) # type: ignore
self.tradeTarget.status = 1 # type: ignore
self.saveProxy()
print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 建初始仓订单ID: {self.tradeTarget.current_order_no}已成交 ")
@@ -175,7 +173,7 @@ class SFGridStrategy:
self.tradeTarget.current_position = int(self.tradeTarget.current_position) - trade.traded_volume # type: ignore
self.tradeTarget.last_trade_price = float(trade.traded_price) # type: ignore
self.tradeTarget.grid_index = int(self.tradeTarget.grid_index) - 1 # type: ignore
print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 上涨 卖单已成交 订单ID: {self.tradeTarget.current_order_no} Price: {sfgrid_config.grid_price[int(self.tradeTarget.grid_index)]} Volume: {sfgrid_config.grid_volume} 手续费: {trade.commission}\n") # type: ignore
print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 上涨 卖单已成交 订单ID: {self.tradeTarget.current_order_no} Price: {config.grid_price[int(self.tradeTarget.grid_index)]} Volume: {config.grid_volume} 手续费: {trade.commission}\n") # type: ignore
print(f' 成交价: {trade.traded_price} 成交量: {trade.traded_volume}')
print(f' 当前持仓: {self.tradeTarget.current_position}')
print(f' 网格坐标: {self.tradeTarget.grid_index}')
@@ -184,7 +182,7 @@ class SFGridStrategy:
self.tradeTarget.current_position = int(self.tradeTarget.current_position) + trade.traded_volume # type: ignore
self.tradeTarget.last_trade_price = float(trade.traded_price) # type: ignore
self.tradeTarget.grid_index = int(self.tradeTarget.grid_index) + 1 # type: ignore
print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 下跌 买单已成交 订单ID: {self.tradeTarget.current_order_no} Price: {trade.traded_price} Volume: {sfgrid_config.grid_volume} 手续费: {trade.commission}")
print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 下跌 买单已成交 订单ID: {self.tradeTarget.current_order_no} Price: {trade.traded_price} Volume: {config.grid_volume} 手续费: {trade.commission}")
print(f' 成交价: {trade.traded_price} 成交量: {trade.traded_volume}')
print(f' 当前持仓: {self.tradeTarget.current_position}')
print(f' 网格坐标: {self.tradeTarget.grid_index}')
@@ -202,11 +200,11 @@ class SFGridStrategy:
buyIdx: int = self.tradeTarget.grid_index + 1 # pyright: ignore[reportAssignmentType]
sellIdx: int = self.tradeTarget.grid_index - 1
if self.tradeTarget.grid_index > 0: # 可以下空单
self.tradeTarget.plan_sell_price = float(sfgrid_config.grid_price[sellIdx]) # pyright: ignore[reportAttributeAccessIssue]
self.tradeTarget.plan_sell_price = float(config.grid_price[sellIdx]) # pyright: ignore[reportAttributeAccessIssue]
else:
self.tradeTarget.plan_sell_price = -1.0 # type: ignore
if self.tradeTarget.grid_index < len(sfgrid_config.grid_price) - 1:
self.tradeTarget.plan_buy_price = float(sfgrid_config.grid_price[buyIdx]) # pyright: ignore[reportAttributeAccessIssue]
if self.tradeTarget.grid_index < len(config.grid_price) - 1:
self.tradeTarget.plan_buy_price = float(config.grid_price[buyIdx]) # pyright: ignore[reportAttributeAccessIssue]
else:
self.tradeTarget.plan_buy_price = -1.0 # pyright: ignore[reportAttributeAccessIssue]
else:
+97 -85
View File
@@ -1,52 +1,71 @@
from typing import Any
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from datetime import datetime
import threading
import time
import core.eventbus as eBus
from core.logger import LogData, LogLevel
from core.sfgrid.model import TradeTarget, db
from core.logger import LogLevel, PrintLog
from core.sfgrid.model import SFGridTradeTarget
import configparser
import sfgrid_config
import config
from core.sfgrid.objects import GridFixData
from core.util import getInstrumentName
from core.qmt import qmtv
from core.sfgrid.sfgrid_strategy import SFGridStrategy
class TradeTargetUI(ttk.Frame):
def __init__(self, parent, main_window):
def __init__(self, parent):
super().__init__(parent)
# 保存主窗口的引用,用于访问全局日志
self.main_window = main_window
self.tradeTargetData:dict[int, TradeTarget] = {}
self.market_data_enabled = False # 添加市场数据监听状态变量
self.ui_refresh_enabled = False # 添加UI刷新线程状态变量
self.tradeTargetData:dict[int, SFGridTradeTarget] = {} # id->trade_target
self.strategy_ctrl:dict[str, SFGridStrategy] = {} # stock_code->trade_target
self.init_trade_target_pool()
self.registerEventHandler()
# 创建刷新线程标志
self.refresh_thread_running = False # 默认不启动刷新线程
self.refresh_thread_running = True # 默认不启动刷新线程
# 市场监控数据
self.marketData: dict[str, Any] = {} # 存储市场数据 {stock_code: {stock_name, last_price, time}}
# 创建界面
self.create_ui()
# 不再自动启动刷新线程,由市场数据开关控制
def refresh_targets(self):
# 更新标的池
results = SFGridTradeTarget.select()
for temp in results:
result :SFGridTradeTarget = temp
self.tradeTargetData[result.get_id()] = result
def init_trade_target_pool(self):
self.refresh_targets()
for id, tradeTarget in self.tradeTargetData.items():
status = "新建" if tradeTarget.status == 0 else "已建初始仓"
PrintLog(LogLevel.INFO, f' [序号-{id}] 股票代码: {tradeTarget.stock_code}-{tradeTarget.stock_name} 当前持仓: {qmtv.getStockPosition(tradeTarget.stock_code)} 网格索引: {tradeTarget.grid_index} 基准价格 {config.grid_price[tradeTarget.grid_index]} 状态: {status} 启用交易线程: {'自动交易中' if tradeTarget.enabled else '交易已停止'}') # type: ignore
tradeTarget.current_position = qmtv.getStockPosition(tradeTarget.stock_code) # type: ignore
result = tradeTarget.save()
PrintLog(LogLevel.INFO, f' |- 同步[{tradeTarget.stock_code}-{tradeTarget.stock_name}]持仓信息[{'成功' if result == 1 else '失败'}]')
stockTradeController = SFGridStrategy(tradeTarget) # type: ignore
self.strategy_ctrl[tradeTarget.stock_code] = stockTradeController # pyright: ignore[reportArgumentType]
# eBus.event_bus.publish(eBus.EventTradeTargetUpdate, tradeTarget)
PrintLog(LogLevel.INFO, f'- [成功]交易标的信息初始化, 共 {len(self.tradeTargetData)} 个标的')
def registerEventHandler(self):
eBus.event_bus.subscribe(eBus.EventTradeTargetUpdate, self.onTradeTargetUpdated)
eBus.event_bus.subscribe(eBus.MarketDataUpdate, self.onMarketDataUpdated)
eBus.event_bus.subscribe(eBus.ResultEventTradeEnabled, self.onTradeEnabled)
eBus.event_bus.subscribe(eBus.ResultEventTradeDisabled, self.onTradeDisabled)
eBus.event_bus.subscribe(eBus.MarketDataEnabled, self.onMarketDataToggled)
eBus.event_bus.subscribe(eBus.MarketDataDisabled, self.onMarketDataToggled)
# eBus.event_bus.subscribe(eBus.EventTradeTargetUpdate, self.onTradeTargetUpdated)
# eBus.event_bus.subscribe(eBus.MarketDataUpdate, self.onMarketDataUpdated)
# eBus.event_bus.subscribe(eBus.ResultEventTradeEnabled, self.onTradeEnabled)
# eBus.event_bus.subscribe(eBus.ResultEventTradeDisabled, self.onTradeDisabled)
# eBus.event_bus.subscribe(eBus.MarketDataEnabled, self.onMarketDataToggled)
# eBus.event_bus.subscribe(eBus.MarketDataDisabled, self.onMarketDataToggled)
eBus.event_bus.subscribe(eBus.ResultEventTradeTargetDeleted, self.onTradeTargetDeleted)
eBus.event_bus.subscribe(eBus.EventPrintLog, self.onLog)
def start_refresh_thread(self):
"""启动刷新线程"""
@@ -71,30 +90,30 @@ class TradeTargetUI(ttk.Frame):
if id in self.tradeTargetData:
del self.tradeTargetData[id]
# 添加日志
self.add_log(LogLevel.INFO, f"交易标的已删除,ID: {id}")
PrintLog(LogLevel.INFO, f"交易标的已删除,ID: {id}")
def onMarketDataToggled(self, data:bool):
self.market_data_enabled = self.market_data_switch_var.get()
self.add_log(LogLevel.INFO, "市场数据监听已" + ("启用" if data else "禁用"))
PrintLog(LogLevel.INFO, "市场数据监听已" + ("启用" if data else "禁用"))
# 同步UI刷新线程状态
if data:
self.start_ui_refresh()
else:
self.stop_ui_refresh()
def onTradeEnabled(self, target:TradeTarget):
self.add_log(LogLevel.INFO, f"交易启用: {target.stock_code} - {target.stock_name}")
def onTradeEnabled(self, target:SFGridTradeTarget):
PrintLog(LogLevel.INFO, f"交易启用: {target.stock_code} - {target.stock_name}")
def onTradeDisabled(self, target:TradeTarget):
self.add_log(LogLevel.INFO, f"交易禁用: {target.stock_code} - {target.stock_name}")
def onTradeDisabled(self, target:SFGridTradeTarget):
PrintLog(LogLevel.INFO, f"交易禁用: {target.stock_code} - {target.stock_name}")
def onTradeTargetUpdated(self, target: TradeTarget):
def onTradeTargetUpdated(self, target: SFGridTradeTarget):
# 更新或添加数据到本地缓存
self.tradeTargetData[target.get_id()] = target
def onMarketDataUpdated(self, target: TradeTarget):
def onMarketDataUpdated(self, target: SFGridTradeTarget):
# 更新市场监控数据
current_time = datetime.now().strftime("%H:%M:%S")
self.marketData[str(target.stock_code)] = {
@@ -160,14 +179,14 @@ class TradeTargetUI(ttk.Frame):
if not self.refresh_thread_running:
self.refresh_thread_running = True
self.start_refresh_thread()
self.add_log(LogLevel.INFO, "UI刷新线程已启动")
PrintLog(LogLevel.INFO, "UI刷新线程已启动")
def stop_ui_refresh(self):
"""停止UI刷新线程"""
if self.refresh_thread_running:
self.stop_refresh_thread()
self.refresh_thread_running = False
self.add_log(LogLevel.INFO, "UI刷新线程已停止")
PrintLog(LogLevel.INFO, "UI刷新线程已停止")
@@ -315,9 +334,9 @@ class TradeTargetUI(ttk.Frame):
if result:
# 发布事件通知主控制器添加标的
eBus.event_bus.publish(eBus.ActionEventAddTradeTarget, stock_code)
self.add_log(LogLevel.INFO, f"已发送添加请求: {stock_code} - {stock_name}")
PrintLog(LogLevel.INFO, f"已发送添加请求: {stock_code} - {stock_name}")
def get_status_indicator(self, target: TradeTarget) -> str:
def get_status_indicator(self, target: SFGridTradeTarget) -> str:
"""获取状态指示器(带颜色色块的文本)"""
if target.status == 1:
# 绿色圆点表示交易中
@@ -337,8 +356,7 @@ class TradeTargetUI(ttk.Frame):
def populate_trade_table(self):
"""填充交易标的表格数据"""
for temp in self.tradeTargetData:
target: TradeTarget = self.tradeTargetData[temp]
for temp, target in self.tradeTargetData.items():
values = [
target.id, # type: ignore
target.stock_code,
@@ -374,7 +392,7 @@ class TradeTargetUI(ttk.Frame):
if selected:
item = selected[0]
values = self.trade_table.item(item)['values']
self.add_log(LogLevel.DEBUG, f"双击查看详情: {values[0]} - {values[1]}")
PrintLog(LogLevel.DEBUG, f"双击查看详情: {values[0]} - {values[1]}")
def get_selected_target(self):
"""获取选中的交易标的"""
@@ -418,7 +436,7 @@ class TradeTargetUI(ttk.Frame):
# self.add_log("INFO", f"已启动交易: {target.stock_code} - {target.stock_name}")
# messagebox.showinfo("启动成功", f"已启动 {target.stock_code} ({target.stock_name}) 的交易")
def on_trade_enabled(self, target: TradeTarget):
def on_trade_enabled(self, target: SFGridTradeTarget):
eBus.event_bus.publish(eBus.ActionEventEnableTrade, target)
def pause_selected_trade(self):
@@ -462,7 +480,7 @@ class TradeTargetUI(ttk.Frame):
if result:
# 通过事件总线发出删除动作
eBus.event_bus.publish(eBus.ActionEventDeleteTradeTarget, target.get_id())
self.add_log(LogLevel.INFO, f"已发送删除请求: {target.stock_code} - {target.stock_name}")
PrintLog(LogLevel.INFO, f"已发送删除请求: {target.stock_code} - {target.stock_name}")
def add_trade_target(self):
"""添加新的交易标的"""
@@ -519,7 +537,7 @@ class TradeTargetUI(ttk.Frame):
# 绑定回车键确认
stock_code_entry.bind('<Return>', lambda event: confirm_add())
self.add_log(LogLevel.INFO, "点击添加交易标的按钮")
PrintLog(LogLevel.INFO, "点击添加交易标的按钮")
@@ -550,14 +568,6 @@ class TradeTargetUI(ttk.Frame):
# 刷新市场监控表格
self.populate_market_table()
def onLog(self, data:LogData):
# 使用全局日志
self.main_window.add_log(data.level, data.message)
def add_log(self, level:LogLevel, message):
"""添加日志记录 - 转发到全局日志"""
self.main_window.add_log(level, message)
def system_settings(self):
"""系统设置"""
# 获取顶层窗口
@@ -612,7 +622,7 @@ class TradeTargetUI(ttk.Frame):
# 读取当前配置
config = configparser.ConfigParser()
config_path = sfgrid_config.get_config_path()
config_path = config.get('config', 'config_path')
config.read(config_path, encoding='utf-8')
# 创建输入框字典用于保存引用
@@ -823,40 +833,42 @@ class TradeTargetUI(ttk.Frame):
# 定义保存和取消按钮的功能(button_frame已在上方创建)
def save_settings():
"""保存配置"""
try:
# 计算网格价格序列
grid_prices = calculate_grid_prices()
if not grid_prices:
messagebox.showerror("错误", "网格价格参数有误,请检查输入!")
return
# try:
# # 计算网格价格序列
# grid_prices = calculate_grid_prices()
# if not grid_prices:
# messagebox.showerror("错误", "网格价格参数有误,请检查输入!")
# return
grid_price_str = ",".join([str(p) for p in grid_prices])
# grid_price_str = ",".join([str(p) for p in grid_prices])
# 更新配置对象
config.set('config', 'miniQMTPath', entries['miniQMTPath'].get())
config.set('config', 'grid_price', grid_price_str)
config.set('config', 'grid_volume', entries['grid_volume'].get())
config.set('config', 'account_no', entries['account_no'].get())
# # 更新配置对象
# config.set('config', 'miniQMTPath', entries['miniQMTPath'].get())
# config.set('config', 'grid_price', grid_price_str)
# config.set('config', 'grid_volume', entries['grid_volume'].get())
# config.set('config', 'account_no', entries['account_no'].get())
# 写入配置文件
config_path = sfgrid_config.get_config_path()
with open(config_path, 'w', encoding='utf-8') as configfile:
config.write(configfile)
# # 写入配置文件
# config_path = config.get_config_path()
# with open(config_path, 'w', encoding='utf-8') as configfile:
# config.write(configfile)
# 重新加载配置到内存中
sfgrid_config.initConfig()
# # 重新加载配置到内存中
# config.initConfig()
messagebox.showinfo("成功", f"配置已保存!\n网格价格序列: {grid_price_str}\n部分配置可能需要重启程序后生效。")
self.add_log(LogLevel.INFO, f"系统配置已更新 - 网格数量: {len(grid_prices)}")
settings_window.destroy()
# messagebox.showinfo("成功", f"配置已保存!\n网格价格序列: {grid_price_str}\n部分配置可能需要重启程序后生效。")
# self.add_log(LogLevel.INFO, f"系统配置已更新 - 网格数量: {len(grid_prices)}")
# settings_window.destroy()
except Exception as e:
messagebox.showerror("错误", f"保存配置失败:{str(e)}")
self.add_log(LogLevel.ERROR, f"保存配置失败: {str(e)}")
# except Exception as e:
# messagebox.showerror("错误", f"保存配置失败:{str(e)}")
# self.add_log(LogLevel.ERROR, f"保存配置失败: {str(e)}")
pass
def cancel_settings():
"""取消设置"""
settings_window.destroy()
# settings_window.destroy()
pass
# 在button_frame中添加按钮
ttk.Button(button_frame, text="💾 保存配置", command=save_settings, width=15).pack(side=tk.LEFT, padx=5)
@@ -871,7 +883,7 @@ class TradeTargetUI(ttk.Frame):
# 创建网格修正窗口
self.create_grid_correction_window(target)
def create_grid_correction_window(self, target: TradeTarget):
def create_grid_correction_window(self, target: SFGridTradeTarget):
"""创建网格修正窗口"""
# 获取顶层窗口
root = self.winfo_toplevel()
@@ -929,7 +941,7 @@ class TradeTargetUI(ttk.Frame):
required_position_frame.pack(fill=tk.X, pady=5)
grid_index_value = getattr(target, 'grid_index')
required_position = grid_index_value * sfgrid_config.grid_volume
required_position = grid_index_value * config.grid_volume
ttk.Label(required_position_frame, text="需求持仓量:", width=12).pack(side=tk.LEFT)
required_position_label = ttk.Label(required_position_frame, text=str(required_position), width=10, anchor=tk.CENTER)
required_position_label.pack(side=tk.LEFT, padx=5)
@@ -954,7 +966,7 @@ class TradeTargetUI(ttk.Frame):
# 增加按钮
ttk.Button(grid_index_frame, text="+", width=3,
command=lambda: self.increase_grid_index(grid_index_var, len(sfgrid_config.grid_price)-1, target, required_position_label, position_status_label)).pack(side=tk.LEFT, padx=(5, 0))
command=lambda: self.increase_grid_index(grid_index_var, len(config.grid_price)-1, target, required_position_label, position_status_label)).pack(side=tk.LEFT, padx=(5, 0))
# 当前价格(实时更新)
price_frame = ttk.Frame(options_frame)
@@ -978,7 +990,7 @@ class TradeTargetUI(ttk.Frame):
command=correction_window.destroy).pack(side=tk.RIGHT, padx=5)
# 监听市场数据更新
def on_market_data_update(updated_target: TradeTarget):
def on_market_data_update(updated_target: SFGridTradeTarget):
if updated_target.get_id() == target.get_id():
current_price_var.set(f"{updated_target.market_price:.3f}" if updated_target.market_price else "-")
@@ -1005,13 +1017,13 @@ class TradeTargetUI(ttk.Frame):
shortage = required_position - current_position
status_label.config(text=f"还需补充 {shortage} 手仓位", foreground="red")
def save_grid_correction(self, window, target: TradeTarget, new_grid_index: int):
def save_grid_correction(self, window, target: SFGridTradeTarget, new_grid_index: int):
"""保存网格修正"""
# 更新网格序号
setattr(target, 'grid_index', new_grid_index)
# 重新计算需求持仓量
required_position = new_grid_index * sfgrid_config.grid_volume
required_position = new_grid_index * config.grid_volume
# 检查持仓量是否满足要求
current_position = getattr(target, 'current_position')
@@ -1033,11 +1045,11 @@ class TradeTargetUI(ttk.Frame):
window.destroy()
# 添加日志
self.add_log(LogLevel.INFO, f"网格修正已保存: {target.stock_code} - {target.stock_name}, 网格序号: {new_grid_index}")
PrintLog(LogLevel.INFO, f"网格修正已保存: {target.stock_code} - {target.stock_name}, 网格序号: {new_grid_index}")
def decrease_grid_index(self, grid_index_var: tk.IntVar, target: TradeTarget, required_position_label: ttk.Label, position_status_label: ttk.Label):
def decrease_grid_index(self, grid_index_var: tk.IntVar, target: SFGridTradeTarget, required_position_label: ttk.Label, position_status_label: ttk.Label):
"""减少网格序号"""
current_value = grid_index_var.get()
if current_value > 0:
@@ -1045,7 +1057,7 @@ class TradeTargetUI(ttk.Frame):
# 同步更新需求持仓量和持仓状态
self.update_required_position_and_status(grid_index_var.get(), target, required_position_label, position_status_label)
def increase_grid_index(self, grid_index_var: tk.IntVar, max_index: int, target: TradeTarget, required_position_label: ttk.Label, position_status_label: ttk.Label):
def increase_grid_index(self, grid_index_var: tk.IntVar, max_index: int, target: SFGridTradeTarget, required_position_label: ttk.Label, position_status_label: ttk.Label):
"""增加网格序号"""
current_value = grid_index_var.get()
if current_value < max_index:
@@ -1053,10 +1065,10 @@ class TradeTargetUI(ttk.Frame):
# 同步更新需求持仓量和持仓状态
self.update_required_position_and_status(grid_index_var.get(), target, required_position_label, position_status_label)
def update_required_position_and_status(self, grid_index: int, target: TradeTarget, required_position_label: ttk.Label, position_status_label: ttk.Label):
def update_required_position_and_status(self, grid_index: int, target: SFGridTradeTarget, required_position_label: ttk.Label, position_status_label: ttk.Label):
"""更新需求持仓量和持仓状态"""
# 计算需求持仓量
required_position = grid_index * sfgrid_config.grid_volume
required_position = grid_index * config.grid_volume
required_position_label.config(text=str(required_position))
# 更新持仓量状态
+2 -2
View File
@@ -1,4 +1,4 @@
import sfgrid_config
import config
import xtquant.xtconstant as xtconstant
from xtquant import xtdata, xttrader
from xtquant.xttype import StockAccount, XtOrder, XtPosition
@@ -58,7 +58,7 @@ def getStockPosition(stock_code: str, xt_trader: xttrader.XtQuantTrader, account
return volume
def minPosition(gridIndex:int):
return sfgrid_config.grid_volume * gridIndex
return config.grid_volume * gridIndex
def queryPendingOrder(stock_code:str, tag: str, xt_trader: xttrader.XtQuantTrader, account: StockAccount) -> list[XtOrder]:
if stock_code == None or tag == None:
+4 -13
View File
@@ -1,21 +1,12 @@
# coding:utf-8
from core.database import db
from core.main_ui import MainWindow
from core.sfgrid.main_controller import SFGridController
from core.logger import LogLevel, PrintLog
import sfgrid_config as sdConstants
# def startTrade(index: int):
# ctrl.start_stock_trade(index)
# def pauseTrade(index: int):
# ctrl.pause_stock_trade(index)
import config as sdConstants
from core.qmt import qmtv
if __name__ == '__main__':
sdConstants.initConfig()
# ctrl: SFGridController = SFGridController(sdConstants.account_no, sdConstants.miniQMTPath)
# ctrl.hold()
qmtv.init_qmtv()
qmtv.connect()
window = MainWindow()
window.run()