# coding:utf-8 import threading import time, sys sys.stdout.reconfigure(encoding='utf-8') # 设置标准输出编码为UTF-8 import strategy_db from trade_thread import StockTradeThread from util import getInstrumentName from xtquant.xttrader import XtQuantTrader from xtquant.xttype import StockAccount class SFGridController: def __init__(self, account_no: str = '99082560'): print('\n=== 数据库模块初始化 ===\n') strategy_db.db.connect() strategy_db.db.create_tables([strategy_db.TradeTarget]) print('\n=== 三疯网络策略控制器初始化 ===\n') self.init_instrument_pool() self.init_trader(r'D:\\Programs\\DTQMT\\userdata_mini') self.init_trade_account(account_no, 'STOCK') self.grid_price = [11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1] print('\n=== 三疯网络策略控制器初始化完成 ===\n') def add_trade_target(self, stock_code): try: stock_name = getInstrumentName(stock_code) new_target = strategy_db.TradeTarget.create( stock_name=stock_name, stock_code=stock_code, current_position=0, grid_index=0, last_trade_price=0.0, current_buy_price=0.0, current_sell_price=0.0 ) new_target.save() print(f'新增交易标的 {stock_code} {stock_name}, {new_target.id}') except Exception as e: print(f'新增交易标的失败 {stock_code} {e}') def init_instrument_pool(self): self.instrument_pool = strategy_db.TradeTarget.select() print(f'初始化标的池初始化完成, {self.instrument_pool}') def init_trader(self, path): session_id = int(time.time()) self.xt_trader = XtQuantTrader(path, session_id) self.xt_trader.start() self.xt_trader.connect() print(f'交易对象初始化完成, {self.xt_trader.connected}',) def init_trade_account(self, account_id, account_type): self.account= StockAccount(account_id, account_type) print(f'交易账号对象初始化完成, {self.account}') def print_account_info(self): account_info = self.xt_trader.query_stock_asset(self.account) print(f"\n=== 账户信息 {self.account.account_id} ===") print(f"可用资金: {account_info.m_dCash}") print(f"总资产: {account_info.m_dTotalAsset}") print(f"证券市值: {account_info.m_dMarketValue}") positions = self.xt_trader.query_stock_positions(self.account) if positions: print("\n=== 持仓信息 ===") for pos in positions: if pos.m_nVolume <=0: continue print(f"股票代码: {pos.stock_code}-{getInstrumentName(pos.stock_code)}") print(f"总持仓: {pos.m_nVolume}") print(f"可用持仓: {pos.m_nCanUseVolume}") print(f"持仓成本: {pos.avg_price}") print("---") else: print("\n当前无持仓") def print_stock_orders(self): orders = self.xt_trader.query_stock_orders(self.account, cancelable_only=True) if orders: print("\n=== 委托信息 ===") for order in orders: print(f"委托编号: {order.order_id}") print(f"股票代码: {order.stock_code} {getInstrumentName(order.stock_code)}") print(f"委托方向: {order.offset_flag} ") print(f"委托价格: {order.price}") print(f"委托数量: {order.order_volume}") print(f"已成交数量: {order.traded_volume}") print(f"委托状态: {order.order_status} ") print("---") else: print("\n当前无委托记录") def interact(self): """执行后进入repl模式""" import code code.InteractiveConsole(locals=globals()).interact() def init_stock_trade_threads(self): for stock_code in self.instrument_pool: new_job = StockTradeThread(stock_code) new_job.name = f"StockTradeThread-{stock_code}" new_job.start() def stock_trade_thread(self, stock_code): print(f"启动标的交易线程 {stock_code} {getInstrumentName(stock_code)}\n") threading.Event().wait(2) # 模拟交易操作的等待时间 if __name__ == '__main__': ctrl = SFGridController('99082560') ctrl.print_account_info() ctrl.print_stock_orders() ctrl.init_stock_trade_threads() # 交互阻塞 ctrl.interact()