Files
sfgrid/core/ui/tkinter/sfgrid_view.py
T
2026-06-12 16:25:41 +08:00

1534 lines
66 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Any
import tkinter as tk
from tkinter import ttk, messagebox
from datetime import datetime
import threading
import time
import core.eventbus as eBus
from core.logger import LogLevel, PrintLog
from core.sfgrid import bus_events
from core.sfgrid.model import SFGridTradeTarget
from core.qmt import qmtv
from core.sfgrid.sfgrid_strategy import SFGridStrategy
class TradeTargetUI(ttk.Frame):
def __init__(self, parent, progress=None):
    """Load positions, build the widgets and subscribe to bus events.

    Args:
        parent: Tk container this frame is placed in.
        progress: Optional callable ``progress(message, fraction)`` used to
            report start-up stages; ignored when falsy.
    """
    started_at = time.time()
    super().__init__(parent)

    def report(message, fraction):
        # Start-up progress is only reported when a callback was supplied.
        if progress:
            progress(message, fraction)

    # Caches keyed by trade-target id unless noted otherwise.
    self.tradeTargetData: dict[int, SFGridTradeTarget] = {}  # id -> trade target
    self.stockCodeIdMap: dict[str, int] = {}  # stock code -> id
    self.strategy_ctrl: dict[int, SFGridStrategy] = {}  # id -> strategy controller
    self.targetMarketPrice: dict[int, float] = {}
    self.targetAvgPrice: dict[int, float] = {}
    self.listening_stock = []
    # Price watched by the market monitor (default 10).
    self.monitor_price = 10.0
    # Table that was clicked last (0 = grid, 1 = unclassified).
    self._active_table = 0

    report("正在加载持仓数据...", 0.3)
    self.init_trade_target_pool()

    # Market-monitor rows: {stock_code: {stock_name, last_price, time}}.
    self.marketData: dict[str, Any] = {}
    # Panel visibility flags.
    self.market_monitor_visible = True
    self.bottom_panel_visible = True
    # Market-active state and refresh control.
    # NOTE(review): isMarketActive is stored without being called; if it is a
    # function rather than an attribute this value is always truthy — confirm.
    self._market_active = qmtv.isMarketActive  # type: ignore
    self._refresh_event = threading.Event()
    self._refresh_cycle = 0  # background refresh cycles completed so far
    self._prices_pulled_after_close = False  # one-shot after-close pull done?

    report("正在构建界面...", 0.6)
    self.create_ui()

    report("正在初始化策略...", 0.85)
    subscribe = eBus.event_bus.subscribe
    subscribe(eBus.MarketDataUpdate, self.onMarketDataUpdated)
    subscribe(eBus.EventMarketActiveSwitch, self._on_market_active_switch)
    subscribe(bus_events.EventTradeTargetUpdate, self.onStrategyUpdate)
    subscribe(bus_events.EventTradeTargetDeleted, self.onTradeTargetDeleted)
    print(f'[计时] TradeTargetUI.__init__ 总计: {time.time() - started_at:.2f}s')
def init_trade_target_pool(self):
    """Import QMT positions into the trade pool and load every stored target.

    Positions that have no ``SFGridTradeTarget`` row yet are inserted as
    disabled targets with default grid settings. Afterwards each stored target
    receives its current position and average cost and is registered through
    ``updateTradeTarget``.
    """
    t_start = time.time()
    # One call fetches every position held in QMT.
    all_positions = qmtv.getAllPositions()
    PrintLog(LogLevel.INFO, f'- [持仓] 从 QMT 获取到 {len(all_positions)} 个持仓')
    print(f'[计时] └─ getAllPositions: {time.time() - t_start:.2f}s')

    # Add held instruments that are not in the trade pool yet.
    t_import = time.time()
    imported_count = 0
    for code, pos in all_positions.items():
        if SFGridTradeTarget.get_or_none(SFGridTradeTarget.stock_code == code) is not None:
            continue
        name = getattr(pos, 'instrument_name', '') or qmtv.getInstrumentName(code)
        volume = int(pos.volume)
        avg_price = float(pos.avg_price) if pos.avg_price else 0.0
        SFGridTradeTarget.create(
            stock_code=code,
            stock_name=name,
            current_position=volume,
            grid_index=0,
            init_price=avg_price,
            grid_match_count=0,
            grid_total_profit=0.0,
            enabled=False,
            # Fall back to 10.0 when no cost price is known.
            grid_start_price=avg_price if avg_price > 0 else 10.0,
            grid_size=1.0,
            grid_volume=200,
            grid_upper_count=1,
            grid_lower_count=10
        )
        imported_count += 1
        PrintLog(LogLevel.INFO, f'- [导入] QMT持仓 → 交易池: {code} {name} 持仓:{volume} 成本:{avg_price:.4f}')
    if imported_count:
        PrintLog(LogLevel.INFO, f'- [导入] 共新增 {imported_count} 个标的到交易池')
    print(f'[计时] └─ 持仓导入DB: {time.time() - t_import:.2f}s ({imported_count} 个新增)')

    # Load every stored target and attach its live position data.
    t_load = time.time()
    results = SFGridTradeTarget.select()
    for target in results:
        pos = all_positions.get(target.stock_code)
        target.current_position = 0 if pos is None else pos.volume  # type: ignore
        if pos is None:
            self.targetAvgPrice[target.get_id()] = 0.0
        else:
            self.targetAvgPrice[target.get_id()] = pos.avg_price
            # NOTE(review): source indentation was lost; this log reads
            # pos.avg_price, so it is assumed to belong to this branch — confirm.
            PrintLog(LogLevel.INFO, f'- [成功]获取持仓信息: {target.stock_code} {target.targetName()} {target.current_position} {pos.avg_price}')
        PrintLog(LogLevel.DEBUG, f'- [DEBUG] updateTradeTarget 前: {target.stock_code} grid_index={target.grid_index}, status={target.status}, enabled={target.enabled}')
        self.updateTradeTarget(target, True)  # initial load: persist the refreshed position
        PrintLog(LogLevel.DEBUG, f'- [DEBUG] updateTradeTarget 后: {target.stock_code} grid_index={target.grid_index}')
    print(f'[计时] └─ 策略初始化: {time.time() - t_load:.2f}s ({len(results)} 个标的)')
    print(f'[计时] └─ init_trade_target_pool 总计: {time.time() - t_start:.2f}s')
    PrintLog(LogLevel.INFO, f'- [成功]交易标的信息初始化, 共 {len(self.tradeTargetData)} 个标的')
# Collect incoming market data; ticks for non-pool stocks feed the market monitor.
def onMarketDataUpdated(self, data):
    """Handle a batch of ticks keyed by stock code.

    Ticks for stocks in the trade pool update the cached price and are passed
    to ``updateTradeTarget`` without saving. Other stocks are recorded in
    ``self.marketData`` once their last price equals ``self.monitor_price``,
    and on every later tick after that.

    Args:
        data: Mapping of stock code -> tick dict containing ``'lastPrice'``.
    """
    for stock_code, tick in data.items():
        last_price = tick['lastPrice']
        # Internal lookups use the code without the part after the dot.
        plain_code = stock_code.split('.')[0]
        if plain_code in self.stockCodeIdMap:
            target_id: int = self.stockCodeIdMap[plain_code]
            self.targetMarketPrice[target_id] = last_price
            target = self.tradeTargetData[target_id]
            # The target itself carries the price rounded to three decimals.
            target.market_price = float(f"{last_price:.3f}")  # type: ignore
            self.updateTradeTarget(target, False)  # price update only, no save
            continue

        # Not a pool target: only stocks at the user's monitor price, or ones
        # already being watched, are tracked.
        already_watched = stock_code in self.listening_stock
        if not already_watched and last_price != self.monitor_price:
            continue
        if not already_watched:
            self.listening_stock.append(stock_code)
        # Row shown in the market-monitor table.
        current_time = datetime.now().strftime("%H:%M:%S")
        self.marketData[str(stock_code)] = {
            'stock_name': qmtv.getInstrumentName(stock_code),
            'last_price': last_price,
            'time': current_time
        }
# Data update pushed by the strategy layer.
def onStrategyUpdate(self, target: SFGridTradeTarget):
    """Cache the updated target and request an immediate table refresh."""
    self.tradeTargetData[target.get_id()] = target
    # Wake refresh_loop so the redraw happens now, handled by refresh_table.
    self._refresh_event.set()
# Register or refresh a trade target in the local caches.
def updateTradeTarget(self, target: SFGridTradeTarget, save: bool = True):
    """Cache *target*, optionally saving it first, and keep lookups in sync.

    Args:
        target: The trade target to register.
        save: When true, ``target.save()`` is called before caching.
    """
    if save:
        target.save()
    target_id = target.get_id()
    # Update or add the entry in the local cache.
    self.tradeTargetData[target_id] = target
    # Every target needs quotes, so map its code to its id unless either the
    # code or the id is already registered.
    id_known = target_id in self.stockCodeIdMap.values()
    code_known = target.stock_code in self.stockCodeIdMap
    if not (id_known or code_known):
        self.stockCodeIdMap[target.stock_code] = target_id  # type: ignore
    # Only grid-strategy targets get a strategy controller.
    from core.sfgrid.model import STRATEGY_TYPE_GRID
    is_grid = target.strategy_type == STRATEGY_TYPE_GRID  # type: ignore
    if is_grid and target_id not in self.strategy_ctrl:
        self.strategy_ctrl[target_id] = SFGridStrategy(target)  # pyright: ignore[reportArgumentType]
    # NOTE(review): source indentation was lost; this refresh is assumed to sit
    # at method level (run for every target that already has an average cost),
    # not nested under the grid-strategy branch — confirm.
    if target_id in self.targetAvgPrice:
        pos = qmtv.getStockPosition(target.stock_code)
        if pos is not None:
            self.targetAvgPrice[target_id] = pos.avg_price
# UI construction
def create_ui(self):
    """Build the widget tree and start the background refresh thread."""
    t_start = time.time()
    # Root container, parented to this frame.
    main_frame = ttk.Frame(self)
    main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
    # Tables area: toolbar and position tables on the left, notebook on the right.
    self.create_tables_area(main_frame)
    print(f'[计时] └─ create_tables_area: {time.time() - t_start:.2f}s')
    # Start the refresh thread (daemon, so it does not keep the process alive).
    refresher = threading.Thread(target=self.refresh_loop, daemon=True)
    self.refresh_thread = refresher
    refresher.start()
    print(f'[计时] └─ create_ui 总计: {time.time() - t_start:.2f}s')
def _on_market_active_switch(self, is_active: bool):
    """Bus callback: record the new market-active state and wake the refresh loop.

    Args:
        is_active: Whether the market is now active.
    """
    self._market_active = is_active
    if is_active:
        # Re-arm the one-shot after-close price pull for the next close.
        self._prices_pulled_after_close = False
    # NOTE(review): source indentation was lost; the wake-up is assumed to be
    # unconditional (sent for both directions of the switch) — confirm.
    self._refresh_event.set()  # wake refresh_loop
def refresh_loop(self):
    """Background loop: pull missing prices and schedule UI refreshes.

    Runs forever on the refresh thread. While the market is active a cycle
    runs every 5 s (sooner when ``_refresh_event`` is set) and every 6th cycle
    also refreshes the order and trade tables. While the market is inactive
    prices are pulled once, then the loop idles and re-checks every 60 s.

    Widget work is never done here directly; it is queued with
    ``self.after(0, ...)``.
    """

    def schedule_price_refresh():
        # Fetch a last price for every target whose cached price is missing
        # or 0, then queue the cache update and the table redraws.
        prices = {}  # id -> price
        for target_id, target in list(self.tradeTargetData.items()):
            if self.targetMarketPrice.get(target_id, 0) == 0:
                price = qmtv.getLastPrice(target.stock_code)  # type: ignore
                # NOTE(review): assumes getLastPrice may yield None/0 when no
                # quote is available; the truthiness guard keeps a None from
                # raising and ending this thread.
                if price and price > 0:
                    prices[target_id] = price
        if prices:
            self.after(0, lambda p=prices: self._apply_prices(p))
        self.after(0, self.refresh_table)
        self.after(0, self.populate_market_table)

    while True:
        if self._market_active:
            # Trading hours: pull missing prices every cycle.
            schedule_price_refresh()
            # Every 6 cycles (30 s at the 5 s cadence) refresh orders/trades.
            self._refresh_cycle += 1
            if self._refresh_cycle % 6 == 0:
                self.after(0, self._refresh_orders)
                self.after(0, self._refresh_trades)
            wait_seconds = 5
        else:
            # After close: pull prices once, then wait for the market to reopen.
            if not self._prices_pulled_after_close:
                schedule_price_refresh()
                self._prices_pulled_after_close = True
                PrintLog(LogLevel.INFO, '[刷新] 收盘后已拉取收盘价,进入休眠等待开盘')
            # Wake at least every 60 s in case the switch signal was missed.
            wait_seconds = 60
        self._refresh_event.wait(timeout=wait_seconds)
        self._refresh_event.clear()
def _apply_prices(self, prices: dict):
    """Main-thread callback: store prices fetched by the background loop.

    Args:
        prices: Mapping of target id -> last price.
    """
    cached_targets = self.tradeTargetData
    for target_id, price in prices.items():
        self.targetMarketPrice[target_id] = price
        if target_id not in cached_targets:
            continue
        cached_targets[target_id].market_price = price  # type: ignore
def create_tables_area(self, parent):
    """Lay out the main area: toolbar and position tables left, notebook right."""
    # Outer frame; children are arranged horizontally.
    tables_frame = ttk.Frame(parent)
    tables_frame.pack(fill=tk.BOTH, expand=True)

    # ========== Left: toolbar + position tables ==========
    left_frame = ttk.Frame(tables_frame)
    left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))

    # Toolbar at the top of the left side.
    toolbar_frame = ttk.Frame(left_frame)
    toolbar_frame.pack(fill=tk.X, pady=(0, 5))
    toolbar_actions = (
        (" 添加标的", self.btnHandlerAddTradeTarget),
        ("🗑 删除标的", self.btnHandlerDelSelectedTradeTarget),
        ("▶️ 启动交易", self.btnHandlerStartSelectedTrade),
        ("⏸ 暂停交易", self.btnHandlerStopSelectedTrade),
        ("🛠 交易设置", self.btnHandlerTradeSettings),
    )
    for caption, handler in toolbar_actions:
        ttk.Button(toolbar_frame, text=caption,
                   command=handler, width=12).pack(side=tk.LEFT, padx=2)

    # Panel-toggle icon buttons at the right end of the toolbar.
    self._sidebar_icon = PanelIcon(toolbar_frame, 'sidebar', self.btnHandlerToggleMarketMonitor,
                                   active=self.market_monitor_visible)
    self._sidebar_icon.pack(side=tk.RIGHT, padx=1, pady=2)
    self._bottom_icon = PanelIcon(toolbar_frame, 'bottom', self._toggle_bottom_panel,
                                  active=self.bottom_panel_visible)
    self._bottom_icon.pack(side=tk.RIGHT, padx=1, pady=2)

    # Upper half: grid-strategy positions.
    grid_frame = ttk.LabelFrame(left_frame, text="网格策略持仓", padding=5)
    grid_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 3))
    self.create_grid_table(grid_frame)

    # Lower half: unclassified positions.
    self.unclassified_frame = ttk.LabelFrame(left_frame, text="未分类持仓", padding=5)
    self.unclassified_frame.pack(fill=tk.BOTH, expand=True, pady=(3, 0))
    self.create_unclassified_table(self.unclassified_frame)

    # ========== Right: notebook with three tabs ==========
    notebook = ttk.Notebook(tables_frame)
    self.right_notebook = notebook
    notebook.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=(5, 0))

    # Tab 1: live price monitor.
    self.market_frame = ttk.Frame(notebook)
    notebook.add(self.market_frame, text="实时价格监控")
    # Monitor-price controls at the top of the tab.
    monitor_control_frame = ttk.Frame(self.market_frame)
    monitor_control_frame.pack(fill=tk.X, pady=(0, 5))
    ttk.Label(monitor_control_frame, text="监控配置").pack(side=tk.LEFT, padx=(0, 5))
    ttk.Label(monitor_control_frame, text="价格").pack(side=tk.LEFT, padx=(0, 2))
    self.monitor_price_entry = ttk.Entry(monitor_control_frame, width=8)
    self.monitor_price_entry.insert(0, str(self.monitor_price))
    self.monitor_price_entry.pack(side=tk.LEFT, padx=2)
    ttk.Button(monitor_control_frame, text="确认",
               command=self.btnHandlerSetMonitorPrice, width=6).pack(side=tk.LEFT, padx=2)
    self.create_market_monitor_table(self.market_frame)

    # Tab 2: today's orders.
    self.order_tab = ttk.Frame(notebook)
    notebook.add(self.order_tab, text="当日委托")
    self._create_order_table(self.order_tab)

    # Tab 3: today's trades.
    self.trade_tab = ttk.Frame(notebook)
    notebook.add(self.trade_tab, text="当日成交")
    self._create_trade_table(self.trade_tab)

    # Refresh the matching table whenever the active tab changes.
    notebook.bind("<<NotebookTabChanged>>", self._on_tab_changed)
def create_grid_table(self, parent):
    """Create the grid-strategy Treeview, fill it and wire its mouse bindings."""
    # (heading, width, anchor) per column, in display order.
    column_specs = (
        ("ID", 50, tk.CENTER),
        ("股票", 120, tk.CENTER),
        ("市场价", 60, tk.E),
        ("当前持仓", 70, tk.E),
        ("平均成本", 60, tk.E),
        ("当前网格基准价", 100, tk.E),
        ("交易状态", 100, tk.CENTER),
    )
    headings = tuple(spec[0] for spec in column_specs)
    table = ttk.Treeview(parent, columns=headings, show='headings', height=15)
    self.grid_table = table
    for heading, width, anchor in column_specs:
        table.heading(heading, text=heading)
        table.column(heading, width=width, anchor=anchor)  # type: ignore
    self.populate_grid_table()

    scrollbar = ttk.Scrollbar(parent, orient=tk.VERTICAL, command=table.yview)
    table.configure(yscrollcommand=scrollbar.set)
    table.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

    def mark_active(_event):
        # Any left click makes this the "last clicked" table (0 = grid).
        self._active_table = 0

    table.bind("<Double-1>", self.on_grid_table_double_click)
    table.bind("<Button-3>", self.on_grid_table_right_click)
    table.bind("<Button-1>", mark_active)
def create_unclassified_table(self, parent):
    """Create the unclassified-positions Treeview, fill it and wire its bindings."""
    # (heading, width, anchor) per column, in display order.
    column_specs = (
        ("ID", 50, tk.CENTER),
        ("股票", 120, tk.CENTER),
        ("市场价", 60, tk.E),
        ("当前持仓", 70, tk.E),
        ("平均成本", 60, tk.E),
    )
    headings = tuple(spec[0] for spec in column_specs)
    table = ttk.Treeview(parent, columns=headings, show='headings', height=15)
    self.unclassified_table = table
    for heading, width, anchor in column_specs:
        table.heading(heading, text=heading)
        table.column(heading, width=width, anchor=anchor)  # type: ignore
    self.populate_unclassified_table()

    scrollbar = ttk.Scrollbar(parent, orient=tk.VERTICAL, command=table.yview)
    table.configure(yscrollcommand=scrollbar.set)
    table.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

    def mark_active(_event):
        # Any left click makes this the "last clicked" table (1 = unclassified).
        self._active_table = 1

    table.bind("<Double-1>", self.on_unclassified_table_double_click)
    table.bind("<Button-3>", self.on_unclassified_table_right_click)
    table.bind("<Button-1>", mark_active)
def create_market_monitor_table(self, parent):
    """Create the market-monitor Treeview, wire double-click and fill it."""
    # (heading, width) per column; every column is centred.
    column_specs = (
        ("时间", 50),
        ("股票名称", 80),
        ("最新价格", 50),
    )
    headings = tuple(name for name, _ in column_specs)
    table = ttk.Treeview(parent, columns=headings, show='headings', height=15)
    self.market_table = table
    for heading, width in column_specs:
        table.heading(heading, text=heading)
        table.column(heading, width=width, anchor=tk.CENTER)  # type: ignore
    # Vertical scrollbar.
    scrollbar = ttk.Scrollbar(parent, orient=tk.VERTICAL, command=table.yview)
    table.configure(yscrollcommand=scrollbar.set)
    table.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
    # Double-click offers to add the row's stock to the trade pool.
    table.bind("<Double-1>", self.on_market_table_double_click)
    # Initial fill.
    self.populate_market_table()
# ---- Order / trade notebook tabs ----
# Display text for each numeric order-status code.
_ORDER_STATUS_MAP = {
    48: '未报',      # not reported
    49: '待报',      # waiting to be reported
    50: '已报',      # reported
    51: '已报待撤',  # reported, cancel pending
    52: '部成待撤',  # partially filled, cancel pending
    53: '部撤',      # partially cancelled
    54: '已撤',      # cancelled
    55: '部成',      # partially filled
    56: '已成',      # filled
    57: '废单',      # rejected
}
@staticmethod
def _order_direction(ot: int) -> str:
return '' if ot in (23,) else '' if ot in (24,) else str(ot)
def _create_order_table(self, parent):
    """Build the "today's orders" tab: header bar with a refresh button plus the table."""
    header = ttk.Frame(parent)
    header.pack(fill=tk.X, pady=(0, 3))
    ttk.Button(header, text="刷新", command=self._refresh_orders, width=6).pack(side=tk.RIGHT)
    ttk.Label(header, text="当日委托", font=('', 10, 'bold')).pack(side=tk.LEFT)

    # (heading, width) per column, in display order; all columns are centred.
    column_widths = (
        ("时间", 70), ("代码", 60), ("名称", 70), ("方向", 30), ("委托价", 55),
        ("委托量", 55), ("已成交", 55), ("均价", 55), ("状态", 55),
    )
    headings = tuple(name for name, _ in column_widths)
    tree = ttk.Treeview(parent, columns=headings, show='headings', height=14)
    self.order_tree = tree
    for heading, width in column_widths:
        tree.heading(heading, text=heading)
        tree.column(heading, width=width, anchor=tk.CENTER)

    scroll = ttk.Scrollbar(parent, orient=tk.VERTICAL, command=tree.yview)
    tree.configure(yscrollcommand=scroll.set)
    tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    scroll.pack(side=tk.RIGHT, fill=tk.Y)
def _create_trade_table(self, parent):
    """Build the "today's trades" tab: header bar with a refresh button plus the table."""
    header = ttk.Frame(parent)
    header.pack(fill=tk.X, pady=(0, 3))
    ttk.Button(header, text="刷新", command=self._refresh_trades, width=6).pack(side=tk.RIGHT)
    ttk.Label(header, text="当日成交", font=('', 10, 'bold')).pack(side=tk.LEFT)

    # (heading, width) per column, in display order; all columns are centred.
    column_widths = (
        ("时间", 70), ("代码", 60), ("名称", 70), ("方向", 30),
        ("成交价", 55), ("成交量", 55), ("成交金额", 65), ("手续费", 50),
    )
    headings = tuple(name for name, _ in column_widths)
    tree = ttk.Treeview(parent, columns=headings, show='headings', height=14)
    self.trade_tree = tree
    for heading, width in column_widths:
        tree.heading(heading, text=heading)
        tree.column(heading, width=width, anchor=tk.CENTER)

    scroll = ttk.Scrollbar(parent, orient=tk.VERTICAL, command=tree.yview)
    tree.configure(yscrollcommand=scroll.set)
    tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    scroll.pack(side=tk.RIGHT, fill=tk.Y)
def _on_tab_changed(self, event):
    """Refresh the orders or trades table when its notebook tab becomes active.

    Tabs other than "当日委托" / "当日成交" need no refresh here.

    Args:
        event: The Tk event that triggered the callback (unused).
    """
    notebook = self.right_notebook
    try:
        tab_text = notebook.tab(notebook.select(), "text")
        if tab_text == "当日委托":
            self._refresh_orders()
        elif tab_text == "当日成交":
            self._refresh_trades()
    except Exception as exc:
        # Best effort: a failed refresh must not break tab switching, but it
        # is logged instead of being dropped silently.
        PrintLog(LogLevel.INFO, f'[刷新] 标签页切换刷新失败: {exc}')
def _refresh_orders(self):
    """Reload today's orders from QMT into the order table.

    Orders whose status is 54 ('已撤') or 57 ('废单') are not listed. If the
    query fails or returns nothing, the table is left empty; a failure is
    logged.
    """
    try:
        # ``or []`` also covers a None result.
        orders = qmtv.queryTodayOrders() or []
    except Exception as exc:
        PrintLog(LogLevel.INFO, f'[刷新] 查询当日委托失败: {exc}')
        orders = []

    tree = self.order_tree
    tree.delete(*tree.get_children())

    hidden_statuses = {54, 57}
    for order in orders:
        status = getattr(order, 'order_status', None)
        if status in hidden_statuses:
            continue
        # Show the code without the part after the dot.
        code = getattr(order, 'stock_code', '').split('.')[0]
        traded_price = getattr(order, 'traded_price', 0)
        tree.insert('', tk.END, values=(
            str(getattr(order, 'order_time', '')),
            code,
            getattr(order, 'instrument_name', '') or code,
            self._order_direction(getattr(order, 'order_type', 0)),
            f"{getattr(order, 'price', 0):.3f}",
            getattr(order, 'order_volume', 0),
            getattr(order, 'traded_volume', 0),
            # No fill yet: show a dash instead of 0.000.
            f"{traded_price:.3f}" if traded_price > 0 else '-',
            self._ORDER_STATUS_MAP.get(status, '未知'),
        ))
def _refresh_trades(self):
    """Reload today's trades from QMT into the trade table.

    If the query fails or returns nothing, the table is left empty; a failure
    is logged.
    """
    try:
        # ``or []`` also covers a None result.
        trades = qmtv.queryTodayTrades() or []
    except Exception as exc:
        PrintLog(LogLevel.INFO, f'[刷新] 查询当日成交失败: {exc}')
        trades = []

    tree = self.trade_tree
    tree.delete(*tree.get_children())

    for trade in trades:
        # Show the code without the part after the dot.
        code = getattr(trade, 'stock_code', '').split('.')[0]
        tree.insert('', tk.END, values=(
            str(getattr(trade, 'traded_time', '')),
            code,
            getattr(trade, 'instrument_name', '') or code,
            self._order_direction(getattr(trade, 'order_type', 0)),
            f"{getattr(trade, 'traded_price', 0):.3f}",
            getattr(trade, 'traded_volume', 0),
            f"{getattr(trade, 'traded_amount', 0):.2f}",
            f"{getattr(trade, 'commission', 0):.2f}",
        ))
def populate_market_table(self):
    """Rebuild the market-monitor table from ``self.marketData``, keeping the selection.

    Each row holds: time (hh:mm:ss), "name-code" label, price with three
    decimals, and the raw stock code as a fourth value.
    """
    table = self.market_table
    # Remember selected rows by their second value (the "name-code" label).
    remembered = []
    for row in table.selection():
        row_values = table.item(row)['values']
        if row_values:
            remembered.append(row_values[1])
    # Drop all current rows.
    for row in table.get_children():
        table.delete(row)
    # Rebuild from a snapshot of the market data.
    for stock_code, entry in self.marketData.copy().items():
        time_str = entry['time']
        # "date time" -> keep only the time part.
        if ' ' in time_str:
            time_str = time_str.split(' ')[1]
        # Normalise to hh:mm:ss: pad missing seconds, drop anything extra.
        if ':' in time_str:
            hours, minutes, *rest = time_str.split(':')
            seconds = rest[0] if rest else "00"
            time_str = f"{hours}:{minutes}:{seconds}"
        table.insert('', tk.END, values=[
            time_str,
            entry['stock_name'] + f"-{stock_code}",
            f"{entry['last_price']:.3f}",
            stock_code
        ])
    # Re-select the rows that were selected before the rebuild.
    if not remembered:
        return
    for row in table.get_children():
        row_values = table.item(row)['values']
        if row_values and row_values[1] in remembered:
            table.selection_add(row)
def on_market_table_double_click(self, event):
    """Offer to add the double-clicked market-monitor row to the trade pool."""
    selected = self.market_table.selection()
    if not selected:
        return
    values = self.market_table.item(selected[0])['values']
    print(values)
    stock_name, last_price, stock_code = values[1], values[2], values[3]

    # Nothing to add if a cached target already has this stock code.
    for target in self.tradeTargetData.values():
        if target.stock_code == stock_code:
            messagebox.showinfo("提示", f"{stock_code} ({stock_name}) 已在交易池中")
            return

    confirmed = messagebox.askyesno(
        "添加交易标的",
        f"确定要将以下股票添加到交易池吗?\n\n"
        f"股票代码: {stock_code}\n"
        f"股票名称: {stock_name}\n"
        f"最新价格: {last_price}"
    )
    if confirmed:
        self.addTradeTarget(stock_code)
def get_trade_enabled_indicator(self, target: SFGridTradeTarget) -> str:
    """Return the text shown for *target* in the trade-status column."""
    # strategy_type 0 is "unclassified", i.e. not configured yet.
    if target.strategy_type == 0:
        return "请做交易设置"
    return "▶ 运行中" if target.enabled else "⏸ 已停止"
# ---- Context menus ----
def on_grid_table_right_click(self, event):
    """Select the row under the pointer and show the grid-table context menu."""
    row = self.grid_table.identify_row(event.y)
    # NOTE(review): source indentation was lost; the menu is assumed to appear
    # only when the click landed on a row — confirm.
    if not row:
        return
    self.grid_table.selection_set(row)
    popup = tk.Menu(self, tearoff=0)
    popup.add_command(label="移回未分类", command=self.move_to_unclassified)
    popup.post(event.x_root, event.y_root)
def on_unclassified_table_right_click(self, event):
    """Select the row under the pointer and show the unclassified-table context menu."""
    row = self.unclassified_table.identify_row(event.y)
    # NOTE(review): source indentation was lost; the menu is assumed to appear
    # only when the click landed on a row — confirm.
    if not row:
        return
    self.unclassified_table.selection_set(row)
    popup = tk.Menu(self, tearoff=0)
    popup.add_command(label="添加到网格策略", command=self.move_to_grid)
    popup.post(event.x_root, event.y_root)
def move_to_grid(self):
    """Open the grid-config window for the selected target.

    Does nothing when no target is selected. The strategy type is not changed
    here.
    """
    target = self.get_selected_target()
    if not target:
        return
    # Unclassified targets start from the new defaults (in memory only; not saved).
    if target.strategy_type == 0:  # type: ignore
        target.grid_size, target.grid_volume = 1.0, 200  # type: ignore
    # The strategy type is deliberately left alone: per the original note,
    # save_config sets it once the user saves the configuration.
    self.create_grid_config_window(target)
def move_to_unclassified(self):
    """Move the selected grid target back to "unclassified" after confirmation.

    A running strategy is stopped first; the target is then saved with
    ``enabled = False`` and the tables are refreshed.
    """
    target = self.get_selected_target()
    if not target:
        return
    prompt = f"{target.targetName()} 移回未分类?\n\n网格配置将保留但交易将停止。"
    if not messagebox.askyesno("确认", prompt):
        return
    from core.sfgrid.model import STRATEGY_TYPE_UNCLASSIFIED
    # Stop trading first if the target is currently enabled.
    if target.enabled and target.get_id() in self.strategy_ctrl:  # type: ignore
        controller = self.strategy_ctrl[target.get_id()]
        controller.enabledTrading(False)
    target.strategy_type = STRATEGY_TYPE_UNCLASSIFIED  # type: ignore
    target.enabled = False  # type: ignore
    target.save()
    self.refresh_table()
    PrintLog(LogLevel.INFO, f'- [分类] {target.targetName()} → 未分类')
def populate_grid_table(self):
    """Rebuild the grid-strategy table from the cached trade targets.

    Only targets whose ``strategy_type`` equals ``STRATEGY_TYPE_GRID`` are
    listed. A missing market price or average cost is shown as '-' instead of
    raising ``KeyError``.
    """
    from core.sfgrid.model import STRATEGY_TYPE_GRID

    # Clear first so repeated calls never duplicate rows.
    self.grid_table.delete(*self.grid_table.get_children())
    # Iterate a snapshot, as refresh_loop does, so a concurrent cache update
    # cannot invalidate the iteration.
    for target_id, target in list(self.tradeTargetData.items()):
        if target.strategy_type != STRATEGY_TYPE_GRID:  # type: ignore
            continue
        grid_idx = target.grid_index
        price_grid = target.getPriceGrid()
        if 0 <= grid_idx < len(price_grid):
            grid_info = f'{grid_idx}({price_grid[grid_idx]:.2f}元)'
        else:
            # Index outside the price grid: show it without a price.
            grid_info = f'{grid_idx}(?)'
        market_price = self.targetMarketPrice.get(target_id)
        # Targets cached after start-up may have no average cost recorded.
        avg_price = self.targetAvgPrice.get(target_id)
        self.grid_table.insert('', tk.END, values=[
            target_id,
            f"{target.stock_code} {target.stock_name}",
            '-' if market_price is None else f"{market_price:.3f}",
            target.current_position,
            '-' if avg_price is None else f"{avg_price:.3f}",
            grid_info,
            self.get_trade_enabled_indicator(target),
        ])
def populate_unclassified_table(self):
    """Rebuild the unclassified-positions table from the cached trade targets.

    Only targets whose ``strategy_type`` equals ``STRATEGY_TYPE_UNCLASSIFIED``
    are listed. A missing market price or average cost is shown as '-' instead
    of raising ``KeyError``.
    """
    from core.sfgrid.model import STRATEGY_TYPE_UNCLASSIFIED

    # Clear first so repeated calls never duplicate rows.
    self.unclassified_table.delete(*self.unclassified_table.get_children())
    # Iterate a snapshot, as refresh_loop does, so a concurrent cache update
    # cannot invalidate the iteration.
    for target_id, target in list(self.tradeTargetData.items()):
        if target.strategy_type != STRATEGY_TYPE_UNCLASSIFIED:  # type: ignore
            continue
        market_price = self.targetMarketPrice.get(target_id)
        # Targets cached after start-up may have no average cost recorded.
        avg_price = self.targetAvgPrice.get(target_id)
        self.unclassified_table.insert('', tk.END, values=[
            target_id,
            f"{target.stock_code} {target.stock_name}",
            '-' if market_price is None else f"{market_price:.3f}",
            target.current_position,
            '-' if avg_price is None else f"{avg_price:.3f}",
        ])
def on_grid_table_double_click(self, event):
    """Log the pending-order grid of the double-clicked grid-strategy row.

    Rows whose id has no strategy controller are ignored.
    """
    selection = self.grid_table.selection()
    if not selection:
        return
    values = self.grid_table.item(selection[0])['values']
    target_id = int(values[0])
    if target_id not in self.strategy_ctrl:
        return
    controller = self.strategy_ctrl[target_id]
    PrintLog(LogLevel.DEBUG, f"双击查看详情: {values[0]} - {values[1]}")
    PrintLog(LogLevel.DEBUG, "双击查看详情 - 订单网格")
    controller.printPendingOrder()
def on_unclassified_table_double_click(self, event):
    """Double-click on an unclassified row starts the add-to-grid flow.

    Args:
        event: The Tk event that triggered the callback (unused).
    """
    # Same action as the context-menu entry: open the grid-config window.
    self.move_to_grid()
def get_selected_target(self):
    """Return the selected trade target, preferring the table clicked last.

    The first selected row of the preferred table wins; the other table is
    only consulted when the preferred one has no selection. When neither has
    a selection a warning dialog is shown and ``None`` is returned.
    """
    tables = [self.grid_table, self.unclassified_table]
    if self._active_table == 1:
        # The unclassified table was clicked last, so check it first.
        tables.reverse()
    for table in tables:
        selected = table.selection()
        if not selected:
            continue
        target_id = table.item(selected[0])['values'][0]
        return self.tradeTargetData.get(int(target_id))
    messagebox.showwarning("未选中", "请先选择一个交易标的")
    return None
def get_selected_targets(self):
    """Return every selected trade target across both tables (multi-select).

    Grid-table selections come first, then unclassified ones. Rows without
    values or without a cached target are skipped.
    """
    found = []
    for table in (self.grid_table, self.unclassified_table):
        for row in table.selection():
            values = table.item(row)['values']
            if not values:
                continue
            target = self.tradeTargetData.get(int(values[0]))
            if target:
                found.append(target)
    return found
def refresh_table(self):
    """Redraw both position tables and the market table (UI only; run on the main thread).

    The selection of each position table is preserved across the rebuild by
    matching the id in the first column.
    """

    def rebuild(table, populate):
        # Remember the selected ids, rebuild the rows, then re-select them.
        kept_ids = []
        for row in table.selection():
            values = table.item(row)['values']
            if values:
                kept_ids.append(values[0])
        for row in table.get_children():
            table.delete(row)
        populate()
        if not kept_ids:
            return
        for row in table.get_children():
            values = table.item(row)['values']
            if values and values[0] in kept_ids:
                table.selection_add(row)

    # Grid-strategy table first, then the unclassified table.
    rebuild(self.grid_table, self.populate_grid_table)
    rebuild(self.unclassified_table, self.populate_unclassified_table)
    # Finally the market-monitor table.
    self.populate_market_table()
def create_grid_view_window(self, target: SFGridTradeTarget):
    """Open a modal window that shows *target*'s grid configuration read-only.

    The only editable element is a button that toggles the position status by
    switching ``grid_index`` between 0 and 1 and saving the target.
    """
    root = self.winfo_toplevel()
    view_window = tk.Toplevel(root)
    view_window.title(f"网格配置查看 - {target.stock_code} ({target.stock_name})")
    view_window.geometry("500x450")
    view_window.resizable(False, False)
    # Modal: tie the window to the root and grab all input.
    view_window.transient(root)
    view_window.grab_set()
    # Centre the 500x450 window over the root window.
    root.update_idletasks()
    x = root.winfo_x() + (root.winfo_width() // 2) - 250
    y = root.winfo_y() + (root.winfo_height() // 2) - 225
    view_window.geometry(f"500x450+{x}+{y}")

    main_frame = ttk.Frame(view_window, padding=20)
    main_frame.pack(fill=tk.BOTH, expand=True)

    # ---- Target details ----
    info_frame = ttk.LabelFrame(main_frame, text="标的详情", padding=10)
    info_frame.pack(fill=tk.X, pady=(0, 10))
    ttk.Label(info_frame, text=f"股票代码: {target.stock_code}").grid(row=0, column=0, sticky=tk.W, pady=2)
    ttk.Label(info_frame, text=f"股票名称: {target.stock_name}").grid(row=0, column=1, sticky=tk.W, padx=(20, 0), pady=2)

    def position_status(grid_index):
        # grid_index > 0 is shown as "position built" in green, else orange.
        if grid_index > 0:
            return "已建仓", "green"
        return "未建仓", "orange"

    def toggle_caption(grid_index):
        # The button always offers the opposite of the current state.
        return "标记为未建仓" if grid_index > 0 else "标记为已建仓"

    # Position status (can be toggled).
    status_text, status_color = position_status(target.grid_index)
    status_label = ttk.Label(info_frame, text=f"建仓状态: {status_text}", foreground=status_color)
    status_label.grid(row=1, column=0, sticky=tk.W, pady=2)

    def toggle_position_status():
        # Flip between 0 and 1, save, then update the label, button and caches.
        new_idx = 0 if target.grid_index > 0 else 1
        setattr(target, 'grid_index', new_idx)
        target.save()
        new_text, new_color = position_status(new_idx)
        status_label.config(text=f"建仓状态: {new_text}", foreground=new_color)
        toggle_btn.config(text=toggle_caption(new_idx))
        self.updateTradeTarget(target, False)
        PrintLog(LogLevel.INFO, f"建仓状态变更: {target.stock_code}{new_text}")

    toggle_btn = ttk.Button(
        info_frame,
        text=toggle_caption(target.grid_index),
        command=toggle_position_status
    )
    toggle_btn.grid(row=1, column=1, sticky=tk.W, padx=(20, 0), pady=2)
    ttk.Label(info_frame, text="状态: 已建初始仓(仅查看模式)").grid(row=2, column=0, columnspan=2, sticky=tk.W, pady=2)

    # ---- Grid configuration (read-only rows) ----
    config_frame = ttk.LabelFrame(main_frame, text="网格配置", padding=10)
    config_frame.pack(fill=tk.X, pady=(0, 10))
    config_rows = (
        ("基准价格:", f"{target.grid_start_price:.3f}"),
        ("网格大小:", f"{target.grid_size:.3f}"),
        ("网格交易量:", str(target.grid_volume)),
        ("上方网格数量:", str(target.grid_upper_count)),
        ("下方网格数量:", str(target.grid_lower_count)),
    )
    for caption, shown in config_rows:
        row_frame = ttk.Frame(config_frame)
        row_frame.pack(fill=tk.X, pady=5)
        ttk.Label(row_frame, text=caption, width=15).pack(side=tk.LEFT)
        ttk.Label(row_frame, text=shown, width=15, anchor=tk.W).pack(side=tk.LEFT, padx=5)
        # Trailing gray hint label (empty text in the source).
        ttk.Label(row_frame, text="", foreground='gray').pack(side=tk.LEFT)

    # ---- Grid price series ----
    price_grid_frame = ttk.LabelFrame(main_frame, text="网格价格序列", padding=10)
    price_grid_frame.pack(fill=tk.X, pady=(0, 10))
    price_text = ", ".join(f"{price:.3f}" for price in target.getPriceGrid())
    text_frame = ttk.Frame(price_grid_frame)
    text_frame.pack(fill=tk.BOTH, expand=True)
    text_widget = tk.Text(text_frame, height=4, wrap=tk.WORD)
    text_widget.insert(tk.END, price_text)
    text_widget.config(state=tk.DISABLED)  # read-only
    scrollbar = ttk.Scrollbar(text_frame, orient=tk.VERTICAL, command=text_widget.yview)
    text_widget.configure(yscrollcommand=scrollbar.set)
    text_widget.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
    scrollbar.pack(side=tk.RIGHT, fill=tk.Y)

    # ---- Close button ----
    button_frame = ttk.Frame(main_frame)
    button_frame.pack(fill=tk.X, pady=(10, 0))
    ttk.Button(button_frame, text="关闭", command=view_window.destroy).pack(side=tk.RIGHT, padx=5)
def create_grid_config_window(self, target: SFGridTradeTarget):
    """Open a modal dialog for editing the grid parameters of ``target``.

    The dialog shows one entry per grid field, a live preview of the
    resulting price ladder, and Save / Cancel buttons.  Saving copies the
    validated values onto ``target``, marks it as a grid strategy, passes it
    to ``self.updateTradeTarget`` and refreshes the table.

    Input is rejected (the preview shows an error text and Save shows an
    error box) unless the base price and grid size are finite and positive,
    the grid volume is a positive integer and both grid counts are
    non-negative integers.

    Args:
        target: The trade target whose grid configuration is edited in place.
    """
    root = self.winfo_toplevel()
    config_window = tk.Toplevel(root)
    config_window.title(f"网格配置 - {target.stock_code} ({target.stock_name})")
    config_window.geometry("550x550")
    config_window.resizable(False, False)
    # Make the dialog modal relative to the main window.
    config_window.transient(root)
    config_window.grab_set()
    # Centre the dialog over the main window.
    root.update_idletasks()
    x = root.winfo_x() + (root.winfo_width() // 2) - 275
    y = root.winfo_y() + (root.winfo_height() // 2) - 275
    config_window.geometry(f"550x550+{x}+{y}")

    main_frame = ttk.Frame(config_window, padding=20)
    main_frame.pack(fill=tk.BOTH, expand=True)

    # Read-only summary of the target being configured.
    info_frame = ttk.LabelFrame(main_frame, text="标的详情", padding=10)
    info_frame.pack(fill=tk.X, pady=(0, 10))
    ttk.Label(info_frame, text=f"股票代码: {target.stock_code}").grid(row=0, column=0, sticky=tk.W, pady=2)
    ttk.Label(info_frame, text=f"股票名称: {target.stock_name}").grid(row=0, column=1, sticky=tk.W, padx=(20, 0), pady=2)
    ttk.Label(info_frame, text="状态: 新标的(可配置模式)").grid(row=2, column=0, columnspan=2, sticky=tk.W, pady=2)

    config_frame = ttk.LabelFrame(main_frame, text="网格配置", padding=15)
    config_frame.pack(fill=tk.X, pady=(0, 10))

    # Field name -> entry widget, filled by add_entry_row below.
    entries = {}

    def add_entry_row(field_name, caption):
        """Add one "caption + entry" row pre-filled from ``target``."""
        row = ttk.Frame(config_frame)
        row.pack(fill=tk.X, pady=5)
        ttk.Label(row, text=caption, width=15).pack(side=tk.LEFT)
        entry = ttk.Entry(row, width=15)
        entry.insert(0, str(getattr(target, field_name)))
        entry.pack(side=tk.LEFT, padx=5)
        ttk.Label(row, text="", foreground='gray').pack(side=tk.LEFT)
        entries[field_name] = entry

    for field_name, caption in (
        ('grid_start_price', "基准价格:"),
        ('grid_size', "网格大小:"),
        ('grid_volume', "网格交易量:"),
        ('grid_upper_count', "上方网格数量:"),
        ('grid_lower_count', "下方网格数量:"),
    ):
        add_entry_row(field_name, caption)

    def read_inputs():
        """Parse the five entries; raise ValueError if any value is unusable."""
        start_price = float(entries['grid_start_price'].get())
        size = float(entries['grid_size'].get())
        volume = int(entries['grid_volume'].get())
        upper = int(entries['grid_upper_count'].get())
        lower = int(entries['grid_lower_count'].get())
        # The chained comparison is False for NaN and infinity as well as for
        # zero and negative numbers, all of which float() parses happily.
        infinity = float("inf")
        if not (0 < start_price < infinity and 0 < size < infinity):
            raise ValueError("base price and grid size must be finite and positive")
        if volume <= 0 or upper < 0 or lower < 0:
            raise ValueError("grid volume must be positive and grid counts non-negative")
        return start_price, size, volume, upper, lower

    preview_frame = ttk.LabelFrame(main_frame, text="网格价格序列预览", padding=10)
    preview_frame.pack(fill=tk.X, pady=(0, 10))
    preview_result = tk.StringVar(value="点击'预览'查看生成的网格价格序列")

    def calculate_grid_prices():
        """Return the price ladder (highest first), or None on bad input."""
        try:
            base_price, grid_size, _volume, upper_count, lower_count = read_inputs()
        except ValueError:
            return None
        prices = [round(base_price + grid_size * i, 3) for i in range(upper_count, 0, -1)]
        prices.append(base_price)
        for i in range(1, lower_count + 1):
            price = base_price - grid_size * i
            # Stop as soon as a level would drop below zero.
            if price < 0:
                break
            prices.append(round(price, 3))
        return prices

    def update_preview():
        """Refresh the preview label from the current entry contents."""
        prices = calculate_grid_prices()
        if prices:
            price_str = ", ".join([str(p) for p in prices])
            preview_result.set(f"网格价格序列: {price_str}")
        else:
            preview_result.set("参数错误,请检查输入!")

    # Recompute the preview whenever an entry changes or loses focus.
    for entry_widget in entries.values():
        entry_widget.bind("<KeyRelease>", lambda e: update_preview())
        entry_widget.bind("<FocusOut>", lambda e: update_preview())

    preview_button_frame = ttk.Frame(preview_frame)
    preview_button_frame.pack(fill=tk.X, pady=5)
    preview_label = ttk.Label(preview_button_frame, textvariable=preview_result, foreground='blue')
    preview_label.pack(side=tk.LEFT, padx=10)
    # Show the ladder for the pre-filled values right away.
    update_preview()

    button_frame = ttk.Frame(main_frame)
    button_frame.pack(fill=tk.X, pady=(10, 0))

    def save_config():
        """Validate the inputs, store them on ``target`` and close the dialog."""
        try:
            # Everything is parsed and validated before the first assignment,
            # so a rejected input never leaves ``target`` half-updated.
            grid_start_price, grid_size, grid_volume, grid_upper_count, grid_lower_count = read_inputs()
            # setattr keeps static type checkers quiet about the model fields.
            setattr(target, 'grid_start_price', grid_start_price)
            setattr(target, 'grid_size', grid_size)
            setattr(target, 'grid_volume', grid_volume)
            setattr(target, 'grid_upper_count', grid_upper_count)
            setattr(target, 'grid_lower_count', grid_lower_count)
            # grid_index is set to the base price's slot, i.e. the number of
            # levels above it.
            setattr(target, 'grid_index', grid_upper_count)
            # Saving a grid configuration also marks the target as a grid strategy.
            from core.sfgrid.model import STRATEGY_TYPE_GRID
            setattr(target, 'strategy_type', STRATEGY_TYPE_GRID)
            self.updateTradeTarget(target, True)
            config_window.destroy()
            # Refresh immediately so the table shows the new values.
            self.refresh_table()
            PrintLog(LogLevel.INFO, f"网格配置已保存: {target.stock_code} - {target.stock_name}")
            messagebox.showinfo("成功", "网格配置已保存!")
        except ValueError:
            messagebox.showerror("错误", "输入参数有误,请检查!")
        except Exception as e:
            messagebox.showerror("错误", f"保存配置失败:{str(e)}")
            PrintLog(LogLevel.ERROR, f"保存网格配置失败: {str(e)}")

    ttk.Button(button_frame, text="保存", command=save_config).pack(side=tk.RIGHT, padx=5)
    ttk.Button(button_frame, text="取消", command=config_window.destroy).pack(side=tk.RIGHT, padx=5)
def decrease_grid_index(self, grid_index_var: tk.IntVar, target: SFGridTradeTarget, required_position_label: ttk.Label, position_status_label: ttk.Label):
    """Step the grid index down by one, never going below zero.

    When the index changes, the required-position and status labels are
    refreshed through ``update_required_position_and_status``.  Nothing
    happens if the index is already zero or less.
    """
    index = grid_index_var.get()
    if index <= 0:
        return
    grid_index_var.set(index - 1)
    # Read the variable back so the labels reflect the value it now holds.
    self.update_required_position_and_status(
        grid_index_var.get(), target, required_position_label, position_status_label
    )
def increase_grid_index(self, grid_index_var: tk.IntVar, max_index: int, target: SFGridTradeTarget, required_position_label: ttk.Label, position_status_label: ttk.Label):
    """Step the grid index up by one, never going above ``max_index``.

    When the index changes, the required-position and status labels are
    refreshed through ``update_required_position_and_status``.  Nothing
    happens if the index has already reached ``max_index``.
    """
    index = grid_index_var.get()
    if index >= max_index:
        return
    grid_index_var.set(index + 1)
    # Read the variable back so the labels reflect the value it now holds.
    self.update_required_position_and_status(
        grid_index_var.get(), target, required_position_label, position_status_label
    )
def update_position_status(self, current_position: int, required_position: int, status_label: ttk.Label):
    """Show on ``status_label`` whether the held position covers the requirement.

    A green "sufficient" text is shown when ``current_position`` is at least
    ``required_position``; otherwise a red text states the shortfall.
    """
    shortage = required_position - current_position
    if shortage <= 0:
        status_label.config(text="持仓量充足", foreground="green")
        return
    status_label.config(text=f"还需补充 {shortage} 手仓位", foreground="red")
def update_required_position_and_status(self, grid_index: int, target: SFGridTradeTarget, required_position_label: ttk.Label, position_status_label: ttk.Label):
    """Recompute the required position for ``grid_index`` and refresh both labels.

    The required position is ``grid_index * target.grid_volume``.  It is
    written to ``required_position_label`` and then compared with
    ``target.current_position`` via ``update_position_status``.
    """
    needed: int = grid_index * target.grid_volume  # type: ignore
    required_position_label.config(text=str(needed))
    self.update_position_status(target.current_position, needed, position_status_label)
# 交易池管理
def addTradeTarget(self, stock_code: str, gridIndex: int = 1):
    """Create a trade target for ``stock_code`` and register it with the view.

    The instrument name is looked up through ``qmtv``; an unknown code or an
    already existing target is logged and ignored.  Otherwise a new
    ``SFGridTradeTarget`` row is created with the currently held volume
    (0 when no position is reported) and passed to ``updateTradeTarget``.
    Any exception is logged rather than propagated.

    Args:
        stock_code: Code of the instrument to add.
        gridIndex: Initial ``grid_index`` stored on the new target.
    """
    try:
        stock_name = qmtv.getInstrumentName(stock_code)
        if not stock_name:
            PrintLog(LogLevel.ERROR, f'无法获取股票代码 {stock_code} 的名称,请检查代码是否正确')
            return
        PrintLog(LogLevel.DEBUG, f'添加交易标的: {stock_code} {stock_name}')

        # Refuse to create a second row for the same stock code.
        if SFGridTradeTarget.get_or_none(SFGridTradeTarget.stock_code == stock_code):
            PrintLog(LogLevel.INFO, f'交易标的 {stock_code} {stock_name} 已存在')
            return

        # Seed the new target with whatever volume is held right now.
        position = qmtv.getStockPosition(stock_code)  # type: ignore
        held_volume = int(position.volume) if position is not None else 0
        created = SFGridTradeTarget.create(
            stock_name=stock_name,
            stock_code=stock_code,
            current_position=held_volume,
            grid_index=gridIndex,
            init_price=0.0,
            strategy_type=0,  # unclassified by default
        )
        # A newly added target is treated like an initialisation.
        self.updateTradeTarget(created, True)
    except Exception as exc:
        PrintLog(LogLevel.ERROR, f'新增交易标的失败 {stock_code} {exc}')
# button handlers =============================================================================================
def btnHandlerGridCorrect(self):
    """Open the grid correction window for the currently selected target, if any."""
    selected = self.get_selected_target()
    if selected:
        self.create_grid_correction_window(selected)
def btnHandlerToggleMarketMonitor(self):
    """Show the right-hand notebook if it is hidden, hide it if it is shown.

    The ``market_monitor_visible`` flag is updated and, when a sidebar icon
    exists, its active state is synchronised with the flag.
    """
    show = not self.market_monitor_visible
    if show:
        self.right_notebook.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=(5, 0))
    else:
        self.right_notebook.pack_forget()
    self.market_monitor_visible = show
    # The icon attribute may not exist on this instance, so check first.
    if hasattr(self, '_sidebar_icon'):
        self._sidebar_icon.set_active(self.market_monitor_visible)
def _toggle_bottom_panel(self):
    """Show the bottom (unclassified positions) frame if hidden, hide it if shown.

    The ``bottom_panel_visible`` flag is updated and, when a bottom icon
    exists, its active state is synchronised with the flag.
    """
    show = not self.bottom_panel_visible
    if show:
        self.unclassified_frame.pack(fill=tk.BOTH, expand=True, pady=(3, 0))
    else:
        self.unclassified_frame.pack_forget()
    self.bottom_panel_visible = show
    # The icon attribute may not exist on this instance, so check first.
    if hasattr(self, '_bottom_icon'):
        self._bottom_icon.set_active(self.bottom_panel_visible)
def btnHandlerTradeSettings(self):
    """Open the grid settings for the selected target.

    A target that is not enabled gets the editable configuration window; an
    enabled one gets the read-only view window.  Does nothing when no target
    is selected.
    """
    target = self.get_selected_target()
    if not target:
        return
    # Parameters may only be changed while trading is paused.
    open_window = (
        self.create_grid_view_window
        if target.enabled  # type: ignore
        else self.create_grid_config_window
    )
    open_window(target)
def btnHandlerStartSelectedTrade(self):
    """Start trading for every selected grid target that is currently paused.

    Warns when nothing is selected, informs when no selected target is
    startable, and asks for confirmation before enabling anything.
    """
    from core.sfgrid.model import STRATEGY_TYPE_GRID

    targets = self.get_selected_targets()
    if not targets:
        messagebox.showwarning("未选中", "请先选择交易标的")
        return

    # Only paused grid-strategy targets can be started.
    to_start = []
    for candidate in targets:
        if candidate.strategy_type == STRATEGY_TYPE_GRID and not candidate.enabled:  # type: ignore
            to_start.append(candidate)
    if not to_start:
        messagebox.showinfo("提示", "选中的标的中没有可启动的(已在运行中或非网格策略)")
        return

    names = "\n".join([f"{t.stock_code} {t.stock_name}" for t in to_start])
    confirmed = messagebox.askyesno("确认启动", f"确定要启动以下 {len(to_start)} 个交易标的吗?\n\n{names}")
    if not confirmed:
        return

    for target in to_start:
        PrintLog(LogLevel.INFO, f'启动标的交易 {target.targetName()}')
        target.enabled = True  # type: ignore
        ctrl = self.strategy_ctrl.get(target.get_id())
        if not ctrl:
            # NOTE(review): the message says a controller is being created,
            # but nothing is created here while ``enabled`` is already True.
            # Confirm whether a controller should be built at this point.
            PrintLog(LogLevel.INFO, f"\t创建标的交易控制器 {target.targetName()}")
            continue
        ctrl.enabledTrading(True)
def btnHandlerStopSelectedTrade(self):
    """Pause trading for every selected grid target that is currently enabled.

    Warns when nothing is selected, informs when no selected target can be
    paused, and asks for confirmation first.  Each confirmed target is
    paused through its strategy controller; a target without a controller
    is reported through the application log.
    """
    from core.sfgrid.model import STRATEGY_TYPE_GRID

    targets = self.get_selected_targets()
    if not targets:
        messagebox.showwarning("未选中", "请先选择交易标的")
        return

    # Only enabled grid-strategy targets can be paused.
    to_stop = [t for t in targets
               if t.strategy_type == STRATEGY_TYPE_GRID and t.enabled]  # type: ignore
    if not to_stop:
        messagebox.showinfo("提示", "选中的标的中没有可暂停的(已是暂停状态或非网格策略)")
        return

    names = "\n".join(f"{t.stock_code} {t.stock_name}" for t in to_stop)
    result = messagebox.askyesno("确认暂停", f"确定要暂停以下 {len(to_stop)} 个交易标的吗?\n\n{names}")
    if not result:
        return

    for target in to_stop:
        PrintLog(LogLevel.INFO, f'暂停标的交易 {target.targetName()}')
        ctrl = self.strategy_ctrl.get(target.get_id())
        if ctrl:
            ctrl.enabledTrading(False)
        else:
            # Report through the application log instead of stdout, which is
            # not visible when the program runs as a GUI.
            PrintLog(LogLevel.ERROR, f"标的交易控制器不存在 {target.stock_code} {target.stock_name}")
def btnHandlerDelSelectedTradeTarget(self):
    """Delete the selected trade target after an explicit confirmation.

    When the target has a strategy controller, the deletion is delegated to
    it; otherwise the view's own bookkeeping is cleaned up directly via
    ``onTradeTargetDeleted``.  Does nothing when no target is selected or
    the user declines.
    """
    target = self.get_selected_target()
    if not target:
        return

    prompt = (
        f"确定要删除以下交易标的吗?\n\n"
        f"股票代码: {target.stock_code}\n"
        f"股票名称: {target.stock_name}\n\n"
        f"⚠️ 此操作不可恢复!"
    )
    if not messagebox.askyesno("确认删除", prompt, icon='warning'):
        return

    target_id = target.get_id()
    if target_id in self.strategy_ctrl:
        self.strategy_ctrl[target_id].deleteTradeTarget(target)
    else:
        self.onTradeTargetDeleted(target)
    PrintLog(LogLevel.INFO, f"已发送删除请求: {target.stock_code} - {target.stock_name}")
def onTradeTargetDeleted(self, target: SFGridTradeTarget):
    """Forget ``target`` in the view's lookup tables.

    Removes the target's id from ``tradeTargetData`` and ``strategy_ctrl``
    and its stock code from ``stockCodeIdMap``.  Removal is idempotent: a
    target that is unknown or was already removed is ignored instead of
    raising ``KeyError``.

    Args:
        target: The trade target that has been deleted.
    """
    target_id = target.get_id()
    self.tradeTargetData.pop(target_id, None)
    self.strategy_ctrl.pop(target_id, None)
    self.stockCodeIdMap.pop(target.stock_code, None)  # type: ignore
def btnHandlerAddTradeTarget(self):
    """Open a small modal dialog that asks for a stock code to add.

    Confirming (button or Return key) with a non-empty code calls
    ``addTradeTarget`` and closes the dialog; an empty code only shows a
    warning.  Cancel closes the dialog without adding anything.
    """
    root = self.winfo_toplevel()
    dialog = tk.Toplevel(root)
    dialog.title("添加交易标的")
    dialog.geometry("400x150")
    dialog.resizable(False, False)
    # Make the dialog modal relative to the main window.
    dialog.transient(root)
    dialog.grab_set()
    # Centre the dialog over the main window.
    root.update_idletasks()
    left = root.winfo_x() + (root.winfo_width() // 2) - 200
    top = root.winfo_y() + (root.winfo_height() // 2) - 75
    dialog.geometry(f"400x150+{left}+{top}")

    form = ttk.Frame(dialog, padding=20)
    form.pack(fill=tk.BOTH, expand=True)

    ttk.Label(form, text="股票代码:").grid(row=0, column=0, sticky=tk.W, pady=5)
    code_entry = ttk.Entry(form, width=30)
    code_entry.grid(row=0, column=1, pady=5, padx=(10, 0))
    code_entry.focus()

    buttons = ttk.Frame(form)
    buttons.grid(row=1, column=0, columnspan=2, pady=20)

    def confirm_add(_event=None):
        """Add the typed code, or warn when the entry is empty."""
        code = code_entry.get().strip()
        if code:
            self.addTradeTarget(code)
            dialog.destroy()
        else:
            messagebox.showwarning("输入错误", "请输入股票代码")

    ttk.Button(buttons, text="确认", command=confirm_add, width=10).pack(side=tk.LEFT, padx=5)
    ttk.Button(buttons, text="取消", command=dialog.destroy, width=10).pack(side=tk.LEFT, padx=5)
    # Pressing Return in the entry behaves like the confirm button.
    code_entry.bind('<Return>', confirm_add)
    PrintLog(LogLevel.INFO, "点击添加交易标的按钮")
def btnHandlerSetMonitorPrice(self):
    """Apply the price typed into ``monitor_price_entry`` as the monitor price.

    The input must parse as a finite, non-negative number; anything else
    shows an error box and leaves the current state untouched.  On success
    the monitor price is replaced, the cached market data and the list of
    listened stocks are cleared, and the market table is emptied.
    """
    try:
        new_price = float(self.monitor_price_entry.get())
        # float() also parses "nan", "inf" and negative numbers; the chained
        # comparison is False for all of them.
        if not 0 <= new_price < float("inf"):
            raise ValueError(f"monitor price out of range: {new_price!r}")
    except ValueError:
        messagebox.showerror("错误", "请输入有效的数字")
        return

    self.monitor_price = new_price
    # Data collected under the previous price no longer applies.
    self.marketData.clear()
    self.listening_stock.clear()
    self.market_table.delete(*self.market_table.get_children())
    PrintLog(LogLevel.INFO, f"监控价格已更新为: {new_price}")
class PanelIcon(tk.Canvas):
    """VS Code style panel toggle icon whose colours follow the parent's background.

    The icon is a small canvas showing two bars: side by side for
    ``kind == 'sidebar'``, stacked for any other kind.  Clicking it invokes
    ``command``; ``set_active`` switches between the highlighted and the
    dimmed drawing.
    """

    _SIZE = 22
    # Brightness assumed when the parent's background cannot be resolved;
    # it selects the light palette.
    _DEFAULT_BRIGHTNESS = 240

    def __init__(self, parent, kind: str, command, active: bool = True):
        """Create the icon inside ``parent``.

        Args:
            parent: Container widget; its background drives the palette.
            kind: ``'sidebar'`` for the left/right icon, anything else for
                the top/bottom icon.
            command: Zero-argument callable invoked on a left click.
            active: Initial highlighted state.
        """
        super().__init__(parent, width=self._SIZE, height=self._SIZE,
                         bd=0, highlightthickness=0, cursor='hand2')
        self._command = command
        self._kind = kind
        self._active = active
        self.bind('<Button-1>', self._on_click)
        self.bind('<Enter>', self._on_enter)
        self.bind('<Leave>', self._on_leave)
        # Re-read the colours whenever the widget gets mapped.
        self.bind('<Map>', lambda e: self._update_colors())
        self._update_colors()

    def _parent_background(self) -> str:
        """Return the parent's background colour name, or '' if unknown."""
        try:
            return str(self.master.cget('background'))
        except tk.TclError:
            # ttk widgets have no 'background' option; their colour comes
            # from the style registered for the widget class.
            return str(ttk.Style(self).lookup(self.master.winfo_class(), 'background'))

    @staticmethod
    def _brightness(rgb16) -> float:
        """Return the perceived brightness (0-255) of a 16-bit-per-channel RGB triple."""
        r, g, b = (channel >> 8 for channel in rgb16)
        return (r * 299 + g * 587 + b * 114) / 1000

    @staticmethod
    def _palette(bright: float):
        """Return ``(bg, hover, off, on, active)`` colours for a background brightness."""
        if bright > 128:
            # Light theme; '#0078d4' is the Windows accent blue.
            bg = '#e8e8e8' if bright < 220 else '#f0f0f0'
            return bg, '#d4d4d4', '#b0b0b0', '#808080', '#0078d4'
        # Dark theme.
        return '#3c3c3c', '#505050', '#6a6a6a', '#a0a0a0', '#ffffff'

    def _update_colors(self):
        """Derive the palette from the parent's background and redraw."""
        try:
            bg = self._parent_background()
            # winfo_rgb resolves named and system colours as well as #rrggbb.
            bright = self._brightness(self.winfo_rgb(bg)) if bg else self._DEFAULT_BRIGHTNESS
        except Exception:
            # Colour detection is cosmetic and best effort: any failure
            # (unknown colour name, missing option, ...) selects the default.
            bright = self._DEFAULT_BRIGHTNESS
        self._BG, self._HOVER, self._OFF, self._ON, self._ACTIVE = self._palette(bright)
        self.configure(bg=self._BG)
        self._draw()

    def _draw(self):
        """Redraw the two bars for the current kind and active state."""
        self.delete('all')
        m = 3
        s = self._SIZE - m * 2
        # When inactive both bars use the dimmed colour; when active the
        # second bar (right or bottom panel) gets the accent colour.
        first_fill = self._ON if self._active else self._OFF
        second_fill = self._ACTIVE if self._active else self._OFF
        if self._kind == 'sidebar':
            # Left/right split.
            self.create_rectangle(m + 1, m + 1, m + 6, m + s - 2, fill=first_fill, outline='', width=0)
            self.create_rectangle(m + 8, m + 1, m + 13, m + s - 2, fill=second_fill, outline='', width=0)
        else:
            # Top/bottom split.
            self.create_rectangle(m + 1, m + 1, m + s - 2, m + 6, fill=first_fill, outline='', width=0)
            self.create_rectangle(m + 1, m + 8, m + s - 2, m + 13, fill=second_fill, outline='', width=0)

    def set_active(self, active: bool):
        """Set the highlighted state and redraw."""
        self._active = active
        self._draw()

    def _on_click(self, event):
        """Invoke the command given at construction."""
        self._command()

    def _on_enter(self, event):
        """Switch to the hover background while the pointer is over the icon."""
        self.configure(bg=self._HOVER)

    def _on_leave(self, event):
        """Restore the normal background when the pointer leaves."""
        self.configure(bg=self._BG)