# File: sfgrid/main_controller.py
# Last modified: 2025-10-29 15:26:34 +08:00
# 286 lines, 12 KiB, Python

# coding:utf-8
from os import popen
import time, sys
from peewee import ModelSelect
# NOTE(review): this runs before the project/xtquant imports below, presumably
# so that anything they print at import time is already UTF-8 — confirm.
sys.stdout.reconfigure(encoding='utf-8') # Set the standard output encoding to UTF-8
import strategy_db
import sfgrid_constants
from sfgrid_trade_controller import StockTradeController
from util import getInstrumentName, getStockPosition
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount, XtOrder, XtTrade
from xtquant import xtdata
from xtquant.xttrader import XtQuantTraderCallback
import datetime
# Core controller object of the quantitative trading system
class SFGridController(XtQuantTraderCallback):
    """Top-level controller of the grid trading system.

    Wires together the strategy database (``strategy_db``), the XtQuant
    trader connection, the whole-market quote subscription and one
    ``StockTradeController`` per trade target, and receives the
    ``XtQuantTraderCallback`` pushes from the trader.

    Attributes set by this class:
        xt_trader: the ``XtQuantTrader`` created in ``init_trader``.
        account: the ``StockAccount`` created in ``init_trade_account``.
        stock_trade_ctrl: dict mapping stock code -> ``StockTradeController``.
        instrument_pool: ``TradeTarget.select()`` query (the target pool).
        seq: market-data subscription id, or ``None`` when not subscribed.
    """

    def __init__(self, account_no: str, miniQmtPath: str):
        """Initialise database, trader, account and the target pool.

        :param account_no: account id passed to ``StockAccount``.
        :param miniQmtPath: path passed to ``XtQuantTrader``.
        """
        super().__init__()
        xtdata.enable_hello = False
        # reuse_if_open: constructing a second controller in the same process
        # must not fail just because the connection is already open.
        strategy_db.db.connect(reuse_if_open=True)
        strategy_db.db.create_tables([strategy_db.TradeTarget])
        print('- 数据库模块初始化完成')
        self.init_trader(miniQmtPath)
        self.init_trade_account(account_no, 'STOCK')
        self.stock_trade_ctrl = {}
        self.init_instrument_pool()
        self.seq = None
        print('- 三疯交易系统初始化完成')

    def startMarketData(self):
        """Subscribe to whole-market quotes for the 'SH' and 'SZ' markets.

        The subscription id is stored in ``self.seq``; ``-1`` is treated as
        a failed subscription.
        """
        print('- 启动市场数据订阅')
        self.seq = xtdata.subscribe_whole_quote(['SH', 'SZ'], callback=self.onDataUpdate)
        if self.seq == -1:
            print('- 市场数据订阅失败')
        else:
            print(f'- 市场数据订阅成功, 订阅号={self.seq}')

    def stopMarketData(self):
        """Cancel the market-data subscription, if one is active."""
        print('- 停止市场数据订阅')
        if self.seq is not None and self.seq > 0:
            xtdata.unsubscribe_quote(self.seq)
            # Forget the id so a repeated stop does not unsubscribe a stale one.
            self.seq = None

    def add_trade_target(self, stock_code):
        """Create a new trade target row and its trade controller.

        Any exception raised along the way is caught and reported on stdout.

        :param stock_code: code of the instrument to add.
        """
        try:
            stock_name = getInstrumentName(stock_code)
            # Model.create() already inserts the row; no extra save() needed.
            new_target = strategy_db.TradeTarget.create(
                stock_name=stock_name,
                stock_code=stock_code,
                current_position=0,
                grid_index=0,
                last_trade_price=0.0,
                current_buy_price=0.0,
                current_buy_order_no='',
                current_sell_price=0.0,
                current_sell_order_no=''
            )
            print(f'新增交易标的 {stock_code} {stock_name}, {new_target.id}')
            # Refresh the position held for this target
            pos = getStockPosition(stock_code, self.xt_trader, self.account)
            strategy_db.TradeTarget.update(current_position=pos).where(
                strategy_db.TradeTarget.stock_code == stock_code
            ).execute()
            # Keep the in-memory instance handed to the controller in sync
            # with the row that was just updated.
            new_target.current_position = pos
            # Refresh the target pool
            self.refresh_targets()
            # Register the trade controller
            self.stock_trade_ctrl[stock_code] = StockTradeController(
                new_target, self.xt_trader, self.account, new_target.enabled
            )
        except Exception as e:
            print(f'新增交易标的失败 {stock_code} {e}')

    def del_trade_target(self, index: int):
        """Delete the target at ``index`` of the pool and its controller.

        :param index: position of the target in ``self.instrument_pool``.
        """
        target: strategy_db.TradeTarget = self.instrument_pool[index]
        # pop() with a default: a target without a registered controller must
        # still be removable from the database.
        self.stock_trade_ctrl.pop(target.stock_code, None)
        target.delete_instance()
        self.refresh_targets()

    def init_trader(self, path):
        """Create, start and connect the ``XtQuantTrader``.

        :param path: path passed to ``XtQuantTrader``.
        """
        # The current Unix time (whole seconds) is used as the session id.
        session_id = int(time.time())
        self.xt_trader = XtQuantTrader(path, session_id)
        self.xt_trader.register_callback(self)
        self.xt_trader.start()
        connect_result = self.xt_trader.connect()
        print(f'- 交易对象初始化完成, {self.xt_trader.connected}')
        # connect() reports success as 0; surface any other code instead of
        # silently continuing with a dead connection.
        if connect_result != 0:
            print(f'- 交易连接失败, 返回码: {connect_result}')

    def init_trade_account(self, account_id, account_type):
        """Create the ``StockAccount`` used for all trading calls.

        :param account_id: account id.
        :param account_type: account type string (``'STOCK'`` in ``__init__``).
        """
        self.account = StockAccount(account_id, account_type)
        print(f'- 交易账号对象初始化完成, 账号: {self.account.account_id}')

    def init_instrument_pool(self):
        """Load the target pool and create one controller per target."""
        self.refresh_targets()
        for tradeTarget in self.instrument_pool:
            self.stock_trade_ctrl[tradeTarget.stock_code] = StockTradeController(
                tradeTarget, self.xt_trader, self.account, tradeTarget.enabled
            )
        print(f'- 初始化标的池初始化完成 , 共 {len(self.instrument_pool)} 个标的')

    def refresh_targets(self):
        """Rebuild ``self.instrument_pool`` from the database and print it."""
        # Refresh the target pool
        self.instrument_pool: ModelSelect = strategy_db.TradeTarget.select()
        self.print_pool()

    def print_pool(self):
        """Print one line per target in the pool."""
        print("- 标的池信息")
        for i, target in enumerate(self.instrument_pool):
            # status 0 is shown as "new"; anything else as "initial position built"
            status = "新建" if target.status == 0 else "已建初始仓"
            print(f' [序号-{i}] 股票代码: {target.stock_code}-{target.stock_name} 当前持仓: {target.current_position} 网格索引: {target.grid_index+1} Price {sfgrid_constants.grid_price[target.grid_index]} 状态: {status} 启用交易线程: {target.enabled}')

    def print_position_info(self):
        """Print every position of the account that has a volume above 0."""
        positions = self.xt_trader.query_stock_positions(self.account)
        if not positions:
            print("\n当前无持仓")
            return
        print("\n- 持仓信息")
        for pos in positions:
            if pos.m_nVolume <= 0:
                continue
            print(f"股票代码: {pos.stock_code}-{getInstrumentName(pos.stock_code)}")
            print(f"总持仓: {pos.m_nVolume}")
            print(f"可用持仓: {pos.m_nCanUseVolume}")
            print(f"持仓成本: {pos.avg_price}")
            print("---")

    def print_account_info(self):
        """Print cash, total asset and market value of the account."""
        account_info = self.xt_trader.query_stock_asset(self.account)
        if account_info is None:
            # A failed query yields no asset object; report it instead of
            # crashing on attribute access.
            print(f"\n账户信息查询失败 {self.account.account_id}")
            return
        print(f"\n=== 账户信息 {self.account.account_id} ===")
        print(f"可用资金: {account_info.m_dCash}")
        print(f"总资产: {account_info.m_dTotalAsset}")
        print(f"证券市值: {account_info.m_dMarketValue}")

    def print_stock_orders(self):
        """Print the orders returned by a ``cancelable_only=True`` query."""
        orders = self.xt_trader.query_stock_orders(self.account, cancelable_only=True)
        if not orders:
            print("\n当前无委托记录")
            return
        print("\n=== 委托信息 ===")
        for order in orders:
            print(f"委托编号: {order.order_id}")
            print(f"股票代码: {order.stock_code} {getInstrumentName(order.stock_code)}")
            print(f"委托方向: {order.offset_flag} ")
            print(f"委托价格: {order.price}")
            print(f"委托数量: {order.order_volume}")
            print(f"已成交数量: {order.traded_volume}")
            print(f"委托状态: {order.order_status} ")
            print("---")

    # Start (or create) the trade controller of the given target
    def start_stock_trade(self, index: int):
        """Enable trading for the target at ``index`` of the pool.

        If a controller exists and is disabled it is re-enabled; if none
        exists a new one is created with the target's ``enabled`` flag.

        :param index: position of the target in ``self.instrument_pool``.
        """
        tradeTarget = self.instrument_pool[index]
        # Check for an existing controller first
        if tradeTarget.stock_code in self.stock_trade_ctrl:
            tradeController: StockTradeController = self.stock_trade_ctrl[tradeTarget.stock_code]
            if tradeController.isEnabled():
                print(f"标的交易控制器已存在且正在运行 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")
            else:
                print(f"标的交易控制器已存在但未运行,重新启动 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")
                tradeController.enabledTrading(True)
        else:
            stockTradeController = StockTradeController(tradeTarget, self.xt_trader, self.account, tradeTarget.enabled)
            self.stock_trade_ctrl[tradeTarget.stock_code] = stockTradeController
            print(f"\t创建标的交易控制器 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}")

    def pause_stock_trade(self, index: int):
        """Disable trading for the target at ``index`` of the pool.

        :param index: position of the target in ``self.instrument_pool``.
        """
        tradeTarget = self.instrument_pool[index]
        if tradeTarget.stock_code not in self.stock_trade_ctrl:
            print(f"标的交易控制器不存在 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")
            return
        tradeController: StockTradeController = self.stock_trade_ctrl[tradeTarget.stock_code]
        if tradeController.isEnabled():
            print(f"暂停标的交易 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")
            tradeController.enabledTrading(False)
        else:
            print(f"标的交易已暂停 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")

    # ====== Market-data callback -- registered with xtdata in startMarketData ======
    def onDataUpdate(self, data):
        """Forward a quote update to the controller of every pooled target.

        :param data: container keyed by stock code (membership is tested
            with ``stock_code not in data``); it is passed unchanged to each
            matching controller.
        """
        # NOTE: an earlier experiment (kept out of the active path) scanned
        # every quoted code and auto-added targets whose lastPrice was 10.0.
        for target in self.instrument_pool:
            stock_code = target.stock_code
            # Only targets that have a controller AND appear in this update
            if stock_code not in self.stock_trade_ctrl or stock_code not in data:
                continue
            stock_controller: StockTradeController = self.stock_trade_ctrl[stock_code]
            stock_controller.onDataUpdate(data)

    # ====== Trading callbacks -- the methods below are called by XtQuantTrader ======
    def on_connected(self):
        """
        Pushed when the connection succeeds.
        """
        print(datetime.datetime.now(), '连接成功回调')

    def on_disconnected(self):
        """
        Pushed when the connection is lost.
        :return:
        """
        print(datetime.datetime.now(), '连接断开回调')

    def on_stock_order(self, order: XtOrder):
        """
        Order status push.
        :param order: XtOrder object
        :return:
        """
        stockCode = order.stock_code
        # If a StockTradeController exists for this code, call its onOrderUpdate
        if stockCode in self.stock_trade_ctrl:
            ctrl: StockTradeController = self.stock_trade_ctrl[stockCode]
            ctrl.onOrderUpdate(order)
        print(datetime.datetime.now(), '委托回调 投资备注', order.order_remark)

    def on_stock_trade(self, trade: XtTrade):
        """
        Trade (fill) push.
        :param trade: XtTrade object
        :return:
        """
        print(datetime.datetime.now(), '成交回调', trade.order_remark, f"委托方向(48买 49卖) {trade.offset_flag} 成交价格 {trade.traded_price} 成交数量 {trade.traded_volume}")

    def on_order_error(self, order_error):
        """
        Order failure push.
        :param order_error: XtOrderError object
        :return:
        """
        print(f"\n委托报错回调 {order_error.order_remark} {order_error.error_msg}")

    def on_cancel_error(self, cancel_error):
        """
        Cancel failure push; only logs a timestamp and the callback name.
        :param cancel_error: XtCancelError object
        :return:
        """
        print(datetime.datetime.now(), 'on_cancel_error')

    def on_order_stock_async_response(self, response):
        """
        Asynchronous order response push.
        :param response: XtOrderResponse object
        :return:
        """
        print(f"异步委托回调 投资备注: {response.order_remark}")

    def on_cancel_order_stock_async_response(self, response):
        """
        Asynchronous cancel response push; only logs a timestamp and the
        callback name.
        :param response: XtCancelOrderResponse object
        :return:
        """
        print(datetime.datetime.now(), 'on_cancel_order_stock_async_response')

    def on_account_status(self, status):
        """
        Account status push; only logs a timestamp and the callback name.
        :param status: XtAccountStatus object
        :return:
        """
        print(datetime.datetime.now(), 'on_account_status')