Files
sfgrid/core/sfgrid/sfgrid_ui.py
T

1063 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Any
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from datetime import datetime
import threading
import time
import core.eventbus as eBus
from core.logger import LogLevel, PrintLog
from core.sfgrid.bus_events import ActionEventAddTradeTarget, ActionEventDeleteTradeTarget, ActionEventDisableTrade, ActionEventEnableTrade, ActionEventGridFix, EventTradeTargetUpdate, ResultEventTradeTargetAdded, ResultEventTradeTargetDeleted
from core.sfgrid.model import SFGridTradeTarget
import configparser
import config
from core.sfgrid.objects import GridFixData
from core.qmt import qmtv
from core.sfgrid.sfgrid_controller import SFGridController
from core.sfgrid.sfgrid_strategy import SFGridStrategy
class TradeTargetUI(ttk.Frame):
def __init__(self, parent):
super().__init__(parent)
self.controller = SFGridController()
self.tradeTargetData:dict[int, SFGridTradeTarget] = {} # id->trade_target
self.strategy_ctrl:dict[int, SFGridStrategy] = {} # stock_code->trade_target
self.init_trade_target_pool()
self.registerEventHandler()
# 创建刷新线程标志
self.refresh_thread_running = False # 默认不启动刷新线程
# 市场监控数据
self.marketData: dict[str, Any] = {} # 存储市场数据 {stock_code: {stock_name, last_price, time}}
# 创建界面
self.create_ui()
self.start_ui_refresh()
def refresh_targets(self):
# 更新标的池
results = SFGridTradeTarget.select()
for temp in results:
result :SFGridTradeTarget = temp
self.tradeTargetData[result.get_id()] = result
def init_trade_target_pool(self):
self.refresh_targets()
for id, tradeTarget in self.tradeTargetData.items():
status = "新建" if tradeTarget.status == 0 else "已建初始仓"
PrintLog(LogLevel.INFO, f' [序号-{id}] 股票代码: {tradeTarget.stock_code}-{tradeTarget.stock_name} 当前持仓: {qmtv.getStockPosition(tradeTarget.stock_code)} 网格索引: {tradeTarget.grid_index} 基准价格 {config.grid_price[tradeTarget.grid_index]} 状态: {status} 启用交易线程: {'自动交易中' if tradeTarget.enabled else '交易已停止'}') # type: ignore
tradeTarget.current_position = qmtv.getStockPosition(tradeTarget.stock_code) # type: ignore
result = tradeTarget.save()
PrintLog(LogLevel.INFO, f' |- 同步[{tradeTarget.stock_code}-{tradeTarget.stock_name}]持仓信息[{'成功' if result == 1 else '失败'}]')
stockTradeController = SFGridStrategy(tradeTarget) # type: ignore
self.strategy_ctrl[id] = stockTradeController # pyright: ignore[reportArgumentType]
# eBus.event_bus.publish(eBus.EventTradeTargetUpdate, tradeTarget)
PrintLog(LogLevel.INFO, f'- [成功]交易标的信息初始化, 共 {len(self.tradeTargetData)} 个标的')
def registerEventHandler(self):
eBus.event_bus.subscribe(EventTradeTargetUpdate, self.onTradeTargetUpdated)
# eBus.event_bus.subscribe(eBus.MarketDataUpdate, self.onMarketDataUpdated)
# eBus.event_bus.subscribe(eBus.ResultEventTradeEnabled, self.onTradeEnabled)
# eBus.event_bus.subscribe(eBus.ResultEventTradeDisabled, self.onTradeDisabled)
# eBus.event_bus.subscribe(eBus.MarketDataEnabled, self.onMarketDataToggled)
# eBus.event_bus.subscribe(eBus.MarketDataDisabled, self.onMarketDataToggled)
eBus.event_bus.subscribe(ResultEventTradeTargetDeleted, self.onTradeTargetDeleted)
eBus.event_bus.subscribe(ResultEventTradeTargetAdded, self.onTradeTargetUpdated)
def start_refresh_thread(self):
"""启动刷新线程"""
if not hasattr(self, 'refresh_thread') or not self.refresh_thread.is_alive():
self.refresh_thread = threading.Thread(target=self.refresh_loop, daemon=True)
self.refresh_thread.start()
def refresh_loop(self):
"""刷新循环"""
while self.refresh_thread_running:
# 在主线程中更新UI
self.after(0, self.refresh_table)
time.sleep(0.5) # 每0.5秒刷新一次
def stop_refresh_thread(self):
"""停止刷新线程"""
self.refresh_thread_running = False
def onTradeTargetDeleted(self, target: SFGridTradeTarget):
"""处理交易标的删除完成事件"""
# 从本地数据中删除
id = target.get_id()
if id in self.tradeTargetData:
PrintLog(LogLevel.DEBUG, f"删除交易标的,ID: {id}")
del self.tradeTargetData[id]
del self.strategy_ctrl[id]
# 添加日志
PrintLog(LogLevel.INFO, f"交易标的已删除,ID: {id}")
def onTradeEnabled(self, target:SFGridTradeTarget):
PrintLog(LogLevel.INFO, f"交易启用: {target.stock_code} - {target.stock_name}")
def onTradeDisabled(self, target:SFGridTradeTarget):
PrintLog(LogLevel.INFO, f"交易禁用: {target.stock_code} - {target.stock_name}")
def onTradeTargetUpdated(self, target: SFGridTradeTarget):
# 更新或添加数据到本地缓存
self.tradeTargetData[target.get_id()] = target
def onMarketDataUpdated(self, target: SFGridTradeTarget):
# 更新市场监控数据
current_time = datetime.now().strftime("%H:%M:%S")
self.marketData[str(target.stock_code)] = {
'stock_name': target.stock_name,
'last_price': target.market_price if target.market_price is not None else 0.0,
'time': current_time
}
def create_ui(self):
"""创建UI界面"""
# 主框架(使用self作为父容器)
main_frame = ttk.Frame(self)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# 创建工具栏
toolbar_frame = ttk.Frame(main_frame)
toolbar_frame.pack(fill=tk.X, pady=(0, 10))
# 工具栏按钮
ttk.Button(toolbar_frame, text="▶️ 启动交易",
command=self.start_selected_trade, width=12).pack(side=tk.LEFT, padx=2)
ttk.Button(toolbar_frame, text="⏸ 暂停交易",
command=self.pause_selected_trade, width=12).pack(side=tk.LEFT, padx=2)
ttk.Button(toolbar_frame, text=" 添加标的",
command=self.add_trade_target, width=12).pack(side=tk.LEFT, padx=2)
ttk.Button(toolbar_frame, text="🗑 删除标的",
command=self.delete_selected_trade, width=12).pack(side=tk.LEFT, padx=2)
ttk.Button(toolbar_frame, text="🛠 网格修正",
command=self.grid_correction, width=12).pack(side=tk.LEFT, padx=2)
# 添加分隔符
ttk.Separator(toolbar_frame, orient='vertical').pack(side=tk.LEFT, fill=tk.Y, padx=10)
# 表格区域
self.create_tables_area(main_frame)
def start_ui_refresh(self):
"""启动UI刷新线程"""
if not self.refresh_thread_running:
self.refresh_thread_running = True
self.start_refresh_thread()
PrintLog(LogLevel.INFO, "UI刷新线程已启动")
def stop_ui_refresh(self):
"""停止UI刷新线程"""
if self.refresh_thread_running:
self.stop_refresh_thread()
self.refresh_thread_running = False
PrintLog(LogLevel.INFO, "UI刷新线程已停止")
def create_tables_area(self, parent):
"""创建表格区域"""
# 创建主表格框架(水平排列)
tables_frame = ttk.Frame(parent)
tables_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 5))
# 左侧交易标的区域
trade_frame = ttk.LabelFrame(tables_frame, text="交易标的详情", padding=10)
trade_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))
# 创建交易标的表格
self.create_trade_target_table(trade_frame)
# 右侧市场监控区域
market_frame = ttk.LabelFrame(tables_frame, text="市场监控", padding=10)
market_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=(5, 0))
# 创建市场监控表格
self.create_market_monitor_table(market_frame)
def create_trade_target_table(self, parent):
"""创建交易标的表格"""
columns = ("ID",
"股票代码", "股票名称", "市场价", "持仓数量", "网格索引",
"最新成交价", "计划买入价", "计划卖出价", "当前订单价", "当前订单号", "当前订单类型",
"启用状态", "交易状态"
)
self.trade_table = ttk.Treeview(parent, columns=columns, show='headings', height=15)
# 专业化的列配置
column_configs = {
"ID": (50, tk.CENTER),
"股票代码": (90, tk.CENTER),
"股票名称": (80, tk.CENTER),
"市场价": (70, tk.CENTER),
"持仓数量": (80, tk.CENTER),
"网格索引": (80, tk.CENTER),
"最新成交价": (90, tk.CENTER),
"计划买入价": (90, tk.CENTER),
"计划卖出价": (90, tk.CENTER),
"当前订单价": (90, tk.CENTER),
"当前订单号": (90, tk.CENTER),
"当前订单类型": (90, tk.CENTER),
"启用状态": (80, tk.CENTER),
"交易状态": (80, tk.CENTER)
}
for col in columns:
width, anchor = column_configs[col]
self.trade_table.heading(col, text=col)
self.trade_table.column(col, width=width, anchor=anchor) # type: ignore
# 填充数据
self.populate_trade_table()
# 滚动条
scrollbar = ttk.Scrollbar(parent, orient=tk.VERTICAL, command=self.trade_table.yview)
self.trade_table.configure(yscrollcommand=scrollbar.set)
self.trade_table.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 绑定双击事件
self.trade_table.bind("<Double-1>", self.on_table_double_click)
def create_market_monitor_table(self, parent):
"""创建市场监控表格"""
columns = ("时间", "股票代码", "股票名称", "最新价格")
self.market_table = ttk.Treeview(parent, columns=columns, show='headings', height=15)
# 列配置
column_configs = {
"时间": (120, "center"),
"股票代码": (90, "center"),
"股票名称": (80, "center"),
"最新价格": (80, "center")
}
for col in columns:
width, anchor = column_configs[col]
self.market_table.heading(col, text=col)
self.market_table.column(col, width=width, anchor=anchor) # type: ignore
# 滚动条
scrollbar = ttk.Scrollbar(parent, orient=tk.VERTICAL, command=self.market_table.yview)
self.market_table.configure(yscrollcommand=scrollbar.set)
self.market_table.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 绑定双击事件
self.market_table.bind("<Double-1>", self.on_market_table_double_click)
# 填充初始数据
self.populate_market_table()
def populate_market_table(self):
"""填充市场监控表格数据"""
pass
# 清空现有数据
for item in self.market_table.get_children():
self.market_table.delete(item)
# 填充市场数据
tmp = self.marketData.copy()
for stock_code, data in tmp.items():
values = [
data['time'],
stock_code,
data['stock_name'],
f"{data['last_price']:.3f}"
]
self.market_table.insert('', tk.END, values=values)
def on_market_table_double_click(self, event):
"""市场监控表格双击事件"""
selected = self.market_table.selection()
if selected:
item = selected[0]
values = self.market_table.item(item)['values']
stock_code = values[1]
stock_name = values[2]
last_price = values[3]
# 检查是否已在交易池中
is_in_trade_pool = any(target.stock_code == stock_code for target in self.tradeTargetData.values())
if is_in_trade_pool:
messagebox.showinfo("提示", f"{stock_code} ({stock_name}) 已在交易池中")
else:
result = messagebox.askyesno(
"添加交易标的",
f"确定要将以下股票添加到交易池吗?\n\n"
f"股票代码: {stock_code}\n"
f"股票名称: {stock_name}\n"
f"最新价格: {last_price}"
)
if result:
# 发布事件通知主控制器添加标的
eBus.event_bus.publish(ActionEventAddTradeTarget, stock_code)
PrintLog(LogLevel.INFO, f"已发送添加请求: {stock_code} - {stock_name}")
def get_status_indicator(self, target: SFGridTradeTarget) -> str:
"""获取状态指示器(带颜色色块的文本)"""
if target.status == 1:
# 绿色圆点表示交易中
return "🟢 已建仓"
elif target.status == 0:
# 黄色圆点表示暂停
return "🟡 未建仓"
else:
return "🔴 错误状态"
def get_trade_enabled_indicator(self, enabled: bool) -> str:
"""获取交易状态指示器"""
if enabled:
return "🟢 策略运行"
else:
return "🟡 策略暂停"
def populate_trade_table(self):
"""填充交易标的表格数据"""
for temp, target in self.tradeTargetData.items():
values = [
target.id, # type: ignore
target.stock_code,
target.stock_name,
"-" if target.market_price is None else f"{target.market_price:.3f}",
target.current_position,
target.grid_index,
'-' if target.last_trade_price is None else f"{target.last_trade_price:.3f}",
'-' if target.plan_buy_price is None else f"{target.plan_buy_price:.3f}",
'-' if target.plan_sell_price is None else f"{target.plan_sell_price:.3f}",
'-' if target.current_order_price is None else f"{target.current_order_price:.3f}",
target.current_order_no,
target.current_order_type,
self.get_status_indicator(target),
self.get_trade_enabled_indicator(target.enabled) # type: ignore
]
self.trade_table.insert('', tk.END, values=values)
def get_status_text(self, status):
"""获取状态文本"""
status_map = {
0: "新标的",
1: "交易中"
}
return status_map.get(status, "未知")
def on_table_double_click(self, event):
"""表格双击事件"""
selected = self.trade_table.selection()
if selected:
item = selected[0]
values = self.trade_table.item(item)['values']
PrintLog(LogLevel.DEBUG, f"双击查看详情: {values[0]} - {values[1]}")
def get_selected_target(self):
"""获取选中的交易标的"""
selected = self.trade_table.selection()
if not selected:
messagebox.showwarning("未选中", "请先选择一个交易标的")
return None
# 获取选中行的ID
item = selected[0]
values = self.trade_table.item(item)['values']
target_id = values[0]
# 从列表中找到对应的target对象
for id in self.tradeTargetData:
if int(target_id) == id: # type: ignore
return self.tradeTargetData[id]
return None
def start_selected_trade(self):
"""启动选中的交易"""
target = self.get_selected_target()
if not target:
return
if target.enabled: # type: ignore
messagebox.showinfo("提示", f"{target.stock_code} ({target.stock_name}) 已经在运行中")
return
result = messagebox.askyesno(
"确认启动",
f"确定要启动以下交易标的吗?\n\n"
f"股票代码: {target.stock_code}\n"
f"股票名称: {target.stock_name}"
)
if result:
PrintLog(LogLevel.INFO, f'启动标的交易 {target.targetName()}')
target.enabled = True # type: ignore
id = target.get_id()
if id in self.strategy_ctrl:
tradeController: SFGridStrategy = self.strategy_ctrl[target.get_id()]
tradeTarget = tradeController.enabledTrading(True)
self.tradeTargetData[id] = tradeTarget
else:
PrintLog(LogLevel.INFO, f"\t创建标的交易控制器 {target.targetName()}")
def pause_selected_trade(self):
"""暂停选中的交易"""
target = self.get_selected_target()
if not target:
return
if not target.enabled: # type: ignore
messagebox.showinfo("提示", f"{target.stock_code} ({target.stock_name}) 已经是暂停状态")
return
result = messagebox.askyesno(
"确认暂停",
f"确定要暂停以下交易标的吗?\n\n"
f"股票代码: {target.stock_code}\n"
f"股票名称: {target.stock_name}"
)
if result:
PrintLog(LogLevel.INFO, f'暂停标的交易 {target.targetName()}')
id = target.get_id()
if id in self.strategy_ctrl:
tradeController: SFGridStrategy = self.strategy_ctrl[target.get_id()]
tradeTarget = tradeController.enabledTrading(False)
orders = qmtv.queryPendingOrder(target.stock_code, tradeController.getName()) # type: ignore
for order in orders:
qmtv.xttrader.cancel_order_stock_async(qmtv.account, order.order_id)
print(f'取消未成交订单 {len(orders)}')
self.tradeTargetData[id] = tradeTarget
else:
print(f"标的交易控制器不存在 {target.stock_code} {target.stock_name}\n")
# self.add_log("INFO", f"已暂停交易: {target.stock_code} - {target.stock_name}")
# messagebox.showinfo("暂停成功", f"已暂停 {target.stock_code} ({target.stock_name}) 的交易")
def delete_selected_trade(self):
"""删除选中的交易标的"""
target = self.get_selected_target()
if not target:
return
result = messagebox.askyesno(
"确认删除",
f"确定要删除以下交易标的吗?\n\n"
f"股票代码: {target.stock_code}\n"
f"股票名称: {target.stock_name}\n\n"
f"⚠️ 此操作不可恢复!",
icon='warning'
)
if result:
# 通过事件总线发出删除动作
eBus.event_bus.publish(ActionEventDeleteTradeTarget, target)
PrintLog(LogLevel.INFO, f"已发送删除请求: {target.stock_code} - {target.stock_name}")
def add_trade_target(self):
"""添加新的交易标的"""
# 获取顶层窗口
root = self.winfo_toplevel()
# 创建顶层窗口
add_window = tk.Toplevel(root)
add_window.title("添加交易标的")
add_window.geometry("400x150")
add_window.resizable(False, False)
# 设置窗口模态
add_window.transient(root)
add_window.grab_set()
# 居中显示
root.update_idletasks()
x = root.winfo_x() + (root.winfo_width() // 2) - 200
y = root.winfo_y() + (root.winfo_height() // 2) - 75
add_window.geometry(f"400x150+{x}+{y}")
# 创建输入框架
input_frame = ttk.Frame(add_window, padding=20)
input_frame.pack(fill=tk.BOTH, expand=True)
# 股票代码输入
ttk.Label(input_frame, text="股票代码:").grid(row=0, column=0, sticky=tk.W, pady=5)
stock_code_entry = ttk.Entry(input_frame, width=30)
stock_code_entry.grid(row=0, column=1, pady=5, padx=(10, 0))
stock_code_entry.focus()
# 按钮框架
button_frame = ttk.Frame(input_frame)
button_frame.grid(row=1, column=0, columnspan=2, pady=20)
def confirm_add():
stock_code = stock_code_entry.get().strip()
if not stock_code:
messagebox.showwarning("输入错误", "请输入股票代码")
return
# 发布事件通知主控制器添加标的
eBus.event_bus.publish(ActionEventAddTradeTarget, stock_code)
add_window.destroy()
def cancel_add():
add_window.destroy()
# 确认和取消按钮
ttk.Button(button_frame, text="确认", command=confirm_add, width=10).pack(side=tk.LEFT, padx=5)
ttk.Button(button_frame, text="取消", command=cancel_add, width=10).pack(side=tk.LEFT, padx=5)
# 绑定回车键确认
stock_code_entry.bind('<Return>', lambda event: confirm_add())
PrintLog(LogLevel.INFO, "点击添加交易标的按钮")
def refresh_table(self):
"""刷新表格数据"""
# 保存当前选中的项
selected_items = self.trade_table.selection()
selected_values = []
for item in selected_items:
values = self.trade_table.item(item)['values']
if values:
selected_values.append(values[0]) # 保存ID
# 清空表格
for item in self.trade_table.get_children():
self.trade_table.delete(item)
# 重新填充
self.populate_trade_table()
# 恢复之前选中的项
if selected_values:
for item in self.trade_table.get_children():
values = self.trade_table.item(item)['values']
if values and values[0] in selected_values:
self.trade_table.selection_add(item)
# 刷新市场监控表格
self.populate_market_table()
def system_settings(self):
"""系统设置"""
# 获取顶层窗口
root = self.winfo_toplevel()
settings_window = tk.Toplevel(root)
settings_window.title("网格交易系统配置")
# 设置窗口大小
window_width = 700
window_height = 550
# 先设置为模态窗口
settings_window.transient(root)
# 确保主窗口完全初始化
root.update_idletasks()
# 获取主窗口的实际大小(包括边框)
# 使用winfo_rootx/rooty获取窗口在屏幕上的绝对位置
main_x = root.winfo_rootx()
main_y = root.winfo_rooty()
main_width = root.winfo_width()
main_height = root.winfo_height()
# 计算设置窗口相对于主窗口的居中位置
x = main_x + (main_width - window_width) // 2
y = main_y + (main_height - window_height) // 2
# 设置窗口大小和位置
settings_window.geometry(f"{window_width}x{window_height}+{x}+{y}")
settings_window.resizable(False, False)
# 设置为模态窗口
settings_window.grab_set()
# 添加底部按钮区域(先创建,确保固定在底部)
button_frame = ttk.Frame(settings_window)
button_frame.pack(side=tk.BOTTOM, fill=tk.X, padx=20, pady=10)
# 创建选项卡(在按钮之后创建,填充剩余空间)
notebook = ttk.Notebook(settings_window)
notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=(10, 0))
# 基础设置
basic_frame = ttk.Frame(notebook)
notebook.add(basic_frame, text="基础设置")
# 高级设置
advanced_frame = ttk.Frame(notebook)
notebook.add(advanced_frame, text="高级设置")
# 读取当前配置
config = configparser.ConfigParser()
config_path = config.get('config', 'config_path')
config.read(config_path, encoding='utf-8')
# 创建输入框字典用于保存引用
entries = {}
# 网格价格计算参数
grid_params = {}
# 添加网格价格设置(特殊处理)
grid_price_frame = ttk.LabelFrame(basic_frame, text="网格价格设置", padding=15)
grid_price_frame.pack(fill=tk.X, padx=20, pady=10)
# 基准价格
base_price_row = ttk.Frame(grid_price_frame)
base_price_row.pack(fill=tk.X, pady=5)
ttk.Label(base_price_row, text="基准价格:", width=15, font=('Arial', 10)).pack(side=tk.LEFT)
base_price_entry = ttk.Entry(base_price_row, width=15, font=('Arial', 10))
base_price_entry.insert(0, "10.0")
base_price_entry.pack(side=tk.LEFT, padx=5)
ttk.Label(base_price_row, text="", foreground='gray', font=('Arial', 9)).pack(side=tk.LEFT)
grid_params['base_price'] = base_price_entry
# 网格类型
grid_type_row = ttk.Frame(grid_price_frame)
grid_type_row.pack(fill=tk.X, pady=5)
ttk.Label(grid_type_row, text="网格类型:", width=15, font=('Arial', 10)).pack(side=tk.LEFT)
grid_type_var = tk.StringVar(value="金额差")
ttk.Radiobutton(grid_type_row, text="百分比", variable=grid_type_var,
value="百分比", command=lambda: on_grid_type_change()).pack(side=tk.LEFT, padx=5)
ttk.Radiobutton(grid_type_row, text="金额差", variable=grid_type_var,
value="金额差", command=lambda: on_grid_type_change()).pack(side=tk.LEFT, padx=5)
grid_params['grid_type'] = grid_type_var
# 网格大小
grid_size_row = ttk.Frame(grid_price_frame)
grid_size_row.pack(fill=tk.X, pady=5)
ttk.Label(grid_size_row, text="网格大小:", width=15, font=('Arial', 10)).pack(side=tk.LEFT)
grid_size_entry = ttk.Entry(grid_size_row, width=15, font=('Arial', 10))
grid_size_entry.insert(0, "1.0")
grid_size_entry.pack(side=tk.LEFT, padx=5)
grid_size_unit_label = ttk.Label(grid_size_row, text="", foreground='gray', font=('Arial', 9))
grid_size_unit_label.pack(side=tk.LEFT)
grid_params['grid_size'] = grid_size_entry
grid_params['grid_size_unit_label'] = grid_size_unit_label
# 网格类型改变时更新单位
def on_grid_type_change():
if grid_type_var.get() == "百分比":
grid_size_unit_label.config(text="%")
grid_size_entry.delete(0, tk.END)
grid_size_entry.insert(0, "1.0")
else:
grid_size_unit_label.config(text="")
grid_size_entry.delete(0, tk.END)
grid_size_entry.insert(0, "1.0")
update_preview()
# 上方网格数量
upper_grid_row = ttk.Frame(grid_price_frame)
upper_grid_row.pack(fill=tk.X, pady=5)
ttk.Label(upper_grid_row, text="上方网格数量:", width=15, font=('Arial', 10)).pack(side=tk.LEFT)
upper_grid_entry = ttk.Entry(upper_grid_row, width=15, font=('Arial', 10))
upper_grid_entry.insert(0, "1")
upper_grid_entry.pack(side=tk.LEFT, padx=5)
ttk.Label(upper_grid_row, text="", foreground='gray', font=('Arial', 9)).pack(side=tk.LEFT)
grid_params['upper_count'] = upper_grid_entry
# 下方网格数量
lower_grid_row = ttk.Frame(grid_price_frame)
lower_grid_row.pack(fill=tk.X, pady=5)
ttk.Label(lower_grid_row, text="下方网格数量:", width=15, font=('Arial', 10)).pack(side=tk.LEFT)
lower_grid_entry = ttk.Entry(lower_grid_row, width=15, font=('Arial', 10))
lower_grid_entry.insert(0, "10")
lower_grid_entry.pack(side=tk.LEFT, padx=5)
ttk.Label(lower_grid_row, text="", foreground='gray', font=('Arial', 9)).pack(side=tk.LEFT)
grid_params['lower_count'] = lower_grid_entry
# 预览按钮和结果显示
preview_row = ttk.Frame(grid_price_frame)
preview_row.pack(fill=tk.X, pady=10)
preview_result = tk.StringVar(value="点击'预览'查看生成的网格价格序列")
def calculate_grid_prices():
"""计算网格价格序列"""
try:
base_price = float(base_price_entry.get())
grid_size = float(grid_size_entry.get())
upper_count = int(upper_grid_entry.get())
lower_count = int(lower_grid_entry.get())
grid_type = grid_type_var.get()
prices = []
# 计算上方网格价格
for i in range(upper_count, 0, -1):
if grid_type == "百分比":
price = base_price * (1 + grid_size / 100 * i)
else: # 金额差
price = base_price + grid_size * i
prices.append(round(price, 3))
# 添加基准价格
prices.append(base_price)
# 计算下方网格价格
for i in range(1, lower_count + 1):
if grid_type == "百分比":
price = base_price * (1 - grid_size / 100 * i)
else: # 金额差
price = base_price - grid_size * i
# 确保价格不为负
if price >= 0:
prices.append(round(price, 3))
else:
break
return prices
except ValueError as e:
return None
def update_preview():
"""自动更新网格价格序列预览"""
prices = calculate_grid_prices()
if prices:
price_str = ",".join([str(p) for p in prices])
preview_result.set(f"网格价格序列: {price_str}")
else:
preview_result.set("参数错误,请检查!")
# 绑定输入变化自动预览
for entry_widget in (base_price_entry, grid_size_entry, upper_grid_entry, lower_grid_entry):
entry_widget.bind("<KeyRelease>", lambda e: update_preview())
entry_widget.bind("<FocusOut>", lambda e: update_preview())
# 初始预览
update_preview()
ttk.Label(preview_row, textvariable=preview_result,
font=('Arial', 10)).pack(side=tk.LEFT, padx=10)
# 添加其他基础配置选项
other_basic_frame = ttk.LabelFrame(basic_frame, text="交易设置", padding=15)
other_basic_frame.pack(fill=tk.X, padx=20, pady=10)
other_basic_settings = [
("网格交易手数", "grid_volume", config.get('config', 'grid_volume'), "每个网格的交易手数")
]
for i, (label, key, default, tooltip) in enumerate(other_basic_settings):
frame = ttk.Frame(other_basic_frame)
frame.pack(fill=tk.X, pady=5)
label_widget = ttk.Label(frame, text=label + ":", width=15, font=('Arial', 10))
label_widget.pack(side=tk.LEFT)
entry = ttk.Entry(frame, width=15, font=('Arial', 10))
entry.insert(0, default)
entry.pack(side=tk.LEFT, padx=5)
entries[key] = entry
# 添加提示信息
tip_label = ttk.Label(frame, text=tooltip, font=('Arial', 9), foreground='gray')
tip_label.pack(side=tk.LEFT, padx=5)
# 添加高级设置选项
account_frame = ttk.LabelFrame(advanced_frame, text="账号设置", padding=15)
account_frame.pack(fill=tk.X, padx=20, pady=10)
# 交易账号
account_row = ttk.Frame(account_frame)
account_row.pack(fill=tk.X, pady=5)
ttk.Label(account_row, text="交易账号:", width=15, font=('Arial', 10)).pack(side=tk.LEFT)
account_entry = ttk.Entry(account_row, width=15, font=('Arial', 10))
account_entry.insert(0, config.get('config', 'account_no'))
account_entry.pack(side=tk.LEFT, padx=5)
entries['account_no'] = account_entry
ttk.Label(account_row, text="QMT交易账号", font=('Arial', 9), foreground='gray').pack(side=tk.LEFT, padx=5)
# QMT路径特殊处理 - 使用文件浏览器
qmt_path_frame = ttk.LabelFrame(advanced_frame, text="软件路径", padding=15)
qmt_path_frame.pack(fill=tk.X, padx=20, pady=10)
qmt_row = ttk.Frame(qmt_path_frame)
qmt_row.pack(fill=tk.X, pady=5)
ttk.Label(qmt_row, text="QMT路径:", width=15, font=('Arial', 10)).pack(side=tk.LEFT)
qmt_entry = ttk.Entry(qmt_row, width=30, font=('Arial', 10))
qmt_entry.insert(0, config.get('config', 'miniQMTPath'))
qmt_entry.pack(side=tk.LEFT, padx=5)
entries['miniQMTPath'] = qmt_entry
def browse_qmt_path():
"""打开文件夹浏览器选择QMT路径"""
initial_dir = qmt_entry.get() if qmt_entry.get() else "/"
folder_path = filedialog.askdirectory(
title="选择miniQMT安装路径",
initialdir=initial_dir
)
if folder_path:
qmt_entry.delete(0, tk.END)
qmt_entry.insert(0, folder_path)
ttk.Button(qmt_row, text="📁 浏览...", command=browse_qmt_path, width=10).pack(side=tk.LEFT, padx=5)
ttk.Label(qmt_row, text="miniQMT软件安装路径", font=('Arial', 9), foreground='gray').pack(side=tk.LEFT, padx=5)
# 定义保存和取消按钮的功能(button_frame已在上方创建)
def save_settings():
"""保存配置"""
# try:
# # 计算网格价格序列
# grid_prices = calculate_grid_prices()
# if not grid_prices:
# messagebox.showerror("错误", "网格价格参数有误,请检查输入!")
# return
# grid_price_str = ",".join([str(p) for p in grid_prices])
# # 更新配置对象
# config.set('config', 'miniQMTPath', entries['miniQMTPath'].get())
# config.set('config', 'grid_price', grid_price_str)
# config.set('config', 'grid_volume', entries['grid_volume'].get())
# config.set('config', 'account_no', entries['account_no'].get())
# # 写入配置文件
# config_path = config.get_config_path()
# with open(config_path, 'w', encoding='utf-8') as configfile:
# config.write(configfile)
# # 重新加载配置到内存中
# config.initConfig()
# messagebox.showinfo("成功", f"配置已保存!\n网格价格序列: {grid_price_str}\n部分配置可能需要重启程序后生效。")
# self.add_log(LogLevel.INFO, f"系统配置已更新 - 网格数量: {len(grid_prices)}")
# settings_window.destroy()
# except Exception as e:
# messagebox.showerror("错误", f"保存配置失败:{str(e)}")
# self.add_log(LogLevel.ERROR, f"保存配置失败: {str(e)}")
pass
def cancel_settings():
"""取消设置"""
# settings_window.destroy()
pass
# 在button_frame中添加按钮
ttk.Button(button_frame, text="💾 保存配置", command=save_settings, width=15).pack(side=tk.LEFT, padx=5)
ttk.Button(button_frame, text="❌ 取消", command=cancel_settings, width=15).pack(side=tk.LEFT, padx=5)
def grid_correction(self):
"""网格修正功能"""
target = self.get_selected_target()
if not target:
return
# 创建网格修正窗口
self.create_grid_correction_window(target)
def create_grid_correction_window(self, target: SFGridTradeTarget):
"""创建网格修正窗口"""
# 获取顶层窗口
root = self.winfo_toplevel()
# 创建顶层窗口
correction_window = tk.Toplevel(root)
correction_window.title(f"网格修正 - {target.stock_code} ({target.stock_name})")
correction_window.geometry("500x400")
correction_window.resizable(False, False)
# 设置窗口模态
correction_window.transient(root)
correction_window.grab_set()
# 居中显示
root.update_idletasks()
x = root.winfo_x() + (root.winfo_width() // 2) - 250
y = root.winfo_y() + (root.winfo_height() // 2) - 200
correction_window.geometry(f"500x400+{x}+{y}")
# 创建主框架
main_frame = ttk.Frame(correction_window, padding=20)
main_frame.pack(fill=tk.BOTH, expand=True)
# 显示股票信息
info_frame = ttk.LabelFrame(main_frame, text="标的详情", padding=10)
info_frame.pack(fill=tk.X, pady=(0, 10))
ttk.Label(info_frame, text=f"股票代码: {target.stock_code}").grid(row=0, column=0, sticky=tk.W, pady=2)
ttk.Label(info_frame, text=f"股票名称: {target.stock_name}").grid(row=0, column=1, sticky=tk.W, padx=(20, 0), pady=2)
# 创建修正选项框架
options_frame = ttk.LabelFrame(main_frame, text="修正选项", padding=10)
options_frame.pack(fill=tk.X, pady=(0, 10))
# 网格序号
grid_index_frame = ttk.Frame(options_frame)
grid_index_frame.pack(fill=tk.X, pady=5)
ttk.Label(grid_index_frame, text="网格序号:", width=12).pack(side=tk.LEFT)
# 网格序号调整控件
grid_index_var = tk.IntVar(value=getattr(target, 'grid_index'))
# 当前持仓量
position_frame = ttk.Frame(options_frame)
position_frame.pack(fill=tk.X, pady=5)
current_position_value = getattr(target, 'current_position')
ttk.Label(position_frame, text="当前持仓量:", width=12).pack(side=tk.LEFT)
ttk.Label(position_frame, text=str(current_position_value), width=10, anchor=tk.CENTER).pack(side=tk.LEFT, padx=5)
# 需求持仓量
required_position_frame = ttk.Frame(options_frame)
required_position_frame.pack(fill=tk.X, pady=5)
grid_index_value = getattr(target, 'grid_index')
required_position = grid_index_value * config.grid_volume
ttk.Label(required_position_frame, text="需求持仓量:", width=12).pack(side=tk.LEFT)
required_position_label = ttk.Label(required_position_frame, text=str(required_position), width=10, anchor=tk.CENTER)
required_position_label.pack(side=tk.LEFT, padx=5)
# 持仓量状态提示
position_status_frame = ttk.Frame(options_frame)
position_status_frame.pack(fill=tk.X, pady=5)
position_status_label = ttk.Label(position_status_frame, text="", foreground="red")
position_status_label.pack(side=tk.LEFT, padx=(12, 0))
# 初始化持仓量状态
self.update_position_status(current_position_value, required_position, position_status_label)
# 网格序号显示和按钮
grid_index_label = ttk.Label(grid_index_frame, textvariable=grid_index_var, width=10, anchor=tk.CENTER)
grid_index_label.pack(side=tk.LEFT, padx=5)
# 减少按钮
ttk.Button(grid_index_frame, text="-", width=3,
command=lambda: self.decrease_grid_index(grid_index_var, target, required_position_label, position_status_label)).pack(side=tk.LEFT, padx=(5, 5))
# 增加按钮
ttk.Button(grid_index_frame, text="+", width=3,
command=lambda: self.increase_grid_index(grid_index_var, len(config.grid_price)-1, target, required_position_label, position_status_label)).pack(side=tk.LEFT, padx=(5, 0))
# 当前价格(实时更新)
price_frame = ttk.Frame(options_frame)
price_frame.pack(fill=tk.X, pady=5)
ttk.Label(price_frame, text="当前价格:", width=12).pack(side=tk.LEFT)
current_price_var = tk.StringVar(value="-")
current_price_label = ttk.Label(price_frame, textvariable=current_price_var, width=10, anchor=tk.CENTER)
current_price_label.pack(side=tk.LEFT, padx=5)
# 保存按钮框架
button_frame = ttk.Frame(main_frame)
button_frame.pack(fill=tk.X, pady=(10, 0))
# 保存按钮
ttk.Button(button_frame, text="确认修正",
command=lambda: self.save_grid_correction(correction_window, target, grid_index_var.get())).pack(side=tk.RIGHT, padx=5)
# 取消按钮
ttk.Button(button_frame, text="取消",
command=correction_window.destroy).pack(side=tk.RIGHT, padx=5)
# 监听市场数据更新
def on_market_data_update(updated_target: SFGridTradeTarget):
if updated_target.get_id() == target.get_id():
current_price_var.set(f"{updated_target.market_price:.3f}" if updated_target.market_price else "-")
# 订阅市场数据更新事件
eBus.event_bus.subscribe(eBus.MarketDataUpdate, on_market_data_update)
# 窗口关闭时取消订阅
def on_window_close():
if eBus.MarketDataUpdate in eBus.event_bus.listeners and on_market_data_update in eBus.event_bus.listeners[eBus.MarketDataUpdate]:
eBus.event_bus.listeners[eBus.MarketDataUpdate].remove(on_market_data_update)
correction_window.destroy()
correction_window.protocol("WM_DELETE_WINDOW", on_window_close)
# 初始化当前价格
if target.market_price is not None:
current_price_var.set(f"{target.market_price:.3f}")
def update_position_status(self, current_position: int, required_position: int, status_label: ttk.Label):
"""更新持仓量状态提示"""
if current_position >= required_position:
status_label.config(text="持仓量充足", foreground="green")
else:
shortage = required_position - current_position
status_label.config(text=f"还需补充 {shortage} 手仓位", foreground="red")
def save_grid_correction(self, window, target: SFGridTradeTarget, new_grid_index: int):
"""保存网格修正"""
# 更新网格序号
setattr(target, 'grid_index', new_grid_index)
# 重新计算需求持仓量
required_position = new_grid_index * config.grid_volume
# 检查持仓量是否满足要求
current_position = getattr(target, 'current_position')
if current_position < required_position:
shortage = required_position - current_position
result = messagebox.askyesno(
"持仓量不足",
f"当前持仓量({current_position}手)小于需求持仓量({required_position}手)\n"
f"还需补充 {shortage} 手仓位,是否继续保存?"
)
if not result:
return
# 发布网格修正事件,传递GridFixData对象
grid_fix_data = GridFixData(new_grid_index, target)
eBus.event_bus.publish(ActionEventGridFix, grid_fix_data)
# 关闭窗口
window.destroy()
# 添加日志
PrintLog(LogLevel.INFO, f"网格修正已保存: {target.stock_code} - {target.stock_name}, 网格序号: {new_grid_index}")
def decrease_grid_index(self, grid_index_var: tk.IntVar, target: SFGridTradeTarget, required_position_label: ttk.Label, position_status_label: ttk.Label):
"""减少网格序号"""
current_value = grid_index_var.get()
if current_value > 0:
grid_index_var.set(current_value - 1)
# 同步更新需求持仓量和持仓状态
self.update_required_position_and_status(grid_index_var.get(), target, required_position_label, position_status_label)
def increase_grid_index(self, grid_index_var: tk.IntVar, max_index: int, target: SFGridTradeTarget, required_position_label: ttk.Label, position_status_label: ttk.Label):
"""增加网格序号"""
current_value = grid_index_var.get()
if current_value < max_index:
grid_index_var.set(current_value + 1)
# 同步更新需求持仓量和持仓状态
self.update_required_position_and_status(grid_index_var.get(), target, required_position_label, position_status_label)
def update_required_position_and_status(self, grid_index: int, target: SFGridTradeTarget, required_position_label: ttk.Label, position_status_label: ttk.Label):
"""更新需求持仓量和持仓状态"""
# 计算需求持仓量
required_position = grid_index * config.grid_volume
required_position_label.config(text=str(required_position))
# 更新持仓量状态
current_position = getattr(target, 'current_position')
self.update_position_status(current_position, required_position, position_status_label)