diff --git a/core/eventbus.py b/core/eventbus.py index eeba1a0..e74c868 100644 --- a/core/eventbus.py +++ b/core/eventbus.py @@ -1,15 +1,17 @@ # 定义事件处理函数 - -MarketDataUpdate = "market_data_update" ActionEventEnableTrade = "enable_trade" ResultEventTradeEnabled = "trade_enabled" ActionEventDisableTrade = "disable_trade" ResultEventTradeDisabled = "trade_disabled" # 市场数据监听控制事件 +MarketDataUpdate = "market_data_update" ActionEnableMarketData = "enable_market_data" ActionDisableMarketData = "disable_market_data" MarketDataEnabled = "market_data_enabled" MarketDataDisabled = "market_data_disabled" +# 删除交易标的事件 +ActionEventDeleteTradeTarget = "delete_trade_target" +ResultEventTradeTargetDeleted = "trade_target_deleted" class EventBus: diff --git a/core/main_controller.py b/core/main_controller.py index e58f4f1..9938695 100644 --- a/core/main_controller.py +++ b/core/main_controller.py @@ -1,16 +1,14 @@ # coding:utf-8 from core.strategy_db import TradeTarget -from typing import Any -from core.eventbus import ActionDisableMarketData, ActionEnableMarketData, ActionEventDisableTrade, ActionEventEnableTrade, MarketDataUpdate, MarketDataEnabled, MarketDataDisabled, ResultEventTradeDisabled, ResultEventTradeEnabled, event_bus +from core.eventbus import ActionDisableMarketData, ActionEnableMarketData, ActionEventDeleteTradeTarget, ActionEventDisableTrade, ActionEventEnableTrade, MarketDataUpdate, MarketDataEnabled, MarketDataDisabled, ResultEventTradeDisabled, ResultEventTradeEnabled, ResultEventTradeTargetDeleted, event_bus from xtquant.xttrader import XtQuantTrader import time from peewee import ModelSelect -import xtquant.xtconstant as xtconstant import core.strategy_db as strategy_db import sfgrid_constants from core.sfgrid_strategy import SFGridStrategy -from core.util import getInstrumentName, getStockPosition +from core.util import getInstrumentName, getStockPosition, queryPendingOrder from xtquant.xttrader import XtQuantTrader from xtquant.xttype import StockAccount, XtAsset, XtOrder, XtOrderResponse, XtPosition, XtTrade from xtquant import xtdata @@ -63,7 +61,14 @@ class SFGridController(XtQuantTraderCallback): event_bus.subscribe(ActionEnableMarketData, self.onMarketDataEnabled) event_bus.subscribe(ActionDisableMarketData, self.onMarketDataDisabled) event_bus.subscribe("add_trade_target", self.onAddTradeTarget) + event_bus.subscribe(ActionEventDeleteTradeTarget, self.onDeleteTradeTarget) + def onDeleteTradeTarget(self, id: int): + """处理删除交易标的事件""" + self.del_trade_target(id) + # 发布删除完成事件 + event_bus.publish(ResultEventTradeTargetDeleted, id) + def onAddTradeTarget(self, stock_code: str): """处理添加交易标的事件""" self.add_trade_target(stock_code) @@ -118,13 +123,15 @@ class SFGridController(XtQuantTraderCallback): new_target = strategy_db.TradeTarget.create( stock_name=stock_name, stock_code=stock_code, + market_price=0.0, current_position=0, grid_index=0, last_trade_price=0.0, - current_buy_price=0.0, - current_buy_order_no='', - current_sell_price=0.0, - current_sell_order_no='' + plan_buy_price=0.0, + plan_sell_price=0.0, + current_order_price=0.0, + current_order_no='', + current_order_type='' ) new_target.save() print(f'新增交易标的 {stock_code} {stock_name}, {new_target.id}') @@ -134,7 +141,7 @@ class SFGridController(XtQuantTraderCallback): # 更新标的池 self.refresh_targets() # 添加交易控制器 - stockTradeController = SFGridStrategy(new_target, self.xt_trader, self.account, new_target.enabled) # type: ignore + stockTradeController = SFGridStrategy(new_target, self.xt_trader, self.account) # type: ignore self.stock_trade_ctrl[stock_code] = stockTradeController except Exception as e: @@ -142,12 +149,31 @@ class SFGridController(XtQuantTraderCallback): def del_trade_target(self, id:int): - target: strategy_db.TradeTarget = self.instrument_pool[id] - # self.stock_trade_ctrl. - del self.stock_trade_ctrl[target.stock_code] - target.delete_instance() - self.refresh_targets() - + try: + # 检查标的是否存在 + if id not in self.instrument_pool: + print(f"交易标的 ID {id} 不存在") + return + + target: strategy_db.TradeTarget = self.instrument_pool[id] + + # 如果存在交易控制器,先停止交易 + if target.stock_code in self.stock_trade_ctrl: + # 停止交易控制器 + del self.stock_trade_ctrl[target.stock_code] + + # 从数据库中删除 + target.delete_instance() + + # 从内存中删除 + del self.instrument_pool[id] + + # 刷新标的池 + self.refresh_targets() + + print(f"已删除交易标的: {target.stock_code} - {target.stock_name}") + except Exception as e: + print(f"删除交易标的失败 ID {id}: {str(e)}") def init_instrument_pool(self, xtTrader:XtQuantTrader, account:StockAccount): self.refresh_targets() @@ -242,6 +268,10 @@ class SFGridController(XtQuantTraderCallback): if localTarget.stock_code in self.stock_trade_ctrl: tradeController: SFGridStrategy = self.stock_trade_ctrl[localTarget.stock_code] tradeTarget = tradeController.enabledTrading(False) + orders = queryPendingOrder(localTarget.stock_code, tradeController.getName(), self.xt_trader, self.account) # type: ignore + for order in orders: + self.xt_trader.cancel_order_stock_async(self.account, order.order_id) + print(f'取消未成交订单 {len(orders)}') self.instrument_pool[id] = tradeTarget event_bus.publish(ResultEventTradeDisabled, tradeTarget) else: @@ -288,29 +318,16 @@ class SFGridController(XtQuantTraderCallback): :param order: XtOrder对象 :return: """ + print(f'orderd {order.strategy_name}-{order.stock_code} {order.order_id} {order.order_volume}-{order.order_status}') stockCode = order.stock_code ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode] # 如果存在对应的StockTradeController,则调用其onDataUpdate方法 if ctrl is not None and order.strategy_name == ctrl.getName(): - ctrl.onOrderTrade(trade=order) # type: ignore + print(f'controller info {ctrl.getName()}') + ctrl.onAsyncOrderResponse(order) # type: ignore else: print(f"委托下单回调 投资备注 orderId: {order.order_sysid} [{order.stock_code}-{order.instrument_name}] volume: {order.order_volume} 订单策略: '{order.strategy_name}'<-->'{ctrl.getName()}'") - - def test_sim_trade(self, id: int, orderType: int): - tradeTarget:strategy_db.TradeTarget = self.instrument_pool[id] - ctrl:SFGridStrategy = self.stock_trade_ctrl[tradeTarget.stock_code] - trade: XtTrade = None # type: ignore - if orderType == xtconstant.STOCK_BUY: - trade = XtTrade( - sfgrid_constants.account_no, - '300083.SZ', - xtconstant.STOCK_BUY, - 1, 1, tradeTarget.plan_buy_price, sfgrid_constants.grid_volume, 1000, - tradeTarget.current_buy_order_no, - None, ctrl.getName(), None, None, None, None, None, tradeTarget.stock_name) - else: - trade = XtTrade(sfgrid_constants.account_no, '300083.SZ', xtconstant.STOCK_SELL, 1, 1, price, sfgrid_constants.grid_volume, 1000, tradeTarget.current_sell_order_no, None, ctrl.getName(), None, None, None, None, None, tradeTarget.stock_name) # type: ignore - self.on_stock_trade(trade) + def on_stock_trade(self, trade:XtTrade): """ @@ -326,14 +343,14 @@ class SFGridController(XtQuantTraderCallback): else: print(f"委托回调 投资备注 {trade.strategy_name} 不匹配 {ctrl.getName()}") - def on_order_stock_async_response(self, response:XtOrderResponse): - stockCode = response.order_remark - ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode] - # 如果存在对应的StockTradeController,则调用其onDataUpdate方法 - if ctrl is not None and response.strategy_name == ctrl.getName(): - ctrl.onAsyncOrderResponse(response) - else: - print(f"委托回调 投资备注 {response.strategy_name} 不匹配 {ctrl.getName()}") + # def on_order_stock_async_response(self, response:XtOrderResponse): + # stockCode = response.order_remark + # ctrl:SFGridStrategy = self.stock_trade_ctrl[stockCode] + # # 如果存在对应的StockTradeController,则调用其onDataUpdate方法 + # if ctrl is not None and response.strategy_name == ctrl.getName(): + # ctrl.onAsyncOrderResponse(response) + # else: + # print(f"委托回调 投资备注 {response.strategy_name} 不匹配 {ctrl.getName()}") def on_order_error(self, order_error): """ @@ -351,4 +368,4 @@ class SFGridController(XtQuantTraderCallback): :param response: XtAccountStatus 对象 :return: """ - print(datetime.datetime.now(), status) \ No newline at end of file + print(datetime.datetime.now(), status) diff --git a/core/sfgrid_strategy.py b/core/sfgrid_strategy.py index 9536c8e..adbc786 100644 --- a/core/sfgrid_strategy.py +++ b/core/sfgrid_strategy.py @@ -1,9 +1,10 @@ +from core import strategy_db from core.eventbus import MarketDataUpdate, event_bus from core.strategy_db import TradeTarget from core.util import queryPendingOrder from xtquant import xttrader, xtconstant -from xtquant.xttype import StockAccount, XtOrderResponse, XtTrade +from xtquant.xttype import StockAccount, XtOrder, XtOrderResponse, XtTrade import sfgrid_constants import threading @@ -17,14 +18,6 @@ class SFGridStrategy: self.enabledTrading(bool(tradeTarget.enabled)) # 修复类型兼容性问题 self.dataUpdateLock = threading.Lock() print(f'标的{self.tradeTarget.targetName()}交易启动状态 {self.tradeTarget.enabled}') - - def getName(self): - return "SFGRID" - - def saveProxy(self): - rc = self.tradeTarget.save() - event_bus.publish(MarketDataUpdate, self.tradeTarget) - return rc def enabledTrading(self, enabled: bool) -> TradeTarget: @@ -32,28 +25,27 @@ class SFGridStrategy: self.saveProxy() if enabled: - print(f" |- 标的{self.tradeTarget.targetName()}交易启动, position {self.tradeTarget.current_position}") - # 建仓状态检查 - if int(self.tradeTarget.current_position) == 0 and int(self.tradeTarget.status) == 0: # type: ignore - self.tradeTarget.grid_index = 1 # type: ignore - self.saveProxy() - + print(f" |- 标的{self.tradeTarget.targetName()}交易启动, 持仓量:{self.tradeTarget.current_position}") + if self.tradeTarget.status == 0: # 未建仓 + print(f" |- 标的{self.tradeTarget.targetName()}初始状态,检查订单") orders = queryPendingOrder(str(self.tradeTarget.stock_code),self.getName(), self.xt_trader,self.account) if len([order for order in orders if order.order_type == xtconstant.STOCK_BUY and order.price == sfgrid_constants.grid_price[1]]) > 0: # 已存在未交易的多单 order = [order for order in orders if order.order_type == xtconstant.STOCK_BUY and order.price == sfgrid_constants.grid_price[1]][0] print(f' |- 已存在未交易的建仓单,不重复下单:{order.order_id}') else: - self.initBuyOrderId = self.xt_trader.order_stock_async( + # 建仓 + self.tradeTarget.grid_index = 1 # pyright: ignore[reportAttributeAccessIssue] + self.tradeTarget.current_order_no = self.xt_trader.order_stock_async( self.account, str(self.tradeTarget.stock_code), xtconstant.STOCK_BUY, sfgrid_constants.grid_volume, xtconstant.FIX_PRICE, sfgrid_constants.grid_price[int(self.tradeTarget.grid_index)], # type: ignore - self.getName(), f'{self.tradeTarget.stock_code}_init_buy') - print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 建初始仓 买单已发出 InitBuyOrderSeq: {self.initBuyOrderId} Price: {sfgrid_constants.grid_price[int(self.tradeTarget.grid_index)]} Volume: {sfgrid_constants.grid_volume}\n") # type: ignore - else: + self.getName(), strategy_db.OrderTypeInit) + print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 建初始仓 买单已发出 InitBuyOrderSeq: {self.tradeTarget.current_order_no} Price: {sfgrid_constants.grid_price[int(self.tradeTarget.grid_index)]} Volume: {sfgrid_constants.grid_volume}\n") # type: ignore + else: # 已建仓 # 交易阶段,检查仓位,检查现有订单 print(f" |- 标的{self.tradeTarget.targetName()}已有仓位或非初始状态 无需建初始仓 当前仓位: {self.tradeTarget.current_position} 状态: {self.tradeTarget.status}") minRequirePosition:int = sfgrid_constants.grid_volume * int(self.tradeTarget.grid_index) # type: ignore @@ -71,121 +63,119 @@ class SFGridStrategy: def onDataUpdate(self, data): print(f'|- 市价更新[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - START') - - if self.tradeTarget.enabled and self.tradeTarget.status == 1: # 交易中 - self.dataUpdateLock.acquire() - print(f'|- 市价更新[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - LOCKED') - try: - index = self.tradeTarget.grid_index - price = -1 if index>=len(sfgrid_constants.grid_price) else sfgrid_constants.grid_price[int(index)] # pyright: ignore[reportArgumentType] - lowPrice = -1 if index+1>=len(sfgrid_constants.grid_price) else sfgrid_constants.grid_price[int(index) + 1] # pyright: ignore[reportArgumentType] - highPrice = sfgrid_constants.grid_price[int(index) - 1] # pyright: ignore[reportArgumentType] + + lastPrice = float("{:.3f}".format(data[self.tradeTarget.stock_code]['lastPrice'])) + self.tradeTarget.market_price = lastPrice # type: ignore + self.saveProxy() - lastPrice = float("{:.3f}".format(data[self.tradeTarget.stock_code]['lastPrice'])) - self.tradeTarget.market_price = lastPrice # type: ignore - print(f"|- 市价更新[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - 价格: {lastPrice}, 网格序号: {index}, 网格价格: {price}, 计划多单价: {lowPrice}, 计划空单价: {highPrice}") - - if lastPrice <= lowPrice: # 下下方多单 - orders = queryPendingOrder(str(self.tradeTarget.stock_code), self.getName(), self.xt_trader, self.account) - if len([order for order in orders if order.order_type == xtconstant.STOCK_BUY and order.price == lowPrice]) > 0: - # 已存在未交易的多单 - print(f' |- 已存在未交易的多单,不重复下单') - else: - print(f' |- 下网格多单') - self.tradeTarget.current_buy_order_no = self.xt_trader.order_stock_async( + if not(self.tradeTarget.enabled and self.tradeTarget.status == 1): # 已建仓,常规自动交易中 + print(f"|- 市价更新[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - 未建仓或交易监控暂停,不进行自动交易") + return + + self.dataUpdateLock.acquire() + print(f'|- 市价更新[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - LOCKED') + try: + index = self.tradeTarget.grid_index + price = -1 if index>=len(sfgrid_constants.grid_price) else sfgrid_constants.grid_price[int(index)] # pyright: ignore[reportArgumentType] + lowPrice = -1 if index+1>=len(sfgrid_constants.grid_price) else sfgrid_constants.grid_price[int(index) + 1] # pyright: ignore[reportArgumentType] + highPrice = sfgrid_constants.grid_price[int(index) - 1] # pyright: ignore[reportArgumentType] + + if lastPrice <= lowPrice: # 下下方多单 + orders = queryPendingOrder(str(self.tradeTarget.stock_code), self.getName(), self.xt_trader, self.account) + if len([order for order in orders if order.order_type == xtconstant.STOCK_BUY and order.price == lowPrice]) > 0: + # 已存在未交易的多单 + print(f' |- 已存在未交易的多单,不重复下单') + else: + print(f' |- 下网格多单') + self.tradeTarget.current_order_no = self.xt_trader.order_stock_async( + self.account, + str(self.tradeTarget.stock_code), + xtconstant.STOCK_BUY, + sfgrid_constants.grid_volume, + xtconstant.FIX_PRICE, + lowPrice, + self.getName(), # strategy_name + strategy_db.OrderTypeBuy # remark # type: ignore + ) + self.tradeTarget.plan_buy_price = float(lowPrice) # type: ignore + print(f' |- 下网格多单号 {self.tradeTarget.current_order_no}, 网格基准价 {price}, 下单价 {lowPrice}, 下单量 {sfgrid_constants.grid_volume}') + elif lastPrice == highPrice: # 下上方空单 + orders = queryPendingOrder(str(self.tradeTarget.stock_code), self.getName(), self.xt_trader, self.account) + if len([order for order in orders if order.order_type == xtconstant.STOCK_SELL and order.price == highPrice]) > 0: + # 已存在未交易的空单 + print(f' |- 已存在未交易的空单,不重复下单') + else: + print(f' |- 下网格空单') + self.tradeTarget.current_order_no = self.xt_trader.order_stock_async( self.account, str(self.tradeTarget.stock_code), - xtconstant.STOCK_BUY, + xtconstant.STOCK_SELL, sfgrid_constants.grid_volume, xtconstant.FIX_PRICE, - lowPrice, - self.getName(), # strategy_name - self.tradeTarget.stock_code # remark # type: ignore - ) - self.tradeTarget.plan_buy_price = float(lowPrice) # type: ignore - print(f' |- 下网格多单号 {self.tradeTarget.current_buy_order_no}, 网格基准价 {price}, 下单价 {lowPrice}, 下单量 {sfgrid_constants.grid_volume}') - elif lastPrice == highPrice: # 下上方空单 - orders = queryPendingOrder(str(self.tradeTarget.stock_code), self.getName(), self.xt_trader, self.account) - if len([order for order in orders if order.order_type == xtconstant.STOCK_SELL and order.price == highPrice]) > 0: - # 已存在未交易的空单 - print(f' |- 已存在未交易的空单,不重复下单') - else: - print(f' |- 下网格空单') - self.tradeTarget.current_sell_order_no = self.xt_trader.order_stock_async( - self.account, - str(self.tradeTarget.stock_code), - xtconstant.STOCK_SELL, - sfgrid_constants.grid_volume, - xtconstant.FIX_PRICE, - highPrice, - self.getName(), - self.tradeTarget.stock_code) # type: ignore - self.tradeTarget.plan_sell_price = float(highPrice) # type: ignore - print(f' |- 下网格空单号 {self.tradeTarget.current_sell_order_no}, 网格基准价 {price}, 下单价 {highPrice}, 下单量 {sfgrid_constants.grid_volume}') - finally: - print(f'|- 市价更新[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - release lock') - event_bus.publish('market_data', self.tradeTarget) - self.saveProxy() - self.dataUpdateLock.release() - print(f'|- 市价更新[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - END') - elif self.tradeTarget.enabled and self.tradeTarget.status == 0: # 交易中 - print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 交易中但未建初始仓,仅更新市场价") - self.tradeTarget.market_price = float("{:.3f}".format(data[self.tradeTarget.stock_code]['lastPrice'])) # type: ignore + highPrice, + self.getName(), + strategy_db.OrderTypeBuy) # type: ignore + self.tradeTarget.plan_sell_price = float(highPrice) # type: ignore + print(f' |- 下网格空单号 {self.tradeTarget.current_order_no}, 网格基准价 {price}, 下单价 {highPrice}, 下单量 {sfgrid_constants.grid_volume}') + finally: + print(f'|- 市价更新[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - release lock') self.saveProxy() + self.dataUpdateLock.release() + print(f'|- 市价更新[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - END') - def onAsyncOrderResponse(self, order:XtOrderResponse): + def onAsyncOrderResponse(self, order:XtOrder): print(f' |- 委托回调[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{order.order_id}]:START') self.dataUpdateLock.acquire() print(f' |- 委托回调[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{order.order_id}]:LOCKED') try: - stockCode = order.order_remark - orderSeq = order.seq - if (self.tradeTarget.status == 1): # 正常交易阶段订单下单成功 - if self.tradeTarget.current_buy_order_no == order.seq: - self.tradeTarget.current_buy_order_no = order.order_id - elif self.tradeTarget.current_sell_order_no == order.seq: - self.tradeTarget.current_sell_order_no = order.order_id - else: - print(f' |- 委托回调[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{order.order_id}]: 不在策略监控范围内') - rc = self.saveProxy() + if order.strategy_name == self.getName(): + self.tradeTarget.current_order_no = order.order_id + self.tradeTarget.current_order_type = order.order_remark + self.saveProxy() + else: + print(f' |- 委托回调[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{order.order_id}]: 不在策略监控范围内{order.strategy_name}') finally: print(f' |- 委托回调[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{order.order_id}]:release lock') self.dataUpdateLock.release() print(f' |- 委托回调[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{order.order_id}]:END') def onOrderTrade(self, trade:XtTrade): - print(f' |- 委托成交通知[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{trade.order_id}]:START') + print(f' |- 委托成交通知[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{trade.order_id}]:START, {trade.order_id}') + self.dataUpdateLock.acquire() print(f' |- 委托成交通知[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{trade.order_id}]:LOCKED') try: - if int(self.tradeTarget.status) == 0 and trade.order_id == self.initBuyOrderId : # type: ignore + if not trade.strategy_name == self.getName(): + print(f' |- 委托成交通知[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{trade.order_id}]: 不在策略监控范围内{trade.strategy_name}') + return + if self.tradeTarget.status == 0 and trade.order_id == self.tradeTarget.current_order_no and trade.order_remark == strategy_db.OrderTypeInit: # 此时为建仓成交 self.tradeTarget.current_position = int(self.tradeTarget.current_position) + trade.traded_volume # 当前持仓数,账户原有持仓不在策略范围内 # type: ignore self.tradeTarget.last_trade_price = float(trade.traded_price) # type: ignore self.tradeTarget.grid_index = 1 # type: ignore self.tradeTarget.status = 1 # type: ignore self.saveProxy() - print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 建初始仓订单ID: {self.initBuyOrderId}已成交 ") + print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 建初始仓订单ID: {self.tradeTarget.current_order_no}已成交 ") print(f' 成交价: {trade.traded_price} 成交量: {trade.traded_volume}') print(f' 当前持仓: {self.tradeTarget.current_position}') print(f' 网格坐标: {self.tradeTarget.grid_index}') - elif trade.order_id == self.tradeTarget.current_sell_order_no and int(self.tradeTarget.status) == 1: # type: ignore + elif trade.order_id == self.tradeTarget.current_order_no and self.tradeTarget.status == 1 and trade.order_type == xtconstant.STOCK_SELL: # type: ignore # 上涨一格:此时空单成交 self.tradeTarget.current_position = int(self.tradeTarget.current_position) - trade.traded_volume # type: ignore self.tradeTarget.last_trade_price = float(trade.traded_price) # type: ignore self.tradeTarget.grid_index = int(self.tradeTarget.grid_index) - 1 # type: ignore self.saveProxy() - print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 上涨 卖单已成交 订单ID: {self.tradeTarget.current_sell_order_no} Price: {sfgrid_constants.grid_price[int(self.tradeTarget.grid_index)]} Volume: {sfgrid_constants.grid_volume} 手续费: {trade.commission}\n") # type: ignore + print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 上涨 卖单已成交 订单ID: {self.tradeTarget.current_order_no} Price: {sfgrid_constants.grid_price[int(self.tradeTarget.grid_index)]} Volume: {sfgrid_constants.grid_volume} 手续费: {trade.commission}\n") # type: ignore print(f' 成交价: {trade.traded_price} 成交量: {trade.traded_volume}') print(f' 当前持仓: {self.tradeTarget.current_position}') print(f' 网格坐标: {self.tradeTarget.grid_index}') - elif trade.order_id == self.tradeTarget.current_buy_order_no and int(self.tradeTarget.status) == 1: # type: ignore + elif trade.order_id == self.tradeTarget.current_order_no and self.tradeTarget.status == 1 and trade.order_type == xtconstant.STOCK_BUY: # type: ignore # 下跌一格:此时多单成交 self.tradeTarget.current_position = int(self.tradeTarget.current_position) + trade.traded_volume # type: ignore self.tradeTarget.last_trade_price = float(trade.traded_price) # type: ignore self.tradeTarget.grid_index = int(self.tradeTarget.grid_index) + 1 # type: ignore self.saveProxy() - print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 下跌 买单已成交 订单ID: {self.tradeTarget.current_buy_order_no} Price: {trade.traded_price} Volume: {sfgrid_constants.grid_volume} 手续费: {trade.commission}") + print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 下跌 买单已成交 订单ID: {self.tradeTarget.current_order_no} Price: {trade.traded_price} Volume: {sfgrid_constants.grid_volume} 手续费: {trade.commission}") print(f' 成交价: {trade.traded_price} 成交量: {trade.traded_volume}') print(f' 当前持仓: {self.tradeTarget.current_position}') print(f' 网格坐标: {self.tradeTarget.grid_index}') @@ -196,3 +186,12 @@ class SFGridStrategy: print(f' |- 委托成交通知[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{trade.order_id}]:release lock') self.dataUpdateLock.release() print(f' |- 委托成交通知[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{trade.order_id}]:END') + + + def getName(self): + return "SFGRID" + + def saveProxy(self): + rc = self.tradeTarget.save() + event_bus.publish(MarketDataUpdate, self.tradeTarget) + return rc \ No newline at end of file diff --git a/core/strategy_db.py b/core/strategy_db.py index d176740..c413949 100644 --- a/core/strategy_db.py +++ b/core/strategy_db.py @@ -1,5 +1,7 @@ from peewee import SqliteDatabase, Model, CharField, IntegerField, FloatField, BooleanField +from xtquant import xtconn, xtconstant + # 连接到SQLite数据库 db = SqliteDatabase('example.db') @@ -8,18 +10,24 @@ class BaseModel(Model): class Meta: database = db +OrderTypeBuy = f'{xtconstant.STOCK_BUY}' # 买 +OrderTypeSell = f'{xtconstant.STOCK_SELL}' # 卖 +OrderTypeInit = "0" # 建仓 +OrderTypeNone = "None" # 建仓 + # 定义Target类,对应targets表 class TradeTarget(BaseModel): stock_code = CharField(unique=True) stock_name = CharField() + market_price = FloatField() current_position = IntegerField() grid_index = IntegerField() - market_price = FloatField() last_trade_price = FloatField() plan_buy_price = FloatField() - current_buy_order_no = CharField(default='') plan_sell_price = FloatField() - current_sell_order_no = CharField(default='') + current_order_price = FloatField() + current_order_no = CharField(default='') + current_order_type = CharField(default='') status = IntegerField(default=0) # 0表示新标的,1表示已建初始仓,正常交易中 enabled = BooleanField(default=False) # 是否启动交易线程 diff --git a/core/ui.py b/core/ui.py index 8862622..f95ce91 100644 --- a/core/ui.py +++ b/core/ui.py @@ -1,7 +1,9 @@ import tkinter as tk from tkinter import ttk, messagebox, filedialog from datetime import datetime -from core.eventbus import ActionDisableMarketData, ActionEnableMarketData, ActionEventDisableTrade, ActionEventEnableTrade, MarketDataUpdate, MarketDataEnabled, MarketDataDisabled, ResultEventTradeDisabled, ResultEventTradeEnabled, event_bus +import threading +import time +from core.eventbus import ActionDisableMarketData, ActionEnableMarketData, ActionEventDeleteTradeTarget, ActionEventDisableTrade, ActionEventEnableTrade, MarketDataUpdate, MarketDataEnabled, MarketDataDisabled, ResultEventTradeDisabled, ResultEventTradeEnabled, ResultEventTradeTargetDeleted, event_bus from core.strategy_db import TradeTarget import configparser import sfgrid_constants @@ -10,14 +12,20 @@ import sfgrid_constants class TradeTargetUI: def __init__(self): self.data:dict[int, TradeTarget] = {} - self.market_data_enabled = True # 添加市场数据监听状态变量 + self.market_data_enabled = False # 添加市场数据监听状态变量 + self.ui_refresh_enabled = False # 添加UI刷新线程状态变量 self.registerEventHandler() + # 创建刷新线程标志 + self.refresh_thread_running = False # 默认不启动刷新线程 + self.root = tk.Tk() self.root.title("三疯交易系统") self.root.geometry("1200x700") # 创建界面 self.create_ui() + + # 不再自动启动刷新线程,由市场数据开关控制 def registerEventHandler(self): event_bus.subscribe(MarketDataUpdate, self.onTradeTargetUpdated) @@ -25,10 +33,42 @@ class TradeTargetUI: event_bus.subscribe(ResultEventTradeDisabled, self.onTradeDisabled) event_bus.subscribe(MarketDataEnabled, self.onMarketDataToggled) event_bus.subscribe(MarketDataDisabled, self.onMarketDataToggled) + event_bus.subscribe(ResultEventTradeTargetDeleted, self.onTradeTargetDeleted) + + def start_refresh_thread(self): + """启动刷新线程""" + if not hasattr(self, 'refresh_thread') or not self.refresh_thread.is_alive(): + self.refresh_thread = threading.Thread(target=self.refresh_loop, daemon=True) + self.refresh_thread.start() + + def refresh_loop(self): + """刷新循环""" + while self.refresh_thread_running: + # 在主线程中更新UI + if hasattr(self, 'root') and self.root: + self.root.after(0, self.refresh_table) + time.sleep(0.5) # 每0.5秒刷新一次 + + def stop_refresh_thread(self): + """停止刷新线程""" + self.refresh_thread_running = False + + def onTradeTargetDeleted(self, id: int): + """处理交易标的删除完成事件""" + # 从本地数据中删除 + if id in self.data: + del self.data[id] + # 添加日志 + self.add_log("INFO", f"交易标的已删除,ID: {id}") def onMarketDataToggled(self, data:bool): self.market_data_enabled = self.market_data_switch_var.get() self.add_log("INFO", "市场数据监听已" + ("启用" if data else "禁用")) + # 同步UI刷新线程状态 + if data: + self.start_ui_refresh() + else: + self.stop_ui_refresh() def onTradeEnabled(self, target:TradeTarget): self.add_log("INFO", f"交易启用: {target.stock_code} - {target.stock_name}") @@ -37,11 +77,10 @@ class TradeTargetUI: self.add_log("INFO", f"交易禁用: {target.stock_code} - {target.stock_name}") - def onTradeTargetUpdated(self, target:TradeTarget): + def onTradeTargetUpdated(self, target: TradeTarget): # 更新或添加数据到本地缓存 self.data[target.get_id()] = target - # 刷新表格显示 - self.refresh_table() + # 不再直接刷新表格,由刷新线程统一处理 def create_ui(self): """创建UI界面""" @@ -71,7 +110,7 @@ class TradeTargetUI: ttk.Separator(toolbar_frame, orient='vertical').pack(side=tk.LEFT, fill=tk.Y, padx=10) # 市场数据监听开关 - self.market_data_switch_var = tk.BooleanVar(value=True) + self.market_data_switch_var = tk.BooleanVar(value=False) self.market_data_switch = ttk.Checkbutton( toolbar_frame, text="📊 市场数据", @@ -99,8 +138,26 @@ class TradeTargetUI: self.market_data_enabled = self.market_data_switch_var.get() if self.market_data_enabled: event_bus.publish(ActionEnableMarketData, True) + # 同步开启UI刷新线程 + self.start_ui_refresh() else: event_bus.publish(ActionDisableMarketData, True) + # 同步关闭UI刷新线程 + self.stop_ui_refresh() + + def start_ui_refresh(self): + """启动UI刷新线程""" + if not self.refresh_thread_running: + self.refresh_thread_running = True + self.start_refresh_thread() + self.add_log("INFO", "UI刷新线程已启动") + + def stop_ui_refresh(self): + """停止UI刷新线程""" + if self.refresh_thread_running: + self.stop_refresh_thread() + self.refresh_thread_running = False + self.add_log("INFO", "UI刷新线程已停止") def create_menu_bar(self): """创建菜单栏""" @@ -112,7 +169,14 @@ class TradeTargetUI: menubar.add_cascade(label="系统", menu=system_menu) system_menu.add_command(label="系统设置", command=self.system_settings) system_menu.add_separator() - system_menu.add_command(label="退出", command=self.root.destroy) + system_menu.add_command(label="退出", command=self.on_exit) + + def on_exit(self): + """退出程序""" + # 停止刷新线程 + self.stop_refresh_thread() + # 关闭窗口 + self.root.destroy() def create_tables_area(self, parent): """创建表格区域""" @@ -137,7 +201,7 @@ class TradeTargetUI: columns = ("ID", "股票代码", "股票名称", "市场价", "持仓数量", "网格索引", - "最新成交价", "计划买入价", "买入订单号", "计划卖出价", "卖出订单号", + "最新成交价", "计划买入价", "计划卖出价", "当前订单价", "当前订单号", "当前订单类型", "启用状态", "交易状态" ) @@ -147,17 +211,18 @@ class TradeTargetUI: column_configs = { "ID": (50, tk.CENTER), "股票代码": (90, tk.CENTER), - "股票名称": (100, tk.CENTER), - "市场价": (60, tk.CENTER), - "持仓数量": (90, tk.CENTER), - "网格索引": (50, tk.CENTER), - "最新成交价": (60, tk.CENTER), - "计划买入价": (60, tk.CENTER), - "买入订单号": (100, tk.CENTER), - "计划卖出价": (60, tk.CENTER), - "卖出订单号": (100, tk.CENTER), - "启用状态": (100, tk.CENTER), - "交易状态": (100, tk.CENTER) + "股票名称": (80, tk.CENTER), + "市场价": (70, tk.CENTER), + "持仓数量": (80, tk.CENTER), + "网格索引": (80, tk.CENTER), + "最新成交价": (90, tk.CENTER), + "计划买入价": (90, tk.CENTER), + "计划卖出价": (90, tk.CENTER), + "当前订单价": (90, tk.CENTER), + "当前订单号": (90, tk.CENTER), + "当前订单类型": (90, tk.CENTER), + "启用状态": (80, tk.CENTER), + "交易状态": (80, tk.CENTER) } for col in columns: @@ -201,17 +266,18 @@ class TradeTargetUI: for temp in self.data: target: TradeTarget = self.data[temp] values = [ - target.id, # type: ignore + target.id, # type: ignore target.stock_code, target.stock_name, - f"{target.market_price:.3f}", + "-" if target.market_price is None else f"{target.market_price:.3f}", target.current_position, target.grid_index, - f"{target.last_trade_price:.2f}", - f"{target.plan_buy_price:.2f}", - target.current_buy_order_no, - f"{target.plan_sell_price:.2f}", - target.current_sell_order_no, + f"{target.last_trade_price:.3f}", + f"{target.plan_buy_price:.3f}", + f"{target.plan_sell_price:.3f}", + f"{target.current_order_price:.3f}", + target.current_order_no, + target.current_order_type, self.get_status_indicator(target), self.get_trade_enabled_indicator(target.enabled) # type: ignore ] @@ -310,7 +376,6 @@ class TradeTargetUI: target.enabled = True # type: ignore event_bus.publish(ActionEventEnableTrade, target.get_id()) # self.add_log("INFO", f"已启动交易: {target.stock_code} - {target.stock_name}") - # self.refresh_table() # messagebox.showinfo("启动成功", f"已启动 {target.stock_code} ({target.stock_name}) 的交易") def on_trade_enabled(self, target: TradeTarget): @@ -337,7 +402,6 @@ class TradeTargetUI: target.enabled = False # type: ignore event_bus.publish(ActionEventDisableTrade, target.get_id()) # self.add_log("INFO", f"已暂停交易: {target.stock_code} - {target.stock_name}") - # self.refresh_table() # messagebox.showinfo("暂停成功", f"已暂停 {target.stock_code} ({target.stock_name}) 的交易") def delete_selected_trade(self): @@ -356,14 +420,9 @@ class TradeTargetUI: ) if result: - try: - del self.data[target.get_id()] - self.add_log("WARNING", f"已删除交易标的: {target.stock_code} - {target.stock_name}") - self.refresh_table() - messagebox.showinfo("删除成功", f"已删除 {target.stock_code} ({target.stock_name})") - except Exception as e: - self.add_log("ERROR", f"删除失败: {str(e)}") - messagebox.showerror("删除失败", f"删除交易标的时出错:{str(e)}") + # 通过事件总线发出删除动作 + event_bus.publish(ActionEventDeleteTradeTarget, target.get_id()) + self.add_log("INFO", f"已发送删除请求: {target.stock_code} - {target.stock_name}") def add_trade_target(self): """添加新的交易标的""" @@ -434,12 +493,27 @@ class TradeTargetUI: def refresh_table(self): """刷新表格数据""" + # 保存当前选中的项 + selected_items = self.trade_table.selection() + selected_values = [] + for item in selected_items: + values = self.trade_table.item(item)['values'] + if values: + selected_values.append(values[0]) # 保存ID + # 清空表格 for item in self.trade_table.get_children(): self.trade_table.delete(item) # 重新填充 self.populate_trade_table() + + # 恢复之前选中的项 + if selected_values: + for item in self.trade_table.get_children(): + values = self.trade_table.item(item)['values'] + if values and values[0] in selected_values: + self.trade_table.selection_add(item) def add_log(self, level, message): """添加日志记录"""