"""Wrapper around the xtquant (miniQMT) trading and market-data APIs.

Exposes a module-level ``qmtv`` instance of :class:`QmtV`. Expected call
order is ``init_qmtv()`` followed by ``connect()``; afterwards ``inited``
tells whether the trading connection and account subscription succeeded.
"""

import datetime
import time

from xtquant import xtconstant, xtdata
from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback
from xtquant.xttype import StockAccount, XtOrder, XtOrderResponse, XtPosition, XtTrade

import config
import core.eventbus as eBus
from core.logger import LogLevel, PrintLog


class QmtV(XtQuantTraderCallback):
    """Trading session facade that also acts as the XtQuantTrader callback sink."""

    def __init__(self) -> None:
        super().__init__()
        # Created by init_qmtv(); accessing it earlier raises AttributeError.
        self.xttrader: XtQuantTrader
        # True only after connect() connected and subscribed the account.
        self.inited: bool = False
        # Set by connect(); stays None while no trading connection exists so
        # that the None-checks in the query helpers work instead of raising.
        self.account: StockAccount | None = None
        # Id returned by xtdata.subscribe_whole_quote; None while unsubscribed.
        self.subscriptionId: int | None = None

    def getTrader(self) -> XtQuantTrader:
        """Return the underlying XtQuantTrader (requires a prior init_qmtv())."""
        return self.xttrader

    def init_qmtv(self):
        """Create the XtQuantTrader for ``config.miniQMTPath``.

        The session id is derived from the current Unix time in seconds.
        """
        session_id = int(time.time())
        self.xttrader = XtQuantTrader(config.miniQMTPath, session_id)
        # NOTE(review): presumably suppresses xtdata's greeting output - confirm.
        xtdata.enable_hello = False

    def connect(self):
        """Connect to the trading terminal and subscribe account and market data.

        Sets ``self.inited`` to True when the connection and the account
        subscription both succeed, and to False otherwise. Market-data
        subscription is only started after the account subscription succeeded.
        """
        self.xttrader.register_callback(self)
        self.xttrader.start()
        self.xttrader.connect()

        connected = bool(self.xttrader.connected)
        connect_status = '成功' if connected else '失败'
        PrintLog(LogLevel.INFO, f'- [{connect_status}]市场交易连接: {config.miniQMTPath}')
        self.inited = connected
        if not connected:
            return

        self.account = StockAccount(config.account_no, 'STOCK')  # pyright: ignore[reportAssignmentType, reportAttributeAccessIssue]
        PrintLog(LogLevel.INFO, f'- [成功]交易账号对象初始化完成, 账号: {config.account_no}')  # pyright: ignore[reportOptionalMemberAccess]

        subscribe_result = self.xttrader.subscribe(self.account)
        subscribe_status = '成功' if subscribe_result == 0 else '失败'
        PrintLog(LogLevel.INFO, f'- [{subscribe_status}:{subscribe_result}]交易状态订阅')
        if subscribe_result != 0:
            self.inited = False
            return

        self.startMarketDataSubscription()

    def getStockPosition(self, stock_code: str):
        """Return the held volume of ``stock_code``.

        Returns 0 when the stock is not held, when the position query yields
        nothing, or when no account has been set up by connect() yet.
        """
        account_id = self.account.account_id if self.account else None  # pyright: ignore[reportAttributeAccessIssue]
        print(f'获取股票持仓: {stock_code}, {self.xttrader.connected}, {account_id}')
        if self.account is None:
            # Nothing to query without an account; treat like a failed query.
            return 0

        positions = self.xttrader.query_stock_positions(self.account)
        for position in positions or ():
            pos: XtPosition = position
            if pos.stock_code == stock_code:
                return pos.volume
        return 0

    def queryPendingOrder(self, stock_code: str, tag: str) -> list[XtOrder]:
        """Return today's orders for ``stock_code`` that are still in REPORTED state.

        :param stock_code: instrument code the order must match
        :param tag: strategy name the order must carry
        :return: matching orders; empty when an argument is None or when the
            order query returns nothing
        """
        if stock_code is None or tag is None:
            return []
        # The query may return None instead of a list; normalise to empty.
        orders = self.xttrader.query_stock_orders(self.account) or []
        return [
            order
            for order in orders
            if order.order_status == xtconstant.ORDER_REPORTED
            and order.stock_code == stock_code
            and order.strategy_name == tag
        ]

    def orderAsync(self, stock_code, orderVolume, orderType, orderPrice, priceType, orderRemark, strategy_name):
        """Submit an asynchronous stock order and return what xttrader returns.

        Note that the parameter order of this method differs from the
        positional order ``order_stock_async`` expects; the call below
        performs that reordering.
        """
        return self.xttrader.order_stock_async(
            self.account,
            str(stock_code),
            orderType,
            orderVolume,
            priceType,
            orderPrice,
            strategy_name,  # strategy_name
            orderRemark,  # remark  # type: ignore
        )

    def getInstrumentName(self, stock_code: str):
        """Return the ``InstrumentName`` of ``stock_code``, or None if no detail is available."""
        detail = xtdata.get_instrument_detail(stock_code, False)
        if detail is None:
            return None
        return detail['InstrumentName']

    def startMarketDataSubscription(self):
        """Subscribe to whole-market quotes for 'SH' and 'SZ'.

        Incoming data is delivered to :meth:`onDataUpdate`. The returned
        subscription id is stored in ``self.subscriptionId``.
        """
        self.subscriptionId = xtdata.subscribe_whole_quote(['SH', 'SZ'], self.onDataUpdate)
        # Only ids greater than zero are treated as valid (same rule as in
        # stopMarketDataSubscription), so do not report success otherwise.
        if self.subscriptionId is not None and self.subscriptionId > 0:
            PrintLog(LogLevel.INFO, f'- [市场数据订阅成功-{self.subscriptionId}]')
        else:
            PrintLog(LogLevel.INFO, f'- [市场数据订阅失败-{self.subscriptionId}]')

    def stopMarketDataSubscription(self):
        """Cancel the market-data subscription if one is active."""
        PrintLog(LogLevel.INFO, '- 停止市场数据订阅')
        if self.subscriptionId is not None and self.subscriptionId > 0:
            xtdata.unsubscribe_quote(self.subscriptionId)
        # Forget the id so a repeated call does not unsubscribe it again.
        self.subscriptionId = None

    # ====== Market-data callback -- passed to xtdata.subscribe_whole_quote ======
    def onDataUpdate(self, data):
        """Forward a market-data update to the event bus as ``MarketDataUpdate``."""
        eBus.event_bus.publish(eBus.MarketDataUpdate, data)

    # ====== Trading callbacks -- the methods below are invoked by XtQuantTrader ======
    def on_connected(self):
        """Connection-established notification."""
        print(datetime.datetime.now(), '连接成功回调')

    def on_disconnected(self):
        """Connection-lost notification."""
        print(datetime.datetime.now(), '连接断开回调')

    def on_stock_order(self, order: XtOrder):
        """Order status notification.

        :param order: XtOrder object
        """
        print(f"委托回调 on_stock_order 投资备注 {order.strategy_name} {order.order_remark}")

    def on_stock_trade(self, trade: XtTrade):
        """Trade (fill) notification; republished on the event bus as ``MarketOrderTraded``.

        :param trade: XtTrade object
        """
        eBus.event_bus.publish(eBus.MarketOrderTraded, trade)

    def on_order_stock_async_response(self, response: XtOrderResponse):
        """Response to an asynchronous order submission.

        :param response: XtOrderResponse object
        """
        print(f"委托回调 on_order_stock_async_response 投资备注 {response.error_msg}{response.strategy_name} {response.order_remark}")

    def on_order_error(self, order_error):
        """Order failure notification.

        :param order_error: XtOrderError object
        """
        print(f"\n委托报错回调 {order_error.order_remark} {order_error.error_msg}")

    def on_account_status(self, status):
        """Account status notification.

        :param status: XtAccountStatus object
        """
        print(datetime.datetime.now(), status)


qmtv = QmtV()