import time import tkinter as tk from tkinter import ttk from core.logger import LogLevel, PrintLog from core.sfgrid.ui import TradeTargetUI import sfgrid_config from xtquant import xtdata from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback import datetime from xtquant.xttype import StockAccount, XtAsset, XtOrder, XtOrderResponse, XtPosition, XtTrade class MainWindow(XtQuantTraderCallback): def __init__(self): self.root = tk.Tk() self.root.title("三疯交易系统") self.root.geometry("1400x700") # 当前选中的策略Tab索引 self.current_strategy_index = 0 # 存储各个Frame的引用 self.strategy_frames = {} # 日志面板可见性标志 self.log_visible = False self.initQmt() # 创建界面 self.create_ui() def initQmt(self): xtdata.enable_hello = False session_id = int(time.time()) self.xt_trader: XtQuantTrader = XtQuantTrader(sfgrid_config.miniQMTPath, session_id) self.xt_trader.register_callback(self) self.xt_trader.start() self.xt_trader.connect() PrintLog(LogLevel.INFO, f'- [{'成功' if self.xt_trader.connected else '失败'}]市场交易连接: {sfgrid_config.miniQMTPath}') if self.xt_trader.connected == False: self.inited: bool = False return else: self.inited = True self.account= StockAccount(sfgrid_config.account_no, 'STOCK') PrintLog(LogLevel.INFO, f'- [成功]交易账号对象初始化完成, 账号: {self.account.account_id}') # pyright: ignore[reportAttributeAccessIssue] subscribe_result = self.xt_trader.subscribe(self.account) PrintLog(LogLevel.INFO, f'- [{'成功' if subscribe_result == 0 else '失败'}:{subscribe_result}]交易状态订阅') if subscribe_result == 0: self.inited = True else: self.inited = False return def create_ui(self): """创建UI界面""" # 主容器 main_container = ttk.Frame(self.root) main_container.pack(fill=tk.BOTH, expand=True, padx=10, pady=10) # 中间主体区域(左右布局) content_area = ttk.Frame(main_container) content_area.pack(fill=tk.BOTH, expand=True) # 左侧Tab按钮栏(垂直排列) tab_bar_frame = ttk.Frame(content_area) tab_bar_frame.pack(side=tk.LEFT, fill=tk.Y, padx=(0, 10)) # 创建Tab按钮(垂直排列,文字垂直显示) self.tab_buttons = [] strategy_names = ["三疯\n网格", "通用\n网格", "涨停\n分析"] for idx, name in enumerate(strategy_names): btn = ttk.Button( tab_bar_frame, text=name, command=lambda i=idx: self.switch_strategy_tab(i), width=4 ) btn.pack(side=tk.TOP, pady=2, fill=tk.X) self.tab_buttons.append(btn) # 在Tab按钮下方添加退出按钮和日志按钮(底部对齐) # 使用一个填充Frame将按钮推到底部 spacer = ttk.Frame(tab_bar_frame) spacer.pack(side=tk.TOP, fill=tk.X, ipady=10) # 清空日志按钮(底部第三个) clear_log_btn = ttk.Button( tab_bar_frame, text="🗑", # 垃圾桶图标 command=self.clear_logs, width=3 ) clear_log_btn.pack(side=tk.TOP, pady=2, fill=tk.X) # 日志显示按钮(退出按钮上方) self.log_toggle_btn = ttk.Button( tab_bar_frame, text="📋", # 日志图标 command=self.toggle_log_panel, width=3 ) self.log_toggle_btn.pack(side=tk.TOP, pady=2, fill=tk.X) # 退出按钮(最底部) exit_btn = ttk.Button( tab_bar_frame, text="⏻", # 电源图标 command=self.on_exit, width=3 ) exit_btn.pack(side=tk.TOP, pady=2, fill=tk.X) # 添加垂直分隔线 separator = ttk.Separator(content_area, orient='vertical') separator.pack(side=tk.LEFT, fill=tk.Y, padx=1) # 右侧内容区域容器(用于放置不同策略的Frame) self.content_container = ttk.Frame(content_area) self.content_container.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) # 创建各个策略的Frame self.create_strategy_frames(strategy_names) # 创建全局日志面板(默认隐藏) self.create_global_log_panel(main_container) # 默认显示第一个策略 self.switch_strategy_tab(0) def create_global_log_panel(self, parent): """创建全局日志面板""" # 日志区域(默认隐藏) self.log_frame = ttk.LabelFrame(parent, text="操作日志", padding=10) # 默认不显示,通过工具栏按钮控制 # 创建日志表格 columns = ("timestamp", "level", "message") self.log_table = ttk.Treeview(self.log_frame, columns=columns, show='headings', height=8) log_column_configs = { "timestamp": ("时间", 100), "level": ("级别", 50), "message": ("消息", 1150) # 调整宽度适应全局布局 } for col in columns: title, width = log_column_configs[col] self.log_table.heading(col, text=title) self.log_table.column(col, width=width, anchor=tk.W) # 添加初始日志 from datetime import datetime timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.log_table.insert('', tk.END, values=(timestamp, "INFO", "系统启动成功")) # 滚动条 scrollbar = ttk.Scrollbar(self.log_frame, orient=tk.VERTICAL, command=self.log_table.yview) self.log_table.configure(yscrollcommand=scrollbar.set) self.log_table.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) scrollbar.pack(side=tk.RIGHT, fill=tk.Y) def add_log(self, level:LogLevel, message): """添加日志记录 - 全局方法""" from datetime import datetime timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.log_table.insert('', 0, values=(timestamp, level.value, message)) def clear_logs(self): """清空日志记录""" # 删除所有日志项 for item in self.log_table.get_children(): self.log_table.delete(item) self.add_log(LogLevel.DEBUG, "日志已清空") def create_strategy_frames(self, strategy_names): """创建各个策略的Frame""" for idx, name in enumerate(strategy_names): if idx == 0: # 第一个Tab使用TradeTargetUI,传入main_window引用 frame = TradeTargetUI(self.content_container, self) self.strategy_frames[idx] = frame else: # 其他策略使用占位Frame frame = ttk.Frame(self.content_container) self.strategy_frames[idx] = frame # 添加占位内容 placeholder = ttk.Label( frame, text=f"{name} - 策略界面将在此实现", font=('Arial', 14), foreground='gray' ) placeholder.pack(expand=True) def switch_strategy_tab(self, index): """切换策略Tab""" # 隐藏当前Frame if self.current_strategy_index in self.strategy_frames: self.strategy_frames[self.current_strategy_index].pack_forget() # 更新当前索引 self.current_strategy_index = index # 显示选中的Frame if index in self.strategy_frames: self.strategy_frames[index].pack(fill=tk.BOTH, expand=True) # 更新Tab按钮样式(可选,用于视觉反馈) self.update_tab_button_styles() def update_tab_button_styles(self): """更新Tab按钮的样式以显示选中状态""" # 注意:ttk.Button的样式需要通过ttk.Style来设置 # 这里简化处理,仅作为接口预留 pass def toggle_log_panel(self): """切换日志面板的显示/隐藏""" if self.log_visible: # 隐藏日志面板 self.log_frame.pack_forget() self.log_visible = False self.log_toggle_btn.config(text="📋") # 日志图标 else: # 显示日志面板 self.log_frame.pack(side=tk.BOTTOM, fill=tk.X, pady=(5, 0)) self.log_visible = True self.log_toggle_btn.config(text="🔽") # 使用不同图标表示隐藏 def on_exit(self): """退出程序""" from tkinter import messagebox result = messagebox.askyesno("确认退出", "确定要退出系统吗?") if result: self.root.destroy() def run(self): """运行程序""" self.root.mainloop() # ====== 市场回调方法 -- 以下方法由XtQuantData调用 ====== def onDataUpdate(self, data): # 收集所有市场数据用于市场监控 print(f'market data update {len(data)}') # ====== 市场回调方法 -- 以下方法由XtQuantTrader调用 ====== def on_connected(self): """ 连接成功推送 """ print(datetime.datetime.now(), '连接成功回调') def on_disconnected(self): """ 连接断开 :return: """ print(datetime.datetime.now(), '连接断开回调') def on_stock_order(self, order:XtOrder): """ 委托回报推送 :param order: XtOrder对象 :return: """ print(f'orderd {order.strategy_name}-{order.stock_code} {order.order_id} {order.order_volume}-{order.order_status}') # stockCode = order.stock_code # ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode] # # 如果存在对应的StockTradeController,则调用其onDataUpdate方法 # if ctrl is not None and order.strategy_name == ctrl.getName(): # print(f'controller info {ctrl.getName()}') # ctrl.onAsyncOrderResponse(order) # type: ignore # else: # print(f"委托下单回调 投资备注 orderId: {order.order_sysid} [{order.stock_code}-{order.instrument_name}] volume: {order.order_volume} 订单策略: '{order.strategy_name}'<-->'{ctrl.getName()}'") def on_stock_trade(self, trade:XtTrade): """ 成交变动推送 :param trade: XtTrade对象 :return: """ print(f"委托回调 投资备注 {trade.stock_code}-{trade.instrument_name} {trade.strategy_name} 不匹配 {trade.order_remark}") # stockCode = trade.stock_code # ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode] # # 如果存在对应的StockTradeController,则调用其onDataUpdate方法 # if ctrl is not None and trade.strategy_name == ctrl.getName(): # ctrl.onOrderTrade(trade) # else: # print(f"委托回调 投资备注 {trade.strategy_name} 不匹配 {ctrl.getName()}") def on_order_stock_async_response(self, response:XtOrderResponse): print(f"委托回调 投资备注 {response.error_msg}{response.strategy_name} {response.order_remark}") # stockCode = response.order_remark # ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode] # # 如果存在对应的StockTradeController,则调用其onDataUpdate方法 # if ctrl is not None and response.strategy_name == ctrl.getName(): # ctrl.onAsyncOrderResponse(response) # else: # print(f"委托回调 投资备注 {response.strategy_name} 不匹配 {ctrl.getName()}") def on_order_error(self, order_error): """ 委托失败推送 :param order_error:XtOrderError 对象 :return: """ print(f"\n委托报错回调 {order_error.order_remark} {order_error.error_msg}") def on_account_status(self, status): """ :param response: XtAccountStatus 对象 :return: """ print(datetime.datetime.now(), status)