This commit is contained in:
2026-06-05 06:08:27 +08:00
parent 1816d585bf
commit ef4c1cca32
7 changed files with 551 additions and 168 deletions
+10 -3
View File
@@ -7,6 +7,7 @@ miniQMTPath = r'D:\\Programs\\DTQMT\\userdata_mini' # miniQMT软件的安装路
account_no:str = '99082560'
console_log = True
log_level = "INFO"
use_simulated_qmt: bool = False
def get_config_path() -> Path:
"""获取配置文件的正确路径(兼容开发环境和打包后的可执行文件)"""
@@ -21,12 +22,14 @@ def get_config_path() -> Path:
return base_path / 'config.ini'
def save_config(miniQmtPath:str, account_no:str):
def save_config(miniQmtPath:str, account_no:str, use_simulated_qmt: bool = False):
"""创建默认配置文件"""
config = configparser.ConfigParser()
config['config'] = {
'miniQMTPath': miniQmtPath,
'account_no': account_no
'account_no': account_no,
'use_simulated_qmt': str(use_simulated_qmt),
'log_level': 'INFO'
}
config_path = get_config_path()
with open(config_path, 'w') as configfile:
@@ -39,7 +42,7 @@ def exist_config() -> bool:
return config_path.exists()
def initConfig() -> bool:
global miniQMTPath, account_no, log_level
global miniQMTPath, account_no, log_level, use_simulated_qmt
# 获取配置文件路径
config_path = get_config_path()
@@ -49,6 +52,10 @@ def initConfig() -> bool:
miniQMTPath = config.get('config','miniQMTPath')
account_no = config.get('config','account_no')
log_level = config.get('config','log_level')
try:
use_simulated_qmt = config.get('config','use_simulated_qmt').lower() in ('true', '1', 'yes')
except (configparser.NoOptionError, configparser.NoSectionError):
use_simulated_qmt = False
# 判断miniQMTPath是否为空,并且目录是否存在
if not miniQMTPath or not Path(miniQMTPath).exists():
+1
View File
@@ -4,6 +4,7 @@ EventMarketActiveSwitch = "market_active_switch" # 市场数据状态变更
MarketDataUpdate = "market_data_update" # 市价更新
MarketOrderCreated = "market_order_created" # 市价单创建
MarketOrderTraded = "market_order_traded" # 市价单成交
MarketOrderError = "market_order_error" # 市价单委托失败
# Pring Log
EventPrintLog = "print_log" # 打印日志
+9 -2
View File
@@ -1,11 +1,17 @@
"""
QMT 模块统一入口
根据环境自动选择真实 QMT 或模拟器
根据配置或环境自动选择真实 QMT 或模拟器
"""
import sys
import config as _config
def _get_qmt():
"""获取 QMT 模块"""
"""获取 QMT 模块(配置优先于平台检测)"""
if _config.use_simulated_qmt:
from core.qmt_dummy import qmtv
return qmtv
if sys.platform == 'win32':
try:
from core.qmt_real import qmtv as real_qmtv
@@ -17,5 +23,6 @@ def _get_qmt():
from core.qmt_dummy import qmtv
return qmtv
# 导出单例
qmtv = _get_qmt()
+2 -1
View File
@@ -280,7 +280,8 @@ class DummyQmtV:
eBus.event_bus.publish(eBus.MarketOrderCreated, response)
def on_order_error(self, order_error):
print(f"\n模拟委托报错回调 {order_error}")
print(f"\n模拟委托报错回调: order_id={order_error.order_id}, error_id={order_error.error_id}, error_msg={order_error.error_msg}, remark={order_error.order_remark}")
eBus.event_bus.publish(eBus.MarketOrderError, order_error)
def on_account_status(self, status):
print(datetime.datetime.now(), status)
+8 -2
View File
@@ -163,9 +163,10 @@ class RealQmtV:
return -1
try:
full_code = self._to_full_code(stock_code)
seq = self.xt_trader.order_stock_async(
account=self.account,
stock_code=stock_code,
stock_code=full_code,
order_volume=orderVolume,
order_type=orderType,
price=orderPrice,
@@ -306,6 +307,10 @@ class RealQmtV:
seq = xtdata.subscribe_whole_quote(['SH', 'SZ'], self._on_market_data)
PrintLog(LogLevel.INFO, f'- [市场数据订阅成功-真实] seq={seq}')
# 订阅成功即标记市场活跃,避免策略初始化时因等待首条数据被误判为休市
self.isMarketActive = True
eBus.event_bus.publish(eBus.EventMarketActiveSwitch, True)
# 启动行情活跃监控线程
self._market_data_thread = threading.Thread(
target=self._market_data_watchdog, daemon=True
@@ -359,7 +364,8 @@ class RealQmtV:
eBus.event_bus.publish(eBus.MarketOrderCreated, response)
def on_order_error(self, order_error):
print(f"\n真实委托报错回调 {order_error}")
print(f"\n真实委托报错回调: order_id={order_error.order_id}, error_id={order_error.error_id}, error_msg={order_error.error_msg}, remark={order_error.order_remark}")
eBus.event_bus.publish(eBus.MarketOrderError, order_error)
def on_account_status(self, status):
print(datetime.datetime.now(), status)
+450 -103
View File
@@ -1,3 +1,24 @@
"""
网格交易策略控制器
核心逻辑:在预设的价格网格上低买高卖,每个网格节点同时挂一对买卖单,
成交后自动切换到相邻网格并刷新订单。
网格结构示意(以 grid_index 为中心):
价格从高到低排列在 getPriceGrid() 列表中
grid_index=0 是最低价(底部),越大价格越高(顶部)
卖出方向(上移): grid_index - 1 (价格更低,空单)
买入方向(下移): grid_index + 1 (价格更高,多单)
成交 → 上移一格(卖出成交): grid_index -= 1,赚取一格差价
成交 → 下移一格(买入成交): grid_index += 1,持仓成本降低
状态机:
status=0: 未建仓,需先下建仓单买入初始仓位
status=1: 已建仓,运行网格交易(上下各挂一单)
"""
from core.logger import LogLevel, PrintLog
from core.qmt import qmtv
from core.sfgrid import bus_events
@@ -7,113 +28,234 @@ from core.eventbus import event_bus
from core.constants import OrderTypeBuy, OrderTypeSell, OrderTypeInit
from xtquant import xtconstant
from xtquant.xttype import XtOrderResponse, XtTrade
from xtquant.xttype import XtOrderError, XtOrderResponse, XtTrade
import threading
import core.eventbus as eBus
class SFGridStrategy:
"""
单标的网格交易策略控制器
每个 SFGridTradeTarget 数据库记录对应一个 SFGridStrategy 实例。
负责:建仓 → 挂网格单 → 监听成交/错误事件 → 调整网格 → 刷新订单。
订单 remark 格式: "{订单类型},{网格索引},{股票代码}"
例: "BUY,3,000001" 表示在网格索引 3 处挂买入单,标的 000001
例: "INIT,1,000001" 表示建仓单,建仓在网格索引 1
"""
def __init__(self, tradeTarget: model.SFGridTradeTarget):
"""
初始化网格策略控制器
参数:
tradeTarget: 数据库中的交易标记录,包含网格参数、当前状态等
"""
self.tradeTarget: model.SFGridTradeTarget = tradeTarget
# 订阅事件总线:监听订单创建、成交、失败三种事件
event_bus.subscribe(eBus.MarketOrderCreated, self.onOrderCreateAsync)
event_bus.subscribe(eBus.MarketOrderTraded, self.onOrderTrade)
event_bus.subscribe(eBus.MarketOrderError, self.onOrderError)
# 获取当日涨跌停价格(用于价格边界校验)
self.todayUpStopPrice = qmtv.dailyUpStop(tradeTarget.stock_code) # type: ignore
self.todayDownStopPrice = qmtv.dailyDownStop(tradeTarget.stock_code) # type: ignore
PrintLog(LogLevel.INFO, f'|- [DEBUG] 标的{tradeTarget.targetName()} 构造开始: grid_index={tradeTarget.grid_index}, status={tradeTarget.status}, enabled={tradeTarget.enabled}')
self.orderGrid = {} # grid index, order_seq | order_id
PrintLog(LogLevel.INFO,
f'|- [DEBUG] 标的{tradeTarget.targetName()} 构造开始: '
f'grid_index={tradeTarget.grid_index}, status={tradeTarget.status}, '
f'enabled={tradeTarget.enabled}')
# orderGrid: 网格索引 → 订单编号(seq 或 order_id)的映射
# seq 是 xtquant 返回的下单序号(下单瞬间),order_id 是交易所返回的正式订单号(异步回调后更新)
self.orderGrid = {} # {grid_index: order_seq | order_id}
# 加载券商侧已存在的未成交订单,恢复到 orderGrid 中
self.loadExistOrders()
self.enabledTrading(tradeTarget.enabled) # type: ignore
# 数据更新锁:保护 orderGrid 和 tradeTarget 的并发访问
# QMT 回调在独立线程中触发,必须在可能触发回调的操作之前创建
self.dataUpdateLock = threading.Lock()
PrintLog(LogLevel.INFO, f'|- [DEBUG] 标的{tradeTarget.targetName()} 构造结束: grid_index={self.tradeTarget.grid_index}')
# 根据数据库中的 enabled 字段决定是否启动交易
self.enabledTrading(tradeTarget.enabled) # type: ignore
PrintLog(LogLevel.INFO,
f'|- [DEBUG] 标的{tradeTarget.targetName()} 构造结束: '
f'grid_index={self.tradeTarget.grid_index}')
# ── 订单加载 ──────────────────────────────────────────────
def loadExistOrders(self):
"""
从券商侧加载该策略的未成交订单,恢复到 orderGrid
用于程序重启后恢复状态:数据库中可能没有记录所有挂单,
通过 queryPendingOrder 从 QMT 获取实际存在的订单。
"""
orders = qmtv.queryPendingOrder(self.tradeTarget.stock_code, self.getName()) # type: ignore
for order in orders:
# 只处理本策略的订单(通过 strategy_name 过滤)
if order.strategy_name != self.getName():
continue
gridIdx = int(order.order_remark.split(',')[1])
parsed = self._parse_remark(order.order_remark)
if parsed is None:
continue
_, gridIdx, _ = parsed
self.orderGrid[gridIdx] = order.order_id
PrintLog(LogLevel.INFO, f'|- 标的[{self.tradeTarget.targetName()}] 初始化: 加载现有订单, grid-{gridIdx} order_id:{self.orderGrid[gridIdx]}')
PrintLog(LogLevel.INFO,
f'|- 标的[{self.tradeTarget.targetName()}] 初始化: '
f'加载现有订单, grid-{gridIdx} order_id:{self.orderGrid[gridIdx]}')
def printPendingOrder(self):
"""调试用:打印当前所有挂单"""
for idx, order_id in self.orderGrid.items():
PrintLog(LogLevel.DEBUG, f" {idx} : {order_id}")
# ── 市场状态切换 ──────────────────────────────────────────
def onMarketActiveSwitch(self, isActive: bool):
"""
市场数据状态切换回调(由 UI 层调用)
当市场数据从不可用变为可用时,如果策略已启用则刷新网格订单。
"""
if isActive and self.tradeTarget.enabled:
self.refreshGridOrder()
def refreshGridOrder(self): # 下网格单
# ── 核心:网格下单逻辑 ────────────────────────────────────
def refreshGridOrder(self):
"""
刷新网格挂单 —— 策略的核心下单方法
逻辑分支:
1. 前置检查: 市场未激活 或 策略未启用 → 跳过不下单
2. status=0 (未建仓): 下一个建仓单(买入初始仓位)
3. status=1 (已建仓): 在 grid_index 上下各挂一单
- 上方 (sellIdx = grid_index - 1): 挂卖出单(价格更低时卖出获利)
- 下方 (buyIdx = grid_index + 1): 挂买入单(价格更低时补仓)
每个方向都先检查是否已存在同价位订单,避免重复下单
"""
# ── 前置检查:市场和策略状态 ──
if not qmtv.isMarketActive or not self.tradeTarget.enabled:
PrintLog(LogLevel.INFO, f'|- 市场 {qmtv.isMarketActive}, 策略 {self.getName()} {self.tradeTarget.enabled}, 不下单')
PrintLog(LogLevel.INFO,
f'|- 市场 {qmtv.isMarketActive}, 策略 {self.getName()} '
f'{self.tradeTarget.enabled}, 不下单')
return
currentIdx:int = 0
# 获取当前该标的所有未成交订单
orders = qmtv.queryPendingOrder(self.tradeTarget.stock_code, self.getName()) # type: ignore
if self.tradeTarget.status == 0 and len([order for order in orders if order.order_remark == f'{OrderTypeInit},1,{self.tradeTarget.stock_code}']) == 0: # status == 0 表示已配置好交易参数,且不存在执行中的建仓单
# ── 分支1: status=0 未建仓 → 下建仓单 ──
# 条件: 标的尚未建仓 且 不存在正在执行中的建仓单(防止重复建仓)
init_remark = self._make_remark(OrderTypeInit, 1)
if self.tradeTarget.status == 0 and not any(
o.order_remark == init_remark for o in orders
):
# 建仓价取价格网格中最高价(grid_index=0 即列表第一个元素)
price = self.tradeTarget.getPriceGrid()[0]
remark = f'{OrderTypeInit},1,{self.tradeTarget.stock_code}'
tmpOrderSeq = qmtv.orderAsync(
str(self.tradeTarget.stock_code),
self.tradeTarget.grid_volume,
xtconstant.STOCK_BUY,
xtconstant.STOCK_BUY, # 建仓 = 买入
price,
xtconstant.FIX_PRICE,
remark, # remark # type: ignore
self.getName(), # strategy_name
xtconstant.FIX_PRICE, # 限价单
init_remark,
self.getName(),
)
self.orderGrid[1] = tmpOrderSeq # seq
PrintLog(LogLevel.INFO, f'|- 标的[{self.tradeTarget.targetName()}] 初始化: 建仓单,建仓价: {price:.3f}')
elif self.tradeTarget.status == 1: # 下网格单
currentIdx = self.tradeTarget.grid_index # type: ignore
orders = qmtv.queryPendingOrder(self.tradeTarget.stock_code, self.getName()) # type: ignore
self.orderGrid[1] = tmpOrderSeq # 建仓单固定在网格索引 1
PrintLog(LogLevel.INFO,
f'|- 标的[{self.tradeTarget.targetName()}] 初始化: '
f'建仓单,建仓价: {price:.3f}')
# 向上下一单,向下下一单
if currentIdx > 0: # 可以下空单
sellIdx = currentIdx - 1
# ── 分支2: status=1 已建仓 → 下网格买卖单 ──
elif self.tradeTarget.status == 1:
currentIdx = self.tradeTarget.grid_index # type: ignore
# --- 上方挂卖出单(空单)---
# 条件: grid_index > 0,即当前位置不是价格最低点,还有向下(卖出)空间
if currentIdx > 0:
sellIdx = currentIdx - 1 # 向上一个网格
sellPrice = self.tradeTarget.getPriceGrid()[sellIdx]
remark = f'{OrderTypeSell},{sellIdx},{self.tradeTarget.stock_code}'
if len([order for order in orders if order.order_remark == remark]) == 0: # 网格节点没有卖单,下单
# 不存在策略内同价位订单,下单
sell_remark = self._make_remark(OrderTypeSell, sellIdx)
# 检查是否已存在同 remark 的卖单(避免重复挂单)
if not any(o.order_remark == sell_remark for o in orders):
# 卖单价格超过涨停价 → 今日无法成交,跳过下单
if sellPrice > self.todayUpStopPrice:
PrintLog(LogLevel.INFO,
f'|- 标的[{self.tradeTarget.targetName()}] '
f'上方网格[{sellIdx}]卖价 {sellPrice:.3f} > 涨停价 {self.todayUpStopPrice:.3f}'
f'今日无法下卖单 (当前网格基准 grid-{currentIdx})')
else:
tmpOrderSeq = qmtv.orderAsync(
str(self.tradeTarget.stock_code),
self.tradeTarget.grid_volume,
xtconstant.STOCK_SELL,
xtconstant.STOCK_SELL, # 卖出
sellPrice,
xtconstant.FIX_PRICE,
remark, # remark # type: ignore
self.getName(), # strategy_name
sell_remark,
self.getName(),
)
self.orderGrid[sellIdx] = tmpOrderSeq # seq
PrintLog(LogLevel.INFO, f'|- 标的[{self.tradeTarget.targetName()}] 网格策略: 下空单,价格: {sellPrice:.3f}')
self.orderGrid[sellIdx] = tmpOrderSeq
PrintLog(LogLevel.INFO,
f'|- 标的[{self.tradeTarget.targetName()}] 网格策略: '
f'下空单,价格: {sellPrice:.3f}')
else:
PrintLog(LogLevel.INFO, f'|- 标的[{self.tradeTarget.targetName()}] 网格策略: 已存在同价位空单,跳过下单')
if currentIdx < len(self.tradeTarget.getPriceGrid()) - 1: # 可以下多单
print(f'length: {len(self.tradeTarget.getPriceGrid())}, currentIdx = {currentIdx}')
buyIdx = currentIdx + 1
PrintLog(LogLevel.INFO,
f'|- 标的[{self.tradeTarget.targetName()}] 网格策略: '
f'已存在同价位空单,跳过下单')
# --- 下方挂买入单(多单)---
# 条件: grid_index < 价格网格长度-1,即当前位置不是价格最高点,还有向上(买入)空间
if currentIdx < len(self.tradeTarget.getPriceGrid()) - 1:
buyIdx = currentIdx + 1 # 向下一个网格
buyPrice = self.tradeTarget.getPriceGrid()[buyIdx]
remark = f'{OrderTypeBuy},{buyIdx},{self.tradeTarget.stock_code}'
if len([order for order in orders if order.order_type == xtconstant.STOCK_BUY and order.price == buyPrice]) == 0:
buy_remark = self._make_remark(OrderTypeBuy, buyIdx)
# 检查是否已存在同 remark 的买单(避免重复挂单)
if not any(o.order_remark == buy_remark for o in orders):
# 买单价格低于跌停价 → 今日无法成交,跳过下单
if buyPrice < self.todayDownStopPrice:
PrintLog(LogLevel.INFO,
f'|- 标的[{self.tradeTarget.targetName()}] '
f'下方网格[{buyIdx}]买价 {buyPrice:.3f} < 跌停价 {self.todayDownStopPrice:.3f}'
f'今日无法下买单 (当前网格基准 grid-{currentIdx})')
else:
tmpOrderSeq = qmtv.orderAsync(
str(self.tradeTarget.stock_code),
self.tradeTarget.grid_volume,
xtconstant.STOCK_BUY,
xtconstant.STOCK_BUY, # 买入
buyPrice,
xtconstant.FIX_PRICE,
remark, # remark # type: ignore
self.getName(), # strategy_name
buy_remark,
self.getName(),
)
self.orderGrid[buyIdx] = tmpOrderSeq # seq
PrintLog(LogLevel.INFO, f'|- 标的[{self.tradeTarget.targetName()}] 网格策略: 下多单,价格: {buyPrice:.3f}')
self.orderGrid[buyIdx] = tmpOrderSeq
PrintLog(LogLevel.INFO,
f'|- 标的[{self.tradeTarget.targetName()}] 网格策略: '
f'下多单,价格: {buyPrice:.3f}')
else:
PrintLog(LogLevel.INFO, f'|- 标的[{self.tradeTarget.targetName()}] 网格策略: 已存在同价位多单,跳过下单')
PrintLog(LogLevel.INFO,
f'|- 标的[{self.tradeTarget.targetName()}] 网格策略: '
f'已存在同价位多单,跳过下单')
else:
PrintLog(LogLevel.INFO, f'|- 标的[{self.tradeTarget.targetName()}] 网格策略: 已过下边界,停止多单交易')
# grid_index 已到达价格网格上边界,无法再挂买入单(价格已经到顶)
PrintLog(LogLevel.INFO,
f'|- 标的[{self.tradeTarget.targetName()}] 网格策略: '
f'已过下边界,停止多单交易')
# ── 标的管理 ──────────────────────────────────────────────
def deleteTradeTarget(self, tradeTarget: model.SFGridTradeTarget):
"""
从数据库中删除该交易标的
同时发布 EventTradeTargetDeleted 事件通知 UI 刷新。
"""
PrintLog(LogLevel.INFO, f'|- 标的{tradeTarget.targetName()}信息删除: START')
self.dataUpdateLock.acquire()
try:
@@ -123,108 +265,313 @@ class SFGridStrategy:
finally:
self.dataUpdateLock.release()
# ── 交易启停控制 ──────────────────────────────────────────
def enabledTrading(self, enabled: bool) -> model.SFGridTradeTarget:
PrintLog(LogLevel.INFO, f" |- [DEBUG] enabledTrading({enabled}) 调用前: grid_index={self.tradeTarget.grid_index}, status={self.tradeTarget.status}")
"""
启用或停用该标的的网格交易
启用时 (enabled=True):
- status=0: 初始化网格索引后调用 refreshGridOrder 下建仓单
- status=1: 检查持仓是否满足当前网格位置要求,满足则刷新网格订单
不满足则回退 enabled=False(风控保护)
停用时 (enabled=False):
- 取消该标的所有未成交订单,停止交易监控
返回:
更新后的 tradeTarget 对象
"""
PrintLog(LogLevel.INFO,
f" |- [DEBUG] enabledTrading({enabled}) 调用前: "
f"grid_index={self.tradeTarget.grid_index}, status={self.tradeTarget.status}")
self.tradeTarget.enabled = enabled # type: ignore
if enabled:
PrintLog(LogLevel.INFO, f" |- 标的{self.tradeTarget.targetName()}交易启动, 持仓量:{self.tradeTarget.current_position}")
if self.tradeTarget.status == 0: # 未建仓
# ── 启用交易 ──
PrintLog(LogLevel.INFO,
f" |- 标的{self.tradeTarget.targetName()}交易启动, "
f"持仓量:{self.tradeTarget.current_position}")
if self.tradeTarget.status == 0:
# 未建仓状态: 初始化网格索引
if self.tradeTarget.grid_index == 0:
# grid_index=0 表示从未初始化过,设为 1(价格网格最高点建仓)
self.tradeTarget.grid_index = 1 # pyright: ignore[reportAttributeAccessIssue]
PrintLog(LogLevel.INFO, f" |- 标的{self.tradeTarget.targetName()}初始状态, 设置网格序号 1,")
PrintLog(LogLevel.INFO,
f" |- 标的{self.tradeTarget.targetName()}初始状态, "
f"设置网格序号 1,")
else:
PrintLog(LogLevel.INFO, f" |- 标的{self.tradeTarget.targetName()}初始状态, 保留网格序号 {self.tradeTarget.grid_index},")
else: # 已建仓
# 交易阶段,检查仓位,检查现有订单
PrintLog(LogLevel.INFO, f" |- 标的{self.tradeTarget.targetName()}已有仓位或非初始状态 无需建初始仓 当前仓位: {self.tradeTarget.current_position} 状态: {self.tradeTarget.status}")
# grid_index 非零,保留之前设置的值(可能是手动修改的)
PrintLog(LogLevel.INFO,
f" |- 标的{self.tradeTarget.targetName()}初始状态, "
f"保留网格序号 {self.tradeTarget.grid_index},")
else:
# 已建仓状态: 检查现有持仓是否满足当前网格位置的仓位需求
# 最小需求仓位 = 每格股数 × 当前网格索引
# 例: grid_volume=100, grid_index=3 → 需持股 300 股
PrintLog(LogLevel.INFO,
f" |- 标的{self.tradeTarget.targetName()}已有仓位或非初始状态 "
f"无需建初始仓 当前仓位: {self.tradeTarget.current_position} "
f"状态: {self.tradeTarget.status}")
minRequirePosition: int = self.tradeTarget.grid_volume * int(self.tradeTarget.grid_index) # type: ignore
if minRequirePosition <= int(self.tradeTarget.current_position): # type: ignore
PrintLog(LogLevel.INFO, f' |- 仓位检查: 持仓需求充足, (gridVolume*gridIndex)={minRequirePosition}, 当前持仓:{self.tradeTarget.current_position}')
# 持仓充足,可以继续网格交易
PrintLog(LogLevel.INFO,
f' |- 仓位检查: 持仓需求充足, '
f'(gridVolume*gridIndex)={minRequirePosition}, '
f'当前持仓:{self.tradeTarget.current_position}')
else:
PrintLog(LogLevel.INFO, f' |- 仓位检查: 持仓需求不足, (gridVolume*gridIndex)={minRequirePosition}, 当前持仓:{self.tradeTarget.current_position}, 交易启动失败')
# 持仓不足(可能是之前部分成交或手动减仓),风控:拒绝启用
PrintLog(LogLevel.INFO,
f' |- 仓位检查: 持仓需求不足, '
f'(gridVolume*gridIndex)={minRequirePosition}, '
f'当前持仓:{self.tradeTarget.current_position}, '
f'交易启动失败')
self.tradeTarget.enabled = False # type: ignore
# 无论 status=0 还是 status=1,最终都调用 refreshGridOrder 下对应的单
self.refreshGridOrder()
else:
# ── 停用交易: 取消所有未成交订单 ──
orders = qmtv.queryPendingOrder(self.tradeTarget.stock_code, self.getName()) # type: ignore
for order in orders:
qmtv.xttrader.cancel_order_stock_async(qmtv.account, order.order_id)
try:
qmtv.xt_trader.cancel_order_stock_async(qmtv.account, order.order_id)
except AttributeError:
pass # 模拟模式无 xt_trader,跳过撤单
if len(orders) > 0:
PrintLog(LogLevel.INFO, f' |- 取消未成交订单 {len(orders)}')
PrintLog(LogLevel.INFO, f" |- 标的{self.tradeTarget.targetName()}交易监控暂停")
# 持久化状态到数据库
self.saveProxy()
return self.tradeTarget
def isEnabled(self) -> bool:
print(f'|- 检查交易状态[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - {self.tradeTarget.enabled}')
return bool(self.tradeTarget.enabled) # 修复返回类型问题
"""查询交易是否已启用"""
PrintLog(LogLevel.DEBUG, f'|- 检查交易状态[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - {self.tradeTarget.enabled}')
return bool(self.tradeTarget.enabled)
def onOrderCreateAsync(self, response:XtOrderResponse): # 下单成功回调,更新orderID到 self.orderGrid
remark = response.order_remark.split(',')
stockCode = remark[2] # 从remark中获取stockCode
if response.strategy_name != self.getName() or len(remark) < 3 or self.tradeTarget.stock_code != stockCode:
# ── 事件回调: 订单创建 ────────────────────────────────────
def onOrderCreateAsync(self, response: XtOrderResponse):
"""
QMT 异步下单成功回调
xtquant 下单是异步的:orderAsync() 返回 seq(序号),
交易所确认后通过此回调返回正式的 order_id。
此处将 orderGrid 中的临时 seq 替换为正式 order_id。
"""
parsed = self._filter_event(response.order_remark, response.strategy_name)
if parsed is None:
return
_, gridIdx, _ = parsed
self.dataUpdateLock.acquire()
try:
gridIdx = remark[1] # 从remark中获取gridIdx
PrintLog(LogLevel.INFO, f"委托创建通知 onOrderCreateAsync[{self.tradeTarget.targetName()}]: {response.order_id}")
PrintLog(LogLevel.INFO,
f"委托创建通知 onOrderCreateAsync[{self.tradeTarget.targetName()}]: "
f"{response.order_id}")
# 将 orderGrid 中的临时 seq 替换为正式 order_id
self.orderGrid[gridIdx] = response.order_id
PrintLog(LogLevel.INFO, f"委托创建通知 onOrderCreateAsync 更新 grid-{gridIdx} seq:{response.seq} -> order_id:{response.order_id}")
PrintLog(LogLevel.INFO,
f"委托创建通知 onOrderCreateAsync 更新 grid-{gridIdx} "
f"seq:{response.seq} -> order_id:{response.order_id}")
except Exception as e:
PrintLog(LogLevel.ERROR, f"|- 委托创建通知 onOrderCreateAsync[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}]: {response.order_id} - {str(e)}")
PrintLog(LogLevel.ERROR,
f"|- 委托创建通知 onOrderCreateAsync"
f"[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}]: "
f"{response.order_id} - {str(e)}")
finally:
self.dataUpdateLock.release()
def onOrderTrade(self, trade:XtTrade): # TODO 委托成交通知,处理成交后网格切换
remark = trade.order_remark.split(',')
if trade.strategy_name != self.getName() or len(remark) < 3 or self.tradeTarget.stock_code != trade.stock_code:
# ── 事件回调: 订单失败 ────────────────────────────────────
def onOrderError(self, order_error: XtOrderError):
"""
QMT 委托失败回调
当 xtquant 拒绝订单时触发(如资金不足、代码格式错误、涨跌停限制等)。
清理 orderGrid 中对应网格索引的孤立条目,防止后续 refreshGridOrder
误判"已有同价位订单"而跳过重新下单。
"""
parsed = self._filter_event(order_error.order_remark, order_error.strategy_name)
if parsed is None:
return
PrintLog(LogLevel.INFO, f'|- 委托成交通知[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{trade.order_id}] : {trade.order_id}')
_, gridIdx, _ = parsed
self.dataUpdateLock.acquire()
try:
orderType = trade.order_remark.split(',')[0]
gridIdx = trade.order_remark.split(',')[1] # 从remark中获取gridIdx
type:str = ""
if orderType == OrderTypeInit:
PrintLog(LogLevel.INFO, f'|- 委托成交通知[{self.tradeTarget.targetName()}-{trade.order_id}] - 建仓单成交')
self.tradeTarget.status = 1 # type: ignore
self.tradeTarget.init_price = trade.traded_price # type: ignore
PrintLog(LogLevel.INFO, f'|- [DEBUG] 建仓单成交: grid_index {self.tradeTarget.grid_index} → 1')
self.tradeTarget.grid_index = 1 # type: ignore
type = "建仓单"
else:
PrintLog(LogLevel.INFO, f'|- 委托成交通知[{self.tradeTarget.targetName()}-{trade.order_id}] - 网格单成交')
oriIdx = self.tradeTarget.grid_index
if gridIdx > self.tradeTarget.grid_index:
type = "下移一格"
self.tradeTarget.grid_index +=1
elif gridIdx < self.tradeTarget.grid_index:
type = "上移一格"
self.tradeTarget.grid_match_count += 1
self.tradeTarget.grid_total_profit += self.tradeTarget.grid_size * trade.traded_volume
self.tradeTarget.grid_index -= 1
else:
type = "保持格, 理论上不应该输出"
PrintLog(LogLevel.INFO, f'|- 委托成交通知[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} - 原网格位置 {oriIdx}, 现网格位置 {self.tradeTarget.grid_index}')
self.saveProxy()
# 从 orderGrid 中移除失败的订单条目,后续 refreshGridOrder 会重新挂单
if gridIdx in self.orderGrid:
del self.orderGrid[gridIdx]
PrintLog(LogLevel.INFO, f"|- 成交报告[{self.tradeTarget.targetName()}] : ====================================")
PrintLog(LogLevel.INFO, f"|- 标的[{self.tradeTarget.targetName()}] {type}-单号{trade.order_id}已成交 ")
PrintLog(LogLevel.INFO, f' 成交价: {trade.traded_price} 成交量: {trade.traded_volume}')
PrintLog(LogLevel.INFO, f' 手续费 : {trade.commission:.3f}')
self.refreshGridOrder() # 更新网格订单
PrintLog(LogLevel.ERROR,
f'委托失败[{self.tradeTarget.targetName()}] grid-{gridIdx}: '
f'order_id={order_error.order_id}, error_id={order_error.error_id}, '
f'error_msg={order_error.error_msg}')
except Exception as e:
PrintLog(LogLevel.ERROR,
f'委托失败处理异常[{self.tradeTarget.stock_code}]: {str(e)}')
finally:
self.dataUpdateLock.release()
# ── 事件回调: 订单成交 ────────────────────────────────────
def onOrderTrade(self, trade: XtTrade):
"""
QMT 委托成交通知回调
成交后:
1. 更新网格索引(卖出上移 / 买入下移)
2. 如果是建仓单成交: status 0→1, 记录建仓价
3. 如果是网格单成交: 累计网格匹配次数和总利润
4. 从 orderGrid 删除已成交订单
5. 持久化状态到数据库
6. 调用 refreshGridOrder 挂新的网格单
trade.order_remark 格式: "{type},{gridIdx},{stockCode}"
"""
parsed = self._filter_event(trade.order_remark, trade.strategy_name)
if parsed is None:
return
orderType, gridIdx, _ = parsed
PrintLog(LogLevel.INFO,
f'|- 委托成交通知'
f'[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{trade.order_id}] : '
f'{trade.order_id}')
self.dataUpdateLock.acquire()
try:
desc: str = "" # 用于日志展示的成交类型描述
# ── 分支1: 建仓单成交 ──
if orderType == OrderTypeInit:
PrintLog(LogLevel.INFO,
f'|- 委托成交通知[{self.tradeTarget.targetName()}-{trade.order_id}] '
f'- 建仓单成交')
# 状态切换: 未建仓(0) → 已建仓(1)
self.tradeTarget.status = 1 # type: ignore
# 记录建仓价格
self.tradeTarget.init_price = trade.traded_price # type: ignore
PrintLog(LogLevel.INFO,
f'|- [DEBUG] 建仓单成交: '
f'grid_index {self.tradeTarget.grid_index} → 1')
# 建仓后网格索引固定为 1(价格网格最高点)
self.tradeTarget.grid_index = 1 # type: ignore
desc = "建仓单"
# ── 分支2: 网格单成交 ──
else:
PrintLog(LogLevel.INFO,
f'|- 委托成交通知[{self.tradeTarget.targetName()}-{trade.order_id}] '
f'- 网格单成交')
oriIdx = self.tradeTarget.grid_index # 记录原网格位置(用于日志)
# 判断成交方向: gridIdx > currentIdx → 买入成交(下移)
if gridIdx > self.tradeTarget.grid_index:
desc = "下移一格"
self.tradeTarget.grid_index += 1
# 判断成交方向: gridIdx < currentIdx → 卖出成交(上移)
elif gridIdx < self.tradeTarget.grid_index:
desc = "上移一格"
# 累计统计
self.tradeTarget.grid_match_count += 1 # 网格匹配次数+1
self.tradeTarget.grid_total_profit += (
self.tradeTarget.grid_size * trade.traded_volume
) # 累计利润 = 网格间距 × 成交量
self.tradeTarget.grid_index -= 1
# gridIdx == currentIdx: 理论上不应出现(同一个位置不会挂单给自己)
else:
desc = "保持格, 理论上不应该输出"
PrintLog(LogLevel.INFO,
f'|- 委托成交通知'
f'[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} - '
f'原网格位置 {oriIdx}, 现网格位置 {self.tradeTarget.grid_index}')
# ── 成交后处理 ──
# 1. 持久化状态到数据库
self.saveProxy()
# 2. 从 orderGrid 中删除已成交的订单(pop 防重复推送 KeyError
self.orderGrid.pop(gridIdx, None)
# 3. 打印成交报告
PrintLog(LogLevel.INFO,
f"|- 成交报告[{self.tradeTarget.targetName()}] : "
f"====================================")
PrintLog(LogLevel.INFO,
f"|- 标的[{self.tradeTarget.targetName()}] "
f"{desc}-单号{trade.order_id}已成交 ")
PrintLog(LogLevel.INFO,
f' 成交价: {trade.traded_price} 成交量: {trade.traded_volume}')
PrintLog(LogLevel.INFO,
f' 手续费 : {trade.commission:.3f}')
# 4. 刷新网格订单:在新的 grid_index 上下重新挂买卖单
self.refreshGridOrder()
finally:
self.dataUpdateLock.release()
# ── 工具方法 ──────────────────────────────────────────────
def _make_remark(self, order_tag: str, grid_idx: int) -> str:
"""构建订单 remark: '{type},{gridIdx},{stockCode}'"""
return f'{order_tag},{grid_idx},{self.tradeTarget.stock_code}'
@staticmethod
def _parse_remark(remark: str):
"""
解析订单 remark → (orderType:str, gridIdx:int, stockCode:str)
格式不符返回 None
"""
if not remark:
return None
parts = remark.split(',')
if len(parts) < 3:
return None
try:
return parts[0], int(parts[1]), parts[2]
except (ValueError, IndexError):
return None
def _filter_event(self, remark: str, strategy_name: str):
"""
事件过滤器:解析 remark 并校验是否属于本策略本标的
通过返回 parsed tuple,不通过返回 None
"""
parsed = self._parse_remark(remark)
if parsed is None:
return None
if strategy_name != self.getName() or self.tradeTarget.stock_code != parsed[2]:
return None
return parsed
def getName(self):
"""返回策略名称,用于在 QMT 中标识订单归属"""
return "SFGRID"
def saveProxy(self):
PrintLog(LogLevel.DEBUG, f'|- [DEBUG] saveProxy: {self.tradeTarget.targetName()} grid_index={self.tradeTarget.grid_index}, status={self.tradeTarget.status}')
"""
持久化 tradeTarget 到数据库,并发布 UI 更新事件
每次状态变更后调用,确保数据库与内存一致,
同时通知 UI 刷新表格显示。
"""
PrintLog(LogLevel.DEBUG,
f'|- [DEBUG] saveProxy: {self.tradeTarget.targetName()} '
f'grid_index={self.tradeTarget.grid_index}, status={self.tradeTarget.status}')
rc = self.tradeTarget.save()
event_bus.publish(EventTradeTargetUpdate, self.tradeTarget)
return rc
+34 -20
View File
@@ -1,24 +1,24 @@
# coding:utf-8
import os
import sys
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import configparser
from core.main_ui import MainWindow
import config as sdConstants
class ConfigWindow:
def __init__(self, root):
self.root = root
self.root.title("系统配置")
self.root.geometry("500x250")
self.root.geometry("500x300")
self.root.resizable(False, False)
# 居中显示
self.root.withdraw() # 先隐藏窗口
self.root.update_idletasks()
x = (self.root.winfo_screenwidth() // 2) - (500 // 2)
y = (self.root.winfo_screenheight() // 2) - (250 // 2)
self.root.geometry(f"500x250+{x}+{y}")
y = (self.root.winfo_screenheight() // 2) - (300 // 2)
self.root.geometry(f"500x300+{x}+{y}")
self.root.deiconify() # 再显示窗口
self.miniQMTPath = tk.StringVar()
@@ -54,6 +54,15 @@ class ConfigWindow:
account_entry = ttk.Entry(account_frame, textvariable=self.account_no, width=40)
account_entry.pack(side=tk.LEFT, padx=(10, 0))
# 模拟模式复选框
self.use_simulated = tk.BooleanVar(value=False)
simulated_check = ttk.Checkbutton(
main_frame,
text="使用模拟交易模式(无需真实 QMT 连接)",
variable=self.use_simulated
)
simulated_check.pack(fill=tk.X, pady=5)
# 说明文本
info_label = ttk.Label(
main_frame,
@@ -100,6 +109,7 @@ class ConfigWindow:
config['config'] = {
'miniQMTPath': mini_qmt_path.replace('\\', '/'),
'account_no': account_number,
'use_simulated_qmt': str(self.use_simulated.get()),
'log_level': 'INFO'
}
@@ -118,26 +128,29 @@ def check_and_create_config():
config_window = ConfigWindow(root)
root.mainloop()
def ask_mode():
"""询问用户选择模式"""
root = tk.Tk()
root.withdraw() # 隐藏主窗口
result = messagebox.askyesno(
"选择交易模式",
"是否使用模拟交易模式?\n\n" +
"是 → 模拟交易(无需 miniQMT,可在 macOS/Linux 运行)\n" +
"否 → 真实交易(需要 Windows + miniQMT"
)
root.destroy()
return result
def resolve_simulated_mode() -> bool:
"""确定是否使用模拟模式(CLI > 配置文件 > 默认 real"""
if '--simulated' in sys.argv:
print('[配置] 命令行指定: 模拟交易模式')
return True
if sdConstants.exist_config():
sdConstants.initConfig()
if sdConstants.use_simulated_qmt:
print('[配置] 配置文件指定: 模拟交易模式')
return True
print('[配置] 默认: 真实交易模式')
return False
def initialize_system():
"""初始化系统"""
simulated = resolve_simulated_mode()
sdConstants.use_simulated_qmt = simulated
try:
# 询问用户选择模式
if ask_mode():
# 模拟模式
if simulated:
from core.qmt_dummy import qmtv as selected_qmtv
print("[模拟模式] 使用模拟交易器")
sdConstants.miniQMTPath = '/dummy/path'
@@ -145,16 +158,17 @@ def initialize_system():
sdConstants.log_level = 'INFO'
selected_qmtv.init_qmtv()
selected_qmtv.connect()
from core.main_ui import MainWindow
window = MainWindow(sdConstants.log_level)
window.run()
else:
# 真实 QMT 模式
from core.qmt_real import qmtv as selected_qmtv
while True:
if sdConstants.exist_config() and sdConstants.initConfig():
selected_qmtv.init_qmtv()
connected = selected_qmtv.connect()
if connected:
from core.main_ui import MainWindow
window = MainWindow(sdConstants.log_level)
window.run()
break