Files
sfgrid/ui.py
T
2025-11-05 00:28:01 +08:00

668 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import random
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from typing import List, Optional
from datetime import datetime
from core.strategy_db import TradeTarget
import configparser
import sfgrid_constants
class TradeTargetUI:
def __init__(self, trade_targets: Optional[List[TradeTarget]] = None):
if trade_targets is not None:
self.trade_targets = trade_targets
else:
self.trade_targets = []
self.root = tk.Tk()
self.root.title("三疯交易系统")
self.root.geometry("1200x700")
# 创建界面
self.create_ui()
def create_ui(self):
"""创建UI界面"""
# 创建菜单栏
self.create_menu_bar()
# 主框架
main_frame = ttk.Frame(self.root)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# 表格区域
self.create_tables_area(main_frame)
def create_menu_bar(self):
"""创建菜单栏"""
menubar = tk.Menu(self.root)
self.root.config(menu=menubar)
# 系统菜单
system_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="系统", menu=system_menu)
system_menu.add_command(label="系统设置", command=self.system_settings)
system_menu.add_separator()
system_menu.add_command(label="退出", command=self.root.quit)
def create_tables_area(self, parent):
"""创建表格区域"""
# 上方交易标的区域
trade_frame = ttk.LabelFrame(parent, text="交易标的详情", padding=10)
trade_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 5))
# 创建交易标的表格
self.create_trade_target_table(trade_frame)
# 下方操作日志区域(默认隐藏)
self.log_frame = ttk.LabelFrame(parent, text="操作日志", padding=10)
# 默认不显示,通过工具栏按钮控制
# self.log_frame.pack(fill=tk.X, pady=(5, 0))
self.log_visible = False # 日志区域可见性标志
# 创建操作日志表格
self.create_log_table(self.log_frame)
def create_trade_target_table(self, parent):
"""创建交易标的表格"""
# 创建工具栏
toolbar_frame = ttk.Frame(parent)
toolbar_frame.pack(fill=tk.X, pady=(0, 10))
# 工具栏按钮
ttk.Button(toolbar_frame, text="▶️ 启动交易",
command=self.start_selected_trade, width=12).pack(side=tk.LEFT, padx=2)
ttk.Button(toolbar_frame, text="⏸ 暂停交易",
command=self.pause_selected_trade, width=12).pack(side=tk.LEFT, padx=2)
ttk.Button(toolbar_frame, text=" 添加标的",
command=self.add_trade_target, width=12).pack(side=tk.LEFT, padx=2)
ttk.Button(toolbar_frame, text="🗑 删除标的",
command=self.delete_selected_trade, width=12).pack(side=tk.LEFT, padx=2)
# 添加分隔符
ttk.Separator(toolbar_frame, orient='vertical').pack(side=tk.LEFT, fill=tk.Y, padx=10)
# 日志显示/隐藏按钮
self.log_toggle_btn = ttk.Button(toolbar_frame, text="📋 显示日志",
command=self.toggle_log_panel, width=12)
self.log_toggle_btn.pack(side=tk.LEFT, padx=2)
# 添加分隔线
ttk.Separator(parent, orient='horizontal').pack(fill=tk.X, pady=5)
columns = ("ID",
"股票代码", "股票名称", "持仓数量", "网格索引",
"最新成交价", "计划买入价", "买入订单号", "计划卖出价", "卖出订单号",
"启用状态", "交易状态"
)
self.trade_table = ttk.Treeview(parent, columns=columns, show='headings', height=15)
# 专业化的列配置
column_configs = {
"ID": (50, tk.CENTER),
"股票代码": (90, tk.CENTER),
"股票名称": (100, tk.CENTER),
"持仓数量": (90, tk.CENTER),
"网格索引": (80, tk.CENTER),
"最新成交价": (100, tk.CENTER),
"计划买入价": (100, tk.CENTER),
"买入订单号": (100, tk.CENTER),
"计划卖出价": (100, tk.CENTER),
"卖出订单号": (100, tk.CENTER),
"启用状态": (80, tk.CENTER),
"交易状态": (80, tk.CENTER)
}
for col in columns:
width, anchor = column_configs[col]
self.trade_table.heading(col, text=col)
self.trade_table.column(col, width=width, anchor=anchor) # type: ignore
# 填充数据
self.populate_trade_table()
# 滚动条
scrollbar = ttk.Scrollbar(parent, orient=tk.VERTICAL, command=self.trade_table.yview)
self.trade_table.configure(yscrollcommand=scrollbar.set)
self.trade_table.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 绑定双击事件
self.trade_table.bind("<Double-1>", self.on_table_double_click)
def get_status_indicator(self, target: TradeTarget) -> str:
"""获取状态指示器(带颜色色块的文本)"""
if target.status == 1:
# 绿色圆点表示交易中
return "🟢 已建仓"
elif target.status == 0:
# 黄色圆点表示暂停
return "🟡 未建仓"
else:
return "🔴 错误状态"
def get_trade_status_indicator(self, status: int) -> str:
"""获取交易状态指示器"""
if status == 1:
return "🟢 策略运行"
else:
return "🟡 策略暂停"
def populate_trade_table(self):
"""填充交易标的表格数据"""
for temp in self.trade_targets:
target: TradeTarget = temp
values = [
target.id, # type: ignore
target.stock_code,
target.stock_name,
target.current_position,
target.grid_index,
f"{target.last_trade_price:.2f}",
f"{target.current_buy_price:.2f}",
target.current_buy_order_no,
f"{target.current_sell_price:.2f}",
target.current_sell_order_no,
self.get_status_indicator(target),
self.get_trade_status_indicator(target.status) # type: ignore
]
self.trade_table.insert('', tk.END, values=values)
def create_log_table(self, parent):
"""创建操作日志表格"""
columns = ("timestamp", "level", "message")
self.log_table = ttk.Treeview(parent, columns=columns, show='headings', height=8)
log_column_configs = {
"timestamp": ("时间", 120),
"level": ("级别", 60),
"message": ("消息", 200)
}
for col in columns:
title, width = log_column_configs[col]
self.log_table.heading(col, text=title)
self.log_table.column(col, width=width, anchor=tk.W)
# 填充示例日志
sample_logs = [
("2024-01-15 10:30:15", "INFO", "系统启动成功"),
("2024-01-15 10:31:22", "DEBUG", "加载交易标的: 5个"),
("2024-01-15 10:32:45", "INFO", "000001 - 网格交易线程启动"),
("2024-01-15 10:33:10", "WARNING", "601318 - 未启用交易"),
("2024-01-15 10:34:30", "ERROR", "300750 - 订单提交失败"),
("2024-01-15 10:35:18", "INFO", "600036 - 买入订单创建成功"),
("2024-01-15 10:36:05", "INFO", "数据刷新完成")
]
for log in sample_logs:
self.log_table.insert('', tk.END, values=log)
# 滚动条
scrollbar = ttk.Scrollbar(parent, orient=tk.VERTICAL, command=self.log_table.yview)
self.log_table.configure(yscrollcommand=scrollbar.set)
self.log_table.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
def get_status_text(self, status):
"""获取状态文本"""
status_map = {
0: "新标的",
1: "交易中"
}
return status_map.get(status, "未知")
def on_table_double_click(self, event):
"""表格双击事件"""
selected = self.trade_table.selection()
if selected:
item = selected[0]
values = self.trade_table.item(item)['values']
self.add_log("DEBUG", f"双击查看详情: {values[0]} - {values[1]}")
def get_selected_target(self):
"""获取选中的交易标的"""
selected = self.trade_table.selection()
if not selected:
messagebox.showwarning("未选中", "请先选择一个交易标的")
return None
# 获取选中行的ID
item = selected[0]
values = self.trade_table.item(item)['values']
target_id = values[0]
# 从列表中找到对应的target对象
for target in self.trade_targets:
if target.id == target_id: # type: ignore
return target
return None
def start_selected_trade(self):
"""启动选中的交易"""
target = self.get_selected_target()
if not target:
return
if target.enabled: # type: ignore
messagebox.showinfo("提示", f"{target.stock_code} ({target.stock_name}) 已经在运行中")
return
result = messagebox.askyesno(
"确认启动",
f"确定要启动以下交易标的吗?\n\n"
f"股票代码: {target.stock_code}\n"
f"股票名称: {target.stock_name}"
)
if result:
target.enabled = True # type: ignore
self.add_log("INFO", f"已启动交易: {target.stock_code} - {target.stock_name}")
self.refresh_table()
messagebox.showinfo("启动成功", f"已启动 {target.stock_code} ({target.stock_name}) 的交易")
def pause_selected_trade(self):
"""暂停选中的交易"""
target = self.get_selected_target()
if not target:
return
if not target.enabled: # type: ignore
messagebox.showinfo("提示", f"{target.stock_code} ({target.stock_name}) 已经是暂停状态")
return
result = messagebox.askyesno(
"确认暂停",
f"确定要暂停以下交易标的吗?\n\n"
f"股票代码: {target.stock_code}\n"
f"股票名称: {target.stock_name}"
)
if result:
target.enabled = False # type: ignore
self.add_log("INFO", f"已暂停交易: {target.stock_code} - {target.stock_name}")
self.refresh_table()
messagebox.showinfo("暂停成功", f"已暂停 {target.stock_code} ({target.stock_name}) 的交易")
def delete_selected_trade(self):
"""删除选中的交易标的"""
target = self.get_selected_target()
if not target:
return
result = messagebox.askyesno(
"确认删除",
f"确定要删除以下交易标的吗?\n\n"
f"股票代码: {target.stock_code}\n"
f"股票名称: {target.stock_name}\n\n"
f"⚠️ 此操作不可恢复!",
icon='warning'
)
if result:
try:
self.trade_targets.remove(target)
self.add_log("WARNING", f"已删除交易标的: {target.stock_code} - {target.stock_name}")
self.refresh_table()
messagebox.showinfo("删除成功", f"已删除 {target.stock_code} ({target.stock_name})")
except Exception as e:
self.add_log("ERROR", f"删除失败: {str(e)}")
messagebox.showerror("删除失败", f"删除交易标的时出错:{str(e)}")
def add_trade_target(self):
"""添加新的交易标的"""
# TODO: 实现添加交易标的的对话框
messagebox.showinfo("提示", "添加交易标的功能待实现")
self.add_log("INFO", "点击添加交易标的按钮")
def toggle_log_panel(self):
"""切换日志面板的显示/隐藏"""
if self.log_visible:
# 隐藏日志面板
self.log_frame.pack_forget()
self.log_visible = False
self.log_toggle_btn.config(text="📋 显示日志")
else:
# 显示日志面板
self.log_frame.pack(fill=tk.X, pady=(5, 0))
self.log_visible = True
self.log_toggle_btn.config(text="📋 隐藏日志")
def refresh_table(self):
"""刷新表格数据"""
# 清空表格
for item in self.trade_table.get_children():
self.trade_table.delete(item)
# 重新填充
self.populate_trade_table()
def add_log(self, level, message):
"""添加日志记录"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.log_table.insert('', 0, values=(timestamp, level, message))
def system_settings(self):
"""系统设置"""
settings_window = tk.Toplevel(self.root)
settings_window.title("网格交易系统配置")
# 设置窗口大小
window_width = 700
window_height = 600
# 先设置为模态窗口
settings_window.transient(self.root)
# 确保主窗口完全初始化
self.root.update_idletasks()
# 获取主窗口的实际大小(包括边框)
# 使用winfo_rootx/rooty获取窗口在屏幕上的绝对位置
main_x = self.root.winfo_rootx()
main_y = self.root.winfo_rooty()
main_width = self.root.winfo_width()
main_height = self.root.winfo_height()
# 计算设置窗口相对于主窗口的居中位置
x = main_x + (main_width - window_width) // 2
y = main_y + (main_height - window_height) // 2
# 设置窗口大小和位置
settings_window.geometry(f"{window_width}x{window_height}+{x}+{y}")
settings_window.resizable(False, False)
# 设置为模态窗口
settings_window.grab_set()
# 添加底部按钮区域(先创建,确保固定在底部)
button_frame = ttk.Frame(settings_window)
button_frame.pack(side=tk.BOTTOM, fill=tk.X, padx=20, pady=10)
# 创建选项卡(在按钮之后创建,填充剩余空间)
notebook = ttk.Notebook(settings_window)
notebook.pack(fill=tk.BOTH, expand=True, padx=10, pady=(10, 0))
# 基础设置
basic_frame = ttk.Frame(notebook)
notebook.add(basic_frame, text="基础设置")
# 高级设置
advanced_frame = ttk.Frame(notebook)
notebook.add(advanced_frame, text="高级设置")
# 读取当前配置
config = configparser.ConfigParser()
config.read('config.ini')
# 创建输入框字典用于保存引用
entries = {}
# 网格价格计算参数
grid_params = {}
# 添加网格价格设置(特殊处理)
grid_price_frame = ttk.LabelFrame(basic_frame, text="网格价格设置", padding=15)
grid_price_frame.pack(fill=tk.X, padx=20, pady=10)
# 基准价格
base_price_row = ttk.Frame(grid_price_frame)
base_price_row.pack(fill=tk.X, pady=5)
ttk.Label(base_price_row, text="基准价格:", width=15, font=('Arial', 10)).pack(side=tk.LEFT)
base_price_entry = ttk.Entry(base_price_row, width=15, font=('Arial', 10))
base_price_entry.insert(0, "10.0")
base_price_entry.pack(side=tk.LEFT, padx=5)
ttk.Label(base_price_row, text="", foreground='gray', font=('Arial', 9)).pack(side=tk.LEFT)
grid_params['base_price'] = base_price_entry
# 网格类型
grid_type_row = ttk.Frame(grid_price_frame)
grid_type_row.pack(fill=tk.X, pady=5)
ttk.Label(grid_type_row, text="网格类型:", width=15, font=('Arial', 10)).pack(side=tk.LEFT)
grid_type_var = tk.StringVar(value="金额差")
ttk.Radiobutton(grid_type_row, text="百分比", variable=grid_type_var,
value="百分比", command=lambda: on_grid_type_change()).pack(side=tk.LEFT, padx=5)
ttk.Radiobutton(grid_type_row, text="金额差", variable=grid_type_var,
value="金额差", command=lambda: on_grid_type_change()).pack(side=tk.LEFT, padx=5)
grid_params['grid_type'] = grid_type_var
# 网格大小
grid_size_row = ttk.Frame(grid_price_frame)
grid_size_row.pack(fill=tk.X, pady=5)
ttk.Label(grid_size_row, text="网格大小:", width=15, font=('Arial', 10)).pack(side=tk.LEFT)
grid_size_entry = ttk.Entry(grid_size_row, width=15, font=('Arial', 10))
grid_size_entry.insert(0, "1.0")
grid_size_entry.pack(side=tk.LEFT, padx=5)
grid_size_unit_label = ttk.Label(grid_size_row, text="", foreground='gray', font=('Arial', 9))
grid_size_unit_label.pack(side=tk.LEFT)
grid_params['grid_size'] = grid_size_entry
grid_params['grid_size_unit_label'] = grid_size_unit_label
# 网格类型改变时更新单位
def on_grid_type_change():
if grid_type_var.get() == "百分比":
grid_size_unit_label.config(text="%")
grid_size_entry.delete(0, tk.END)
grid_size_entry.insert(0, "1.0")
else:
grid_size_unit_label.config(text="")
grid_size_entry.delete(0, tk.END)
grid_size_entry.insert(0, "1.0")
update_preview()
# 上方网格数量
upper_grid_row = ttk.Frame(grid_price_frame)
upper_grid_row.pack(fill=tk.X, pady=5)
ttk.Label(upper_grid_row, text="上方网格数量:", width=15, font=('Arial', 10)).pack(side=tk.LEFT)
upper_grid_entry = ttk.Entry(upper_grid_row, width=15, font=('Arial', 10))
upper_grid_entry.insert(0, "1")
upper_grid_entry.pack(side=tk.LEFT, padx=5)
ttk.Label(upper_grid_row, text="", foreground='gray', font=('Arial', 9)).pack(side=tk.LEFT)
grid_params['upper_count'] = upper_grid_entry
# 下方网格数量
lower_grid_row = ttk.Frame(grid_price_frame)
lower_grid_row.pack(fill=tk.X, pady=5)
ttk.Label(lower_grid_row, text="下方网格数量:", width=15, font=('Arial', 10)).pack(side=tk.LEFT)
lower_grid_entry = ttk.Entry(lower_grid_row, width=15, font=('Arial', 10))
lower_grid_entry.insert(0, "10")
lower_grid_entry.pack(side=tk.LEFT, padx=5)
ttk.Label(lower_grid_row, text="", foreground='gray', font=('Arial', 9)).pack(side=tk.LEFT)
grid_params['lower_count'] = lower_grid_entry
# 预览按钮和结果显示
preview_row = ttk.Frame(grid_price_frame)
preview_row.pack(fill=tk.X, pady=10)
preview_result = tk.StringVar(value="点击'预览'查看生成的网格价格序列")
def calculate_grid_prices():
"""计算网格价格序列"""
try:
base_price = float(base_price_entry.get())
grid_size = float(grid_size_entry.get())
upper_count = int(upper_grid_entry.get())
lower_count = int(lower_grid_entry.get())
grid_type = grid_type_var.get()
prices = []
# 计算上方网格价格
for i in range(upper_count, 0, -1):
if grid_type == "百分比":
price = base_price * (1 + grid_size / 100 * i)
else: # 金额差
price = base_price + grid_size * i
prices.append(round(price, 3))
# 添加基准价格
prices.append(base_price)
# 计算下方网格价格
for i in range(1, lower_count + 1):
if grid_type == "百分比":
price = base_price * (1 - grid_size / 100 * i)
else: # 金额差
price = base_price - grid_size * i
# 确保价格不为负
if price >= 0:
prices.append(round(price, 3))
else:
break
return prices
except ValueError as e:
return None
def update_preview():
"""自动更新网格价格序列预览"""
prices = calculate_grid_prices()
if prices:
price_str = ",".join([str(p) for p in prices])
preview_result.set(f"网格价格序列: {price_str}")
else:
preview_result.set("参数错误,请检查!")
# 绑定输入变化自动预览
for entry_widget in (base_price_entry, grid_size_entry, upper_grid_entry, lower_grid_entry):
entry_widget.bind("<KeyRelease>", lambda e: update_preview())
entry_widget.bind("<FocusOut>", lambda e: update_preview())
# 初始预览
update_preview()
ttk.Label(preview_row, textvariable=preview_result,
font=('Arial', 10)).pack(side=tk.LEFT, padx=10)
# 添加其他基础配置选项
other_basic_frame = ttk.LabelFrame(basic_frame, text="交易设置", padding=15)
other_basic_frame.pack(fill=tk.X, padx=20, pady=10)
other_basic_settings = [
("网格交易手数", "grid_volume", config.get('config', 'grid_volume'), "每个网格的交易手数"),
("最大启用目标数", "max_enabled_targets", config.get('config', 'max_enabled_targets'), "同时运行的最大标的数量")
]
for i, (label, key, default, tooltip) in enumerate(other_basic_settings):
frame = ttk.Frame(other_basic_frame)
frame.pack(fill=tk.X, pady=5)
label_widget = ttk.Label(frame, text=label + ":", width=15, font=('Arial', 10))
label_widget.pack(side=tk.LEFT)
entry = ttk.Entry(frame, width=15, font=('Arial', 10))
entry.insert(0, default)
entry.pack(side=tk.LEFT, padx=5)
entries[key] = entry
# 添加提示信息
tip_label = ttk.Label(frame, text=tooltip, font=('Arial', 9), foreground='gray')
tip_label.pack(side=tk.LEFT, padx=5)
# 添加高级设置选项
account_frame = ttk.LabelFrame(advanced_frame, text="账号设置", padding=15)
account_frame.pack(fill=tk.X, padx=20, pady=10)
# 交易账号
account_row = ttk.Frame(account_frame)
account_row.pack(fill=tk.X, pady=5)
ttk.Label(account_row, text="交易账号:", width=15, font=('Arial', 10)).pack(side=tk.LEFT)
account_entry = ttk.Entry(account_row, width=15, font=('Arial', 10))
account_entry.insert(0, config.get('config', 'account_no'))
account_entry.pack(side=tk.LEFT, padx=5)
entries['account_no'] = account_entry
ttk.Label(account_row, text="QMT交易账号", font=('Arial', 9), foreground='gray').pack(side=tk.LEFT, padx=5)
# QMT路径特殊处理 - 使用文件浏览器
qmt_path_frame = ttk.LabelFrame(advanced_frame, text="软件路径", padding=15)
qmt_path_frame.pack(fill=tk.X, padx=20, pady=10)
qmt_row = ttk.Frame(qmt_path_frame)
qmt_row.pack(fill=tk.X, pady=5)
ttk.Label(qmt_row, text="QMT路径:", width=15, font=('Arial', 10)).pack(side=tk.LEFT)
qmt_entry = ttk.Entry(qmt_row, width=30, font=('Arial', 10))
qmt_entry.insert(0, config.get('config', 'miniQMTPath'))
qmt_entry.pack(side=tk.LEFT, padx=5)
entries['miniQMTPath'] = qmt_entry
def browse_qmt_path():
"""打开文件夹浏览器选择QMT路径"""
initial_dir = qmt_entry.get() if qmt_entry.get() else "/"
folder_path = filedialog.askdirectory(
title="选择miniQMT安装路径",
initialdir=initial_dir
)
if folder_path:
qmt_entry.delete(0, tk.END)
qmt_entry.insert(0, folder_path)
ttk.Button(qmt_row, text="📁 浏览...", command=browse_qmt_path, width=10).pack(side=tk.LEFT, padx=5)
ttk.Label(qmt_row, text="miniQMT软件安装路径", font=('Arial', 9), foreground='gray').pack(side=tk.LEFT, padx=5)
# 定义保存和取消按钮的功能(button_frame已在上方创建)
def save_settings():
"""保存配置"""
try:
# 计算网格价格序列
grid_prices = calculate_grid_prices()
if not grid_prices:
messagebox.showerror("错误", "网格价格参数有误,请检查输入!")
return
grid_price_str = ",".join([str(p) for p in grid_prices])
# 更新配置对象
config.set('config', 'miniQMTPath', entries['miniQMTPath'].get())
config.set('config', 'grid_price', grid_price_str)
config.set('config', 'grid_volume', entries['grid_volume'].get())
config.set('config', 'account_no', entries['account_no'].get())
config.set('config', 'max_enabled_targets', entries['max_enabled_targets'].get())
# 写入配置文件
with open('config.ini', 'w') as configfile:
config.write(configfile)
# 重新加载配置到内存中
sfgrid_constants.initConfig()
messagebox.showinfo("成功", f"配置已保存!\n网格价格序列: {grid_price_str}\n部分配置可能需要重启程序后生效。")
self.add_log("INFO", f"系统配置已更新 - 网格数量: {len(grid_prices)}")
settings_window.destroy()
except Exception as e:
messagebox.showerror("错误", f"保存配置失败:{str(e)}")
self.add_log("ERROR", f"保存配置失败: {str(e)}")
def cancel_settings():
"""取消设置"""
settings_window.destroy()
# 在button_frame中添加按钮
ttk.Button(button_frame, text="💾 保存配置", command=save_settings, width=15).pack(side=tk.LEFT, padx=5)
ttk.Button(button_frame, text="❌ 取消", command=cancel_settings, width=15).pack(side=tk.LEFT, padx=5)
def run(self):
"""运行程序"""
self.root.mainloop()
# 使用示例
if __name__ == "__main__":
print("交易标的监控系统启动...")
print("功能说明:")
print(" - 左侧表格显示所有交易标的详细信息")
print(" - 右侧表格显示操作日志")
print(" - 底部五个功能按钮提供操作")
# 创建并运行界面
app = TradeTargetUI()
app.run()