Files
sfgrid/core/main_controller.py
T
2025-11-04 12:01:18 +08:00

314 lines
14 KiB
Python

# coding:utf-8
from xtquant.xttrader import XtQuantTrader
import time, sys
from peewee import ModelSelect
import xtquant.xtconstant as xtconstant
sys.stdout.reconfigure(encoding='utf-8') # 设置标准输出编码为UTF-8 # type: ignore
import core.strategy_db as strategy_db
import sfgrid_constants
from core.sfgrid_strategy import SFGridStrategy
from core.util import getInstrumentName, getStockPosition
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount, XtAsset, XtOrder, XtOrderResponse, XtPosition, XtTrade
from xtquant import xtdata
from xtquant.xttrader import XtQuantTraderCallback
import datetime
# Core controller object of the quantitative trading system
class SFGridController(XtQuantTraderCallback):
    """Central controller that wires the database, the trader and market data.

    The controller owns:

    * ``xt_trader``        - the ``XtQuantTrader`` session (this object is
      registered as its callback receiver),
    * ``account``          - the ``StockAccount`` being traded (``None`` until
      the trading connection is established),
    * ``instrument_pool``  - a ``TradeTarget`` query listing the trade targets,
    * ``stock_trade_ctrl`` - ``stock_code -> SFGridStrategy`` mapping,
    * ``seq``              - the whole-quote subscription number (or ``None``),
    * ``inited``           - ``True`` only when connect *and* subscribe
      succeeded.

    Callers should check ``inited`` after construction before using the
    instance.
    """

    def __init__(self, account_no: str, miniQmtPath: str):
        """Connect the database and the trader, then start market data.

        :param account_no: stock account number used to build ``StockAccount``.
        :param miniQmtPath: path handed to ``XtQuantTrader``.
        """
        super().__init__()
        # These attributes are read by the trader callbacks and by the public
        # methods. They must exist before ``register_callback``/``start`` so
        # that an early callback, or a call on a controller whose
        # initialisation failed, does not die with AttributeError.
        self.inited: bool = False
        self.stock_trade_ctrl: dict = {}
        self.seq = None
        self.account = None

        xtdata.enable_hello = False
        strategy_db.db.connect()
        strategy_db.db.create_tables([strategy_db.TradeTarget])
        print('- [成功]数据库模块初始化')

        session_id = int(time.time())
        self.xt_trader: XtQuantTrader = XtQuantTrader(miniQmtPath, session_id)
        self.xt_trader.register_callback(self)
        self.xt_trader.start()
        connect_result = self.xt_trader.connect()
        connect_status = '成功' if self.xt_trader.connected else '失败'
        print(f'- [{connect_status}]市场交易连接{connect_result}--: {miniQmtPath}')
        if not self.xt_trader.connected:
            return

        self.account = StockAccount(account_no, 'STOCK')
        print(f'- [成功]交易账号对象初始化完成, 账号: {self.account.account_id}')  # pyright: ignore[reportAttributeAccessIssue]
        subscribe_result = self.xt_trader.subscribe(self.account)
        subscribe_status = '成功' if subscribe_result == 0 else '失败'
        print(f'- [{subscribe_status}:{subscribe_result}]交易状态订阅')
        if subscribe_result != 0:
            return

        self.inited = True
        self.init_instrument_pool(self.xt_trader, self.account)  # type: ignore
        print('- [成功]三疯交易系统初始化完成')
        self.startMarketData()

    def startMarketData(self):
        """Subscribe to whole-market quotes for the 'SH' and 'SZ' markets."""
        print('- 启动市场数据订阅')
        self.seq = xtdata.subscribe_whole_quote(['SH', 'SZ'], callback=self.onDataUpdate)
        if self.seq == -1:
            print('- 市场数据订阅失败')
        else:
            print(f'- 市场数据订阅成功, 订阅号={self.seq}')

    def stopMarketData(self):
        """Cancel the quote subscription if one is active."""
        print('- 停止市场数据订阅')
        if self.seq is not None and self.seq > 0:
            xtdata.unsubscribe_quote(self.seq)

    def _controller_for(self, stock_code):
        """Return the strategy controller for ``stock_code`` or ``None``."""
        return self.stock_trade_ctrl.get(stock_code)

    def add_trade_target(self, stock_code: str):
        """Create a trade target row and a strategy controller for it.

        Failures are reported on stdout and swallowed; callers that need to
        know whether the target was added must check ``stock_trade_ctrl``.

        :param stock_code: instrument code of the new target.
        """
        try:
            stock_name = getInstrumentName(stock_code)
            new_target = strategy_db.TradeTarget.create(
                stock_name=stock_name,
                stock_code=stock_code,
                current_position=0,
                grid_index=0,
                last_trade_price=0.0,
                current_buy_price=0.0,
                current_buy_order_no='',
                current_sell_price=0.0,
                current_sell_order_no='',
            )
            new_target.save()
            print(f'新增交易标的 {stock_code} {stock_name}, {new_target.id}')
            # Refresh the position held for this target.
            pos = getStockPosition(stock_code, self.xt_trader, self.account)  # type: ignore
            strategy_db.TradeTarget.update(current_position=pos).where(
                strategy_db.TradeTarget.stock_code == stock_code
            ).execute()
            # Keep the in-memory row in step with the database so the strategy
            # receives the real position, as it does in init_instrument_pool.
            new_target.current_position = pos
            # Reload the target pool.
            self.refresh_targets()
            # Register the strategy controller for the new target.
            self.stock_trade_ctrl[stock_code] = SFGridStrategy(
                new_target, self.xt_trader, self.account, new_target.enabled  # type: ignore
            )
        except Exception as e:
            print(f'新增交易标的失败 {stock_code} {e}')

    def del_trade_target(self, index: int):
        """Delete the target at ``index`` of the pool and drop its controller.

        :param index: position of the target in ``instrument_pool``.
        """
        target: strategy_db.TradeTarget = self.instrument_pool[index]
        # pop() instead of del: a target without a controller must still be
        # deletable rather than failing with KeyError.
        self.stock_trade_ctrl.pop(target.stock_code, None)
        target.delete_instance()
        self.refresh_targets()

    def init_instrument_pool(self, xtTrader: XtQuantTrader, account: StockAccount):
        """Load all targets, sync their positions and build their controllers.

        :param xtTrader: trader used to query the current position.
        :param account: account whose positions are queried.
        """
        self.refresh_targets()
        for tradeTarget in self.instrument_pool:
            tradeTarget.current_position = getStockPosition(tradeTarget.stock_code, xtTrader, account)  # type: ignore
            result = tradeTarget.save()
            print(f' |- 同步当前持仓信息 {tradeTarget.stock_code}, {tradeTarget.current_position}, result = {result}')
            self.stock_trade_ctrl[tradeTarget.stock_code] = SFGridStrategy(
                tradeTarget, self.xt_trader, self.account, tradeTarget.enabled  # type: ignore
            )
        print(f'- [成功]交易标的信息初始化, 共 {len(self.instrument_pool)} 个标的')

    def refresh_targets(self):
        """Re-create the target pool query and print its contents."""
        self.instrument_pool: ModelSelect = strategy_db.TradeTarget.select()
        self.print_pool()

    def print_pool(self):
        """Print one summary line per target in the pool."""
        print("- [信息]标的池信息")
        for i, target in enumerate(self.instrument_pool):
            status = "新建" if target.status == 0 else "已建初始仓"
            position = getStockPosition(target.stock_code, self.xt_trader, self.account)  # type: ignore
            base_price = sfgrid_constants.grid_price[target.grid_index]
            trading = '自动交易中' if target.enabled else '交易已停止'
            print(
                f' [序号-{i}] 股票代码: {target.stock_code}-{target.stock_name}'
                f' 当前持仓: {position} 网格索引: {target.grid_index}'
                f' 基准价格 {base_price} 状态: {status} 启用交易线程: {trading}'
            )

    def print_position_info(self):
        """Print every position of the account whose volume is positive."""
        positions: list[XtPosition] = self.xt_trader.query_stock_positions(self.account)
        if not positions:
            print("\n当前无持仓")
            return
        print("\n- 持仓信息")
        for pos in positions:
            if pos.volume <= 0:
                continue
            print(f"股票代码: {pos.stock_code}-{getInstrumentName(pos.stock_code)}")
            print(f"总持仓: {pos.volume}")
            print(f"可用持仓: {pos.can_use_volume}")
            print(f"持仓成本: {pos.avg_price}")
            print("---")

    def print_account_info(self):
        """Print cash, total asset and market value of the account."""
        asset: XtAsset = self.xt_trader.query_stock_asset(self.account)  # type: ignore
        if asset is None:
            # Without this guard a failed query crashes with AttributeError.
            print("\n账户信息查询失败")
            return
        print(f"=== 账户信息 {self.account.account_id} ===")  # type: ignore
        print(f"可用资金: {asset.cash}")
        print(f"总资产: {asset.total_asset}")
        print(f"证券市值: {asset.market_value}")

    def print_stock_orders(self):
        """Print the account's orders, queried with ``cancelable_only=True``."""
        orders = self.xt_trader.query_stock_orders(self.account, cancelable_only=True)
        if not orders:
            print("\n当前无委托记录")
            return
        print("\n=== 委托信息 ===")
        for order in orders:
            print(f"委托编号: {order.order_id}")
            print(f"股票代码: {order.stock_code} {getInstrumentName(order.stock_code)}")
            print(f"委托方向: {order.offset_flag} ")
            print(f"委托价格: {order.price}")
            print(f"委托数量: {order.order_volume}")
            print(f"已成交数量: {order.traded_volume}")
            print(f"委托状态: {order.order_status} ")
            print("---")

    def start_stock_trade(self, index: int):
        """Enable trading for the target at ``index``.

        If no controller exists for the target yet, one is created.

        :param index: position of the target in ``instrument_pool``.
        """
        tradeTarget = self.instrument_pool[index]
        code = tradeTarget.stock_code
        tradeController = self._controller_for(code)
        if tradeController is None:
            self.stock_trade_ctrl[code] = SFGridStrategy(
                tradeTarget, self.xt_trader, self.account, tradeTarget.enabled  # type: ignore
            )
            print(f"\t创建标的交易控制器 {code} {getInstrumentName(code)}")
        elif tradeController.isEnabled():
            print(f"标的交易控制器已存在且正在运行 {code} {getInstrumentName(code)}\n")
        else:
            print(f"标的交易控制器已存在但未运行,重新启动 {code} {getInstrumentName(code)}\n")
            tradeController.enabledTrading(True)

    def pause_stock_trade(self, index: int):
        """Disable trading for the target at ``index`` if it is running.

        :param index: position of the target in ``instrument_pool``.
        """
        tradeTarget = self.instrument_pool[index]
        code = tradeTarget.stock_code
        tradeController = self._controller_for(code)
        if tradeController is None:
            print(f"标的交易控制器不存在 {code} {getInstrumentName(code)}\n")
        elif tradeController.isEnabled():
            print(f"暂停标的交易 {code} {getInstrumentName(code)}\n")
            tradeController.enabledTrading(False)
        else:
            print(f"标的交易已暂停 {code} {getInstrumentName(code)}\n")

    # ====== Market data callback -- invoked by xtdata ======
    def onDataUpdate(self, data):
        """Handle a whole-quote push.

        :param data: mapping of stock code to tick data; each tick is indexed
            with ``'lastPrice'`` here.
        """
        if sfgrid_constants.max_enabled_targets <= 0:
            # Full-push mode: scan every pushed instrument for new targets.
            for stock_code, tickData in data.items():
                lastPrice = tickData['lastPrice']
                # NOTE(review): exact float comparison with 10.0 is kept as in
                # the original; confirm this selection rule is intended.
                if lastPrice == 10.0 and stock_code not in self.stock_trade_ctrl:
                    print(f'New trade target = {stock_code} - {getInstrumentName(stock_code)} {lastPrice}')
                    self.add_trade_target(stock_code)
                    # add_trade_target swallows its own errors, so the
                    # controller may be missing; indexing would raise KeyError
                    # inside the quote callback.
                    new_ctrl = self._controller_for(stock_code)
                    if new_ctrl is not None:
                        new_ctrl.enabledTrading(True)
        else:
            # Configured-target mode (the mode mainly in use): forward the
            # push to each pool target that has a controller and a tick.
            for target in self.instrument_pool:
                stock_code = target.stock_code
                stock_controller = self._controller_for(stock_code)
                if stock_controller is None or stock_code not in data:
                    continue
                stock_controller.onDataUpdate(data)

    # ====== Trading callbacks -- invoked by XtQuantTrader ======
    def on_connected(self):
        """Connection-established notification."""
        print(datetime.datetime.now(), '连接成功回调')

    def on_disconnected(self):
        """Connection-lost notification."""
        print(datetime.datetime.now(), '连接断开回调')

    def on_stock_order(self, order: XtOrder):
        """Order update notification.

        Forwards the order to the controller of its instrument when the
        order's strategy name matches the controller's name.

        :param order: ``XtOrder`` object.
        """
        ctrl = self._controller_for(order.stock_code)
        if ctrl is None:
            # Orders for instruments without a controller used to raise
            # KeyError inside the callback.
            print(f"委托下单回调 未找到标的交易控制器 [{order.stock_code}] orderId: {order.order_sysid}, 已忽略")
            return
        if order.strategy_name == ctrl.getName():
            # NOTE(review): an XtOrder is passed to onOrderTrade, the same
            # entry point on_stock_trade uses - confirm this is intended.
            ctrl.onOrderTrade(trade=order)  # type: ignore
        else:
            print(f"委托下单回调 投资备注 orderId: {order.order_sysid} [{order.stock_code}-{order.instrument_name}] volume: {order.order_volume} 订单策略: '{order.strategy_name}'<-->'{ctrl.getName()}'")

    def test_sim_trade(self, index: int, orderType: int):
        """Feed a simulated fill for the target at ``index`` to the callback.

        :param index: position of the target in ``instrument_pool``.
        :param orderType: ``xtconstant.STOCK_BUY`` simulates a buy fill; any
            other value simulates a sell fill.
        """
        tradeTarget: strategy_db.TradeTarget = self.instrument_pool[index]
        ctrl = self._controller_for(tradeTarget.stock_code)
        if ctrl is None:
            print(f"标的交易控制器不存在 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")
            return
        if orderType == xtconstant.STOCK_BUY:
            side = xtconstant.STOCK_BUY
            fill_price = tradeTarget.current_buy_price
            order_no = tradeTarget.current_buy_order_no
        else:
            side = xtconstant.STOCK_SELL
            # The original referenced an undefined name ``price`` here.
            fill_price = tradeTarget.current_sell_price
            order_no = tradeTarget.current_sell_order_no
        # Use the target's own code (the original hard-coded '300083.SZ') so
        # that on_stock_trade routes the fill to this target's controller.
        trade = XtTrade(
            sfgrid_constants.account_no,
            tradeTarget.stock_code,
            side,
            1, 1, fill_price, sfgrid_constants.grid_volume, 1000,
            order_no,
            None, ctrl.getName(), None, None, None, None, None, tradeTarget.stock_name)  # type: ignore
        self.on_stock_trade(trade)

    def on_stock_trade(self, trade: XtTrade):
        """Trade (fill) notification.

        Forwards the trade to the controller of its instrument when the
        trade's strategy name matches the controller's name.

        :param trade: ``XtTrade`` object.
        """
        ctrl = self._controller_for(trade.stock_code)
        if ctrl is None:
            print(f"委托回调 未找到标的交易控制器 [{trade.stock_code}] 投资备注 {trade.strategy_name}, 已忽略")
            return
        if trade.strategy_name == ctrl.getName():
            ctrl.onOrderTrade(trade)
        else:
            print(f"委托回调 投资备注 {trade.strategy_name} 不匹配 {ctrl.getName()}")

    def on_order_stock_async_response(self, response: XtOrderResponse):
        """Asynchronous order response notification.

        :param response: ``XtOrderResponse`` object; its ``order_remark`` is
            used as the stock code key (presumably the strategy stores the
            stock code there when placing the order - TODO confirm).
        """
        stockCode = response.order_remark
        ctrl = self._controller_for(stockCode)
        if ctrl is None:
            print(f"委托回调 未找到标的交易控制器 [{stockCode}] 投资备注 {response.strategy_name}, 已忽略")
            return
        if response.strategy_name == ctrl.getName():
            ctrl.onAsyncOrderResponse(response)
        else:
            print(f"委托回调 投资备注 {response.strategy_name} 不匹配 {ctrl.getName()}")

    def on_order_error(self, order_error):
        """Order failure notification.

        :param order_error: ``XtOrderError`` object.
        """
        print(f"\n委托报错回调 {order_error.order_remark} {order_error.error_msg}")

    def on_account_status(self, status):
        """Account status notification; prints a timestamp and this method's name.

        :param status: ``XtAccountStatus`` object.
        """
        print(datetime.datetime.now(), sys._getframe().f_code.co_name)