from peewee import IntegerField
from core import strategy_db
from core.eventbus import MarketDataUpdate, event_bus
from core.strategy_db import OrderTypeBuy, OrderTypeInit, OrderTypeSell, TradeTarget
from core.util import queryPendingOrder, is_trading_time
from xtquant import xttrader, xtconstant
from xtquant.xttype import StockAccount, XtOrder, XtTrade
import sfgrid_constants
import threading


class SFGridStrategy:
    """Grid-trading strategy bound to a single ``TradeTarget``.

    The strategy reacts to three kinds of callbacks -- market data updates,
    asynchronous order responses and trade (fill) notifications -- and keeps
    the target's grid index, position and planned prices up to date.  Every
    state change is persisted and broadcast through ``saveProxy``.

    Grid levels come from ``sfgrid_constants.grid_price``.  The code treats
    ``grid_price[i - 1]`` as the next level to sell at and
    ``grid_price[i + 1]`` as the next level to buy at.
    """

    def __init__(self, tradeTarget: TradeTarget, xt_trader: xttrader.XtQuantTrader, account: StockAccount):
        """Bind the strategy to a target, a trader session and an account.

        Args:
            tradeTarget: Persisted record describing the traded instrument.
            xt_trader: Trader used to query and place orders.
            account: Account the orders are placed on.
        """
        self.tradeTarget: TradeTarget = tradeTarget
        self.xt_trader: xttrader.XtQuantTrader = xt_trader
        self.account: StockAccount = account
        # Create the lock before anything is published, so that a callback
        # triggered by the publish never finds the attribute missing.
        self.dataUpdateLock = threading.Lock()
        self.enabledTrading(bool(tradeTarget.enabled))  # coerce the stored field value to a plain bool
        event_bus.publish(MarketDataUpdate, self.tradeTarget)

    def enabledTrading(self, enabled: bool) -> TradeTarget:
        """Switch automatic trading on or off for the target.

        When enabling a target that already holds a position, the current
        position must cover ``grid_volume * grid_index``; otherwise the
        target is disabled again.

        Args:
            enabled: Desired trading state.

        Returns:
            The (possibly updated) trade target.
        """
        self.tradeTarget.enabled = enabled  # type: ignore
        self.saveProxy()
        if enabled:
            self.refreshPlanPrice()
            print(f" |- 标的{self.tradeTarget.targetName()}交易启动, 持仓量:{self.tradeTarget.current_position}")
            if self.tradeTarget.status == 0:
                # No position has been built yet: start from grid index 1.
                # NOTE(review): this assignment is not saved here; it is
                # persisted by the next saveProxy() call -- confirm intended.
                print(f" |- 标的{self.tradeTarget.targetName()}初始状态, 设置网格序号 1")
                self.tradeTarget.grid_index = 1  # pyright: ignore[reportAttributeAccessIssue]
            else:
                # Position already built: verify that the holding is large
                # enough for the current grid index.
                print(f" |- 标的{self.tradeTarget.targetName()}已有仓位或非初始状态 无需建初始仓 当前仓位: {self.tradeTarget.current_position} 状态: {self.tradeTarget.status}")
                minRequirePosition: int = sfgrid_constants.grid_volume * int(self.tradeTarget.grid_index)  # type: ignore
                if minRequirePosition <= int(self.tradeTarget.current_position):  # type: ignore
                    print(f' |- 仓位检查: 持仓需求充足, (gridVolume*gridIndex)={minRequirePosition}, 当前持仓:{self.tradeTarget.current_position}')
                else:
                    print(f' |- 仓位检查: 持仓需求不足, (gridVolume*gridIndex)={minRequirePosition}, 当前持仓:{self.tradeTarget.current_position}, 交易启动失败')
                    self.tradeTarget.enabled = False  # type: ignore
                    self.saveProxy()
        else:
            print(f" |- 标的{self.tradeTarget.targetName()}交易监控暂停")
        return self.tradeTarget

    def isEnabled(self) -> bool:
        """Log and return whether automatic trading is enabled."""
        print(f'|- 检查交易状态[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - {self.tradeTarget.enabled}')
        return bool(self.tradeTarget.enabled)  # coerce the stored field value to a plain bool

    def onDataUpdate(self, data):
        """Handle a market data update and place a grid order if required.

        The last price is always stored on the target.  Orders are only
        considered during trading time and while the target is enabled.

        Args:
            data: Mapping indexed by stock code whose entries expose a
                ``'lastPrice'`` value.
        """
        print(f'|- 市价更新[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - START')
        lastPrice = float("{:.3f}".format(data[self.tradeTarget.stock_code]['lastPrice']))
        self.tradeTarget.market_price = lastPrice  # type: ignore
        self.saveProxy()
        if not is_trading_time():
            print(f"|- 市价更新[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - 非交易时间,不进行自动交易")
            return
        if not self.tradeTarget.enabled:
            # Automatic trading is paused for this target.
            print(f"|- 市价更新[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - 未建仓或交易监控暂停,不进行自动交易")
            return
        with self.dataUpdateLock:
            print(f'|- 市价更新[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - LOCKED')
            try:
                gridPrices = sfgrid_constants.grid_price
                orderPrice: float = -1
                orderType = -1  # -1 means "no order to place"
                orderRemark = ""
                index: int = int(self.tradeTarget.grid_index)  # type: ignore
                gridBasePrice = gridPrices[index] if 0 <= index < len(gridPrices) else -1
                if self.tradeTarget.enabled and self.tradeTarget.status == 0 and lastPrice <= gridPrices[1]:
                    # Enabled, no position yet: place the initial order.
                    orderPrice = gridPrices[index]
                    orderType = xtconstant.STOCK_BUY
                    orderRemark = OrderTypeInit
                if self.tradeTarget.enabled and self.tradeTarget.status == 1:
                    # Enabled with a position: evaluate the neighbouring grid levels.
                    lowPrice = gridPrices[index + 1] if index + 1 < len(gridPrices) else -1
                    # Without this guard, index 0 would read gridPrices[-1]
                    # (the last level) and trigger a spurious sell order.
                    highPrice = gridPrices[index - 1] if 0 < index <= len(gridPrices) else None
                    if lastPrice <= lowPrice:
                        # Price reached the level below: buy.
                        orderPrice = lowPrice
                        orderType = xtconstant.STOCK_BUY
                        orderRemark = OrderTypeBuy
                    elif highPrice is not None and lastPrice >= highPrice:
                        # Price reached the level above: sell.
                        orderPrice = highPrice
                        orderType = xtconstant.STOCK_SELL
                        orderRemark = OrderTypeSell
                if orderType != -1:
                    sideName = "多单" if orderType == xtconstant.STOCK_BUY else "空单"
                    orders = queryPendingOrder(str(self.tradeTarget.stock_code), self.getName(), self.xt_trader, self.account)
                    alreadyPending = any(
                        order.order_type == orderType and order.price == orderPrice
                        for order in orders
                    )
                    if alreadyPending:
                        # An identical order is still pending; do not duplicate it.
                        print(f' |- [{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}]已存在未交易的{sideName},不重复下单')
                    else:
                        print(f' |- 下网格{sideName}')
                        self.tradeTarget.current_order_no = self.xt_trader.order_stock_async(
                            self.account,
                            str(self.tradeTarget.stock_code),
                            orderType,
                            sfgrid_constants.grid_volume,
                            xtconstant.FIX_PRICE,
                            orderPrice,
                            self.getName(),  # strategy_name
                            orderRemark,  # remark  # type: ignore
                        )
                        orderTypeName = ""
                        if orderRemark == OrderTypeBuy:
                            orderTypeName = "多单"
                        elif orderRemark == OrderTypeSell:
                            orderTypeName = "空单"
                        elif orderRemark == OrderTypeInit:
                            orderTypeName = "建仓单"
                        print(f' |- {orderTypeName}委托, 单号 {self.tradeTarget.current_order_no}, 网格基准价 {gridBasePrice}, 下单价 {orderPrice}, 下单量 {sfgrid_constants.grid_volume}')
            finally:
                print(f'|- 市价更新[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - release lock')
                self.saveProxy()
        print(f'|- 市价更新[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}] - END')

    def onAsyncOrderResponse(self, order: XtOrder):
        """Record the order id and remark of an order placed by this strategy.

        Orders whose ``strategy_name`` differs from ``getName()`` are only
        logged.

        Args:
            order: Order information delivered by the trader callback.
        """
        print(f' |- 委托回调[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{order.order_id}]:START')
        with self.dataUpdateLock:
            print(f' |- 委托回调[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{order.order_id}]:LOCKED')
            try:
                if order.strategy_name == self.getName():
                    self.tradeTarget.current_order_no = order.order_id
                    self.tradeTarget.current_order_type = order.order_remark
                    self.saveProxy()
                else:
                    print(f' |- 委托回调[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{order.order_id}]: 不在策略监控范围内{order.strategy_name}')
            finally:
                print(f' |- 委托回调[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{order.order_id}]:release lock')
        print(f' |- 委托回调[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{order.order_id}]:END')

    def onOrderTrade(self, trade: XtTrade):
        """Apply a trade (fill) notification to the target's state.

        Only trades whose ``strategy_name`` matches ``getName()`` and whose
        order id equals ``current_order_no`` change the state.  Planned
        prices are refreshed (and the target saved) on every exit path.

        Args:
            trade: Trade information delivered by the trader callback.
        """
        print(f' |- 委托成交通知[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{trade.order_id}]:START, {trade.order_id}')
        with self.dataUpdateLock:
            print(f' |- 委托成交通知[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{trade.order_id}]:LOCKED')
            try:
                if not trade.strategy_name == self.getName():
                    print(f' |- 委托成交通知[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{trade.order_id}]: 不在策略监控范围内{trade.strategy_name}')
                    # As in the original, this early exit skips the END log line.
                    return
                isCurrentOrder = trade.order_id == self.tradeTarget.current_order_no
                if self.tradeTarget.status == 0 and isCurrentOrder and trade.order_remark == strategy_db.OrderTypeInit:
                    # The initial position order has been filled.
                    # Only volume traded by the strategy is counted; holdings
                    # that already existed on the account are out of scope.
                    self.tradeTarget.current_position = int(self.tradeTarget.current_position) + trade.traded_volume  # type: ignore
                    self.tradeTarget.last_trade_price = float(trade.traded_price)  # type: ignore
                    self.tradeTarget.grid_index = 1  # type: ignore
                    self.tradeTarget.plan_buy_price = float(sfgrid_constants.grid_price[2])  # type: ignore
                    self.tradeTarget.plan_sell_price = float(sfgrid_constants.grid_price[0])  # type: ignore
                    self.tradeTarget.status = 1  # type: ignore
                    self.saveProxy()
                    print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 建初始仓订单ID: {self.tradeTarget.current_order_no}已成交 ")
                    print(f' 成交价: {trade.traded_price} 成交量: {trade.traded_volume}')
                    print(f' 当前持仓: {self.tradeTarget.current_position}')
                    print(f' 网格坐标: {self.tradeTarget.grid_index}')
                elif isCurrentOrder and self.tradeTarget.status == 1 and trade.order_type == xtconstant.STOCK_SELL:  # type: ignore
                    # Sell order filled: move one grid level up (index - 1).
                    self.tradeTarget.current_position = int(self.tradeTarget.current_position) - trade.traded_volume  # type: ignore
                    self.tradeTarget.last_trade_price = float(trade.traded_price)  # type: ignore
                    self.tradeTarget.grid_index = int(self.tradeTarget.grid_index) - 1  # type: ignore
                    print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 上涨 卖单已成交 订单ID: {self.tradeTarget.current_order_no} Price: {sfgrid_constants.grid_price[int(self.tradeTarget.grid_index)]} Volume: {sfgrid_constants.grid_volume} 手续费: {trade.commission}\n")  # type: ignore
                    print(f' 成交价: {trade.traded_price} 成交量: {trade.traded_volume}')
                    print(f' 当前持仓: {self.tradeTarget.current_position}')
                    print(f' 网格坐标: {self.tradeTarget.grid_index}')
                elif isCurrentOrder and self.tradeTarget.status == 1 and trade.order_type == xtconstant.STOCK_BUY:  # type: ignore
                    # Buy order filled: move one grid level down (index + 1).
                    self.tradeTarget.current_position = int(self.tradeTarget.current_position) + trade.traded_volume  # type: ignore
                    self.tradeTarget.last_trade_price = float(trade.traded_price)  # type: ignore
                    self.tradeTarget.grid_index = int(self.tradeTarget.grid_index) + 1  # type: ignore
                    print(f"|- 标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 下跌 买单已成交 订单ID: {self.tradeTarget.current_order_no} Price: {trade.traded_price} Volume: {sfgrid_constants.grid_volume} 手续费: {trade.commission}")
                    print(f' 成交价: {trade.traded_price} 成交量: {trade.traded_volume}')
                    print(f' 当前持仓: {self.tradeTarget.current_position}')
                    print(f' 网格坐标: {self.tradeTarget.grid_index}')
                else:
                    # Not an order tracked by the strategy, or the state does
                    # not match any monitored case: log the trade details.
                    print(f'|- 非策略内部订单,或订单状态不满足监控条件 {trade.order_id} {trade.stock_code}-{trade.instrument_name} {trade.commission}')
            finally:
                # refreshPlanPrice() also persists the changes made above.
                self.refreshPlanPrice()
                print(f' |- 委托成交通知[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{trade.order_id}]:release lock')
        print(f' |- 委托成交通知[{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name}-{trade.order_id}]:END')

    def refreshPlanPrice(self):
        """Recompute the planned buy/sell prices from the grid index and save.

        With a built position (``status == 1``) the planned sell price is the
        level at ``grid_index - 1`` and the planned buy price the level at
        ``grid_index + 1``; ``-1.0`` marks a side with no neighbouring level.
        """
        if self.tradeTarget.status == 1:
            gridIndex: int = self.tradeTarget.grid_index  # pyright: ignore[reportAssignmentType]
            if gridIndex > 0:
                # A level exists above the current one, so a sell can be planned.
                self.tradeTarget.plan_sell_price = float(sfgrid_constants.grid_price[gridIndex - 1])  # pyright: ignore[reportAttributeAccessIssue]
            else:
                self.tradeTarget.plan_sell_price = -1.0  # type: ignore
            if gridIndex < len(sfgrid_constants.grid_price) - 1:
                self.tradeTarget.plan_buy_price = float(sfgrid_constants.grid_price[gridIndex + 1])  # pyright: ignore[reportAttributeAccessIssue]
            else:
                self.tradeTarget.plan_buy_price = -1.0  # pyright: ignore[reportAttributeAccessIssue]
        else:
            # NOTE(review): 10.0 is a hard-coded planned buy price for targets
            # without a built position -- confirm it should not come from
            # sfgrid_constants.grid_price.
            self.tradeTarget.plan_buy_price = 10.0  # type: ignore
            self.tradeTarget.plan_sell_price = -1.0  # type: ignore
        self.saveProxy()

    def getName(self):
        """Return the strategy name used to tag and recognise its orders."""
        return "SFGRID"

    def saveProxy(self):
        """Save the trade target and publish a ``MarketDataUpdate`` event.

        Returns:
            Whatever ``tradeTarget.save()`` returns.
        """
        rc = self.tradeTarget.save()
        event_bus.publish(MarketDataUpdate, self.tradeTarget)
        return rc