221 lines
9.4 KiB
Python
221 lines
9.4 KiB
Python
import datetime
|
|
import time
|
|
import config
|
|
from xtquant.xttype import StockAccount, XtOrder, XtOrderResponse, XtPosition, XtTrade
|
|
from xtquant.xttrader import XtQuantTrader
|
|
from xtquant.xttype import StockAccount
|
|
from core.logger import LogLevel, PrintLog
|
|
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
|
|
from xtquant.xttype import StockAccount
|
|
from xtquant import xtconstant, xtdata
|
|
import core.eventbus as eBus
|
|
|
|
class QmtV(XtQuantTraderCallback):
    """Wrapper around one ``XtQuantTrader`` session that is also its callback.

    The instance registers itself as the trader callback, subscribes the
    configured stock account, and republishes whole-market quote pushes on
    the project event bus.  ``init_qmtv()`` must be called before
    ``connect()`` or any query/order method, because it is the only place
    that creates ``self.xttrader``.
    """

    def __init__(self) -> None:
        # Created by init_qmtv(); intentionally left unassigned until then so
        # that using the wrapper too early still fails with AttributeError.
        self.xttrader: XtQuantTrader
        # True only after connect() has both connected and subscribed.
        self.inited: bool = False
        # Assigned by connect() once the trading connection is up.  Defined
        # here so the ``if self.account`` check in getStockPosition() is safe
        # before (or after a failed) connect().
        self.account: StockAccount | None = None
        # Handle returned by xtdata.subscribe_whole_quote(); None while no
        # market-data subscription is active, so that
        # stopMarketDataSubscription() can always be called.
        self.subscriptionId: int | None = None

    def getTrader(self) -> XtQuantTrader:
        """Return the underlying ``XtQuantTrader`` created by ``init_qmtv()``."""
        return self.xttrader

    def init_qmtv(self):
        """Create the ``XtQuantTrader`` for ``config.miniQMTPath``.

        This only constructs the trader object; call ``connect()`` afterwards
        to start it and open the connection.
        """
        # The session id is the current Unix time in whole seconds.
        session_id = int(time.time())
        self.xttrader = XtQuantTrader(config.miniQMTPath, session_id)
        # NOTE(review): presumably silences xtdata's greeting output - confirm
        # against the xtdata documentation.
        xtdata.enable_hello = False

    def connect(self):
        """Start the trader, connect, subscribe the account and market data.

        On success ``self.inited`` is True.  If either the connection or the
        account subscription fails, ``self.inited`` is False and the method
        returns early without starting the market-data subscription.
        """
        self.xttrader.register_callback(self)
        self.xttrader.start()
        self.xttrader.connect()

        connected = bool(self.xttrader.connected)
        # The status text is computed first so the f-string does not nest the
        # same quote character (which only parses on Python 3.12+).
        status = '成功' if connected else '失败'
        PrintLog(LogLevel.INFO, f'- [{status}]市场交易连接: {config.miniQMTPath}')
        self.inited = connected
        if not connected:
            return

        self.account = StockAccount(config.account_no, 'STOCK')  # pyright: ignore[reportAssignmentType, reportAttributeAccessIssue]
        PrintLog(LogLevel.INFO, f'- [成功]交易账号对象初始化完成, 账号: {config.account_no}')  # pyright: ignore[reportOptionalMemberAccess]

        # A result of 0 is treated as a successful subscription.
        subscribe_result = self.xttrader.subscribe(self.account)
        status = '成功' if subscribe_result == 0 else '失败'
        PrintLog(LogLevel.INFO, f'- [{status}:{subscribe_result}]交易状态订阅')
        if subscribe_result != 0:
            self.inited = False
            return

        self.startMarketDataSubscription()

    def getStockPosition(self, stock_code: str):
        """Return the held volume of ``stock_code``, or 0 if it is not held.

        0 is also returned when the position query yields nothing (``None``
        or an empty list).
        """
        account_id = self.account.account_id if self.account else None  # pyright: ignore[reportAttributeAccessIssue]
        print(f'获取股票持仓: {stock_code}, {self.xttrader.connected}, {account_id}')

        positions = self.xttrader.query_stock_positions(self.account)
        for pos in positions or []:
            if pos.stock_code == stock_code:
                return pos.volume
        return 0

    def queryPendingOrder(self, stock_code: str, tag: str) -> list[XtOrder]:
        """Return reported orders for ``stock_code`` placed by strategy ``tag``.

        An order matches when its status is ``xtconstant.ORDER_REPORTED`` and
        both its stock code and strategy name equal the arguments.  An empty
        list is returned when either argument is None or when the order query
        yields nothing.
        """
        if stock_code is None or tag is None:
            return []

        # The query result may be None rather than a list (getStockPosition
        # guards its query the same way), so normalise it before iterating.
        orders = self.xttrader.query_stock_orders(self.account) or []
        return [
            order
            for order in orders
            if order.order_status == xtconstant.ORDER_REPORTED
            and order.stock_code == stock_code
            and order.strategy_name == tag
        ]

    def orderAsync(self, stock_code, orderVolume, orderType, orderPrice, priceType, orderRemark, strategy_name):
        """Submit an asynchronous stock order and return the trader's result.

        The arguments are forwarded positionally to
        ``XtQuantTrader.order_stock_async`` in this order: account, stock
        code (converted to ``str``), order type, volume, price type, price,
        strategy name, remark.  Note that this differs from the order of this
        method's own parameters.
        """
        return self.xttrader.order_stock_async(
            self.account,
            str(stock_code),
            orderType,
            orderVolume,
            priceType,
            orderPrice,
            strategy_name,
            orderRemark,  # type: ignore
        )

    def getInstrumentName(self, stock_code: str):
        """Return the ``InstrumentName`` of ``stock_code``, or None if unknown.

        None is returned when xtdata has no detail for the code or when the
        detail carries no ``InstrumentName`` entry.
        """
        detail = xtdata.get_instrument_detail(stock_code, False)
        if not detail:
            return None
        return detail.get('InstrumentName')

    # NOTE: commented-out helpers that printed positions, account assets and
    # open orders used to live here.  They referenced an attribute
    # (``self.xt_trader``) that does not exist on this class and were removed.

    # ====== Market-data subscription ======

    def startMarketDataSubscription(self):
        """Subscribe to whole-market quotes and remember the subscription id.

        Quote pushes for the ``'SH'`` and ``'SZ'`` code lists are delivered
        to ``onDataUpdate``.
        """
        self.subscriptionId = xtdata.subscribe_whole_quote(['SH', 'SZ'], self.onDataUpdate)

    def stopMarketDataSubscription(self):
        """Cancel the whole-market quote subscription, if one is active.

        Safe to call when no subscription was ever started and safe to call
        more than once.
        """
        PrintLog(LogLevel.INFO, '- 停止市场数据订阅')

        if self.subscriptionId is not None and self.subscriptionId > 0:
            xtdata.unsubscribe_quote(self.subscriptionId)
            # Forget the handle so a repeated call does not unsubscribe twice.
            self.subscriptionId = None

    # ====== Market-data callback -- invoked by xtdata ======

    def onDataUpdate(self, data):
        """Publish every market-data push on the event bus for monitoring."""
        eBus.event_bus.publish(eBus.MarketDataUpdate, data)

    # ====== Trading callbacks -- invoked by XtQuantTrader ======

    def on_connected(self):
        """Connection-established notification; only logs a timestamp."""
        print(datetime.datetime.now(), '连接成功回调')

    def on_disconnected(self):
        """Connection-lost notification; only logs a timestamp."""
        print(datetime.datetime.now(), '连接断开回调')

    def on_stock_order(self, order: XtOrder):
        """Order-report push.

        :param order: XtOrder object
        """
        # NOTE: forwarding to a per-stock strategy controller was sketched
        # here previously but is disabled; the callback only logs for now.
        print(f'orderd {order.strategy_name}-{order.stock_code} {order.order_id} {order.order_volume}-{order.order_status}')

    def on_stock_trade(self, trade: XtTrade):
        """Trade (fill) push.

        :param trade: XtTrade object
        """
        # NOTE(review): the log text below says "mismatch" although nothing is
        # compared any more - it looks like a leftover from the disabled
        # controller dispatch; confirm before relying on it.
        print(f"委托回调 投资备注 {trade.stock_code}-{trade.instrument_name} {trade.strategy_name} 不匹配 {trade.order_remark}")

    def on_order_stock_async_response(self, response: XtOrderResponse):
        """Response to an asynchronous order request; only logs it.

        :param response: XtOrderResponse object
        """
        print(f"委托回调 投资备注 {response.error_msg}{response.strategy_name} {response.order_remark}")

    def on_order_error(self, order_error):
        """Order-failure push.

        :param order_error: XtOrderError object
        """
        print(f"\n委托报错回调 {order_error.order_remark} {order_error.error_msg}")

    def on_account_status(self, status):
        """Account-status push; only logs it with a timestamp.

        :param status: XtAccountStatus object
        """
        print(datetime.datetime.now(), status)
|
|
|
|
|
|
# Module-level QmtV instance, created when this module is imported.
qmtv = QmtV()
|