diff --git a/sfgrid.py b/sfgrid.py index e47ffd7..428a957 100644 --- a/sfgrid.py +++ b/sfgrid.py @@ -1,120 +1,54 @@ # coding:utf-8 -import threading -import time, sys +import sys sys.stdout.reconfigure(encoding='utf-8') # 设置标准输出编码为UTF-8 -import strategy_db -from trade_thread import StockTradeThread -from util import getInstrumentName, interact -from xtquant.xttrader import XtQuantTrader -from xtquant.xttype import StockAccount +from strategy_controller import SFGridController +ctrl:SFGridController -class SFGridController: - def __init__(self, account_no: str = '99082560'): - print('\n=== 数据库模块初始化 ===\n') - strategy_db.db.connect() - strategy_db.db.create_tables([strategy_db.TradeTarget]) +def interact(): + """执行后进入repl模式""" + import code + code.InteractiveConsole(locals=globals()).interact() +def startMarketData(): + ctrl.startMarketData() - print('\n=== 三疯网络策略控制器初始化 ===\n') - self.init_instrument_pool() - self.init_trader(r'D:\\Programs\\DTQMT\\userdata_mini') - self.init_trade_account(account_no, 'STOCK') - self.grid_price = [11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1] - print('\n=== 三疯网络策略控制器初始化完成 ===\n') - - def add_trade_target(self, stock_code): - try: - stock_name = getInstrumentName(stock_code) - new_target = strategy_db.TradeTarget.create( - stock_name=stock_name, - stock_code=stock_code, - current_position=0, - grid_index=0, - last_trade_price=0.0, - current_buy_price=0.0, - current_sell_price=0.0 - ) - new_target.save() - print(f'新增交易标的 {stock_code} {stock_name}, {new_target.id}') - except Exception as e: - print(f'新增交易标的失败 {stock_code} {e}') +def stopMarketData(): + ctrl.stopMarketData() - def init_instrument_pool(self): - self.instrument_pool = strategy_db.TradeTarget.select() - print(f'初始化标的池初始化完成, {self.instrument_pool}') - - def init_trader(self, path): - session_id = int(time.time()) - self.xt_trader = XtQuantTrader(path, session_id) - self.xt_trader.start() - self.xt_trader.connect() - print(f'交易对象初始化完成, {self.xt_trader.connected}',) +def targetPool(): + ctrl.print_pool() - def init_trade_account(self, account_id, account_type): - self.account= StockAccount(account_id, account_type) - print(f'交易账号对象初始化完成, {self.account}') +def addTradeTarget(stock_code): + ctrl.add_trade_target(stock_code) - def print_account_info(self): - account_info = self.xt_trader.query_stock_asset(self.account) - - print(f"\n=== 账户信息 {self.account.account_id} ===") - print(f"可用资金: {account_info.m_dCash}") - print(f"总资产: {account_info.m_dTotalAsset}") - print(f"证券市值: {account_info.m_dMarketValue}") +def accountInfo(): + ctrl.print_account_info() - positions = self.xt_trader.query_stock_positions(self.account) - if positions: - print("\n=== 持仓信息 ===") - for pos in positions: - if pos.m_nVolume <=0: - continue - print(f"股票代码: {pos.stock_code}-{getInstrumentName(pos.stock_code)}") - print(f"总持仓: {pos.m_nVolume}") - print(f"可用持仓: {pos.m_nCanUseVolume}") - print(f"持仓成本: {pos.avg_price}") - print("---") - else: - print("\n当前无持仓") - - def print_stock_orders(self): - orders = self.xt_trader.query_stock_orders(self.account, cancelable_only=True) - if orders: - print("\n=== 委托信息 ===") - for order in orders: - print(f"委托编号: {order.order_id}") - print(f"股票代码: {order.stock_code} {getInstrumentName(order.stock_code)}") - print(f"委托方向: {order.offset_flag} ") - print(f"委托价格: {order.price}") - print(f"委托数量: {order.order_volume}") - print(f"已成交数量: {order.traded_volume}") - print(f"委托状态: {order.order_status} ") - print("---") - else: - print("\n当前无委托记录") +def positionInfo(): + ctrl.print_position_info() +def startTrade(index:int): + ctrl.start_stock_trade(index) - def init_stock_trade_threads(self): - for stock_code in self.instrument_pool: - new_job = StockTradeThread(stock_code) - new_job.name = f"StockTradeThread-{stock_code}" - new_job.start() - - def start_stock_trade_thread(self, stock: strategy_db.TradeTarget): - stock.name = f"StockTradeThread-{stock.stock_code}" - stock.start() - print(f"启动标的交易线程 {stock.stock_code} {getInstrumentName(stock.stock_code)}\n") - threading.Event().wait(2) # 模拟交易操作的等待时间 +def pauseTrade(index:int): + ctrl.pause_stock_trade(index) +def help(): + print("可用命令:") + print(" ===================================================") + print(" startMarketData() - 启动市场数据接收") + print(" stopMarketData() - 停止市场数据接收") + print(" addTradeTarget(stock_code) - 添加交易标的") + print(" accountInfo() - 打印账户信息") + print(" positionInfo() - 打印持仓信息") + print(" targetPool() - 打印标的池信息") + print(" startTrade(index) - 启动标的交易线程") + print(" pauseTrade(index) - 暂停标的交易线程") + print(" ===================================================") + print(" ctrl - 访问控制器实例") if __name__ == '__main__': ctrl = SFGridController('99082560') - ctrl.print_account_info() - ctrl.print_stock_orders() - - ctrl.init_stock_trade_threads() - # 交互阻塞 interact() - - diff --git a/stock_trade_controller.py b/stock_trade_controller.py new file mode 100644 index 0000000..e4eca53 --- /dev/null +++ b/stock_trade_controller.py @@ -0,0 +1,48 @@ +import threading +import time +from strategy_db import TradeTarget +import strategy_db +from util import getInstrumentName, getStockPosition +from xtquant import xttrader, xtdata +from xtquant.xttype import StockAccount + + +class StockTradeController: + + def __init__(self, tradeTarget: TradeTarget, xt_trader: xttrader.XtQuantTrader, account: StockAccount, enabled: bool = False): + super().__init__() #必须调用父类的初始化方法 + self.tradeTarget = tradeTarget + self.xt_trader = xt_trader + self.account = account + self.currentPosition = getStockPosition(self.tradeTarget.stock_code, self.xt_trader, self.account) + self.tradeRecords = strategy_db.TradeRecord.select().where(strategy_db.TradeRecord.stock_code == self.tradeTarget.stock_code).order_by(strategy_db.TradeRecord.trade_time.desc()) + + print(f"创建标的交易线程 {self.tradeTarget.stock_code} {getInstrumentName(self.tradeTarget.stock_code)} {tradeTarget}\n") + xtdata.subscribe_quote(tradeTarget.stock_code, period='tick', start_time='', end_time='', count=0, callback=self.onDataUpdate) + + def enabledTrading(self, enabled: bool): + self.tradeTarget.enabled = enabled + self.tradeTarget.save() + + def isEnabled(self) -> bool: + return self.tradeTarget.enabled + + + def onDataUpdate(self, data): + if self.isEnabled(): + print(f"标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 行情数据更新 {data[self.tradeTarget.stock_code]}\n") + + + # Description: 程序启动后 + def check_stock_position(self): + volume = getStockPosition(self.tradeTarget.stock_code, self.xt_trader, self.account) + pass + + + # Description: 新标的,建基础仓 + def init_stock_position(self): + pass + + # Description: 双向下单 + def two_way_order(self): + pass \ No newline at end of file diff --git a/strategy_controller.py b/strategy_controller.py new file mode 100644 index 0000000..89c3cb7 --- /dev/null +++ b/strategy_controller.py @@ -0,0 +1,176 @@ +# coding:utf-8 +import time, sys +sys.stdout.reconfigure(encoding='utf-8') # 设置标准输出编码为UTF-8 +import strategy_db +from stock_trade_controller import StockTradeController +from util import getInstrumentName, getStockPosition +from xtquant.xttrader import XtQuantTrader +from xtquant.xttype import StockAccount +from xtquant import xtdata + +class SFGridController: + def __init__(self, account_no: str = '99082560'): + print('=== 数据库模块初始化 ===\n') + strategy_db.db.connect() + strategy_db.db.create_tables([strategy_db.TradeTarget]) + print('=== 三疯网格策略控制器初始化 ===\n') + self.init_instrument_pool() + self.init_trader(r'D:\\Programs\\DTQMT\\userdata_mini') + self.init_trade_account(account_no, 'STOCK') + self.grid_price = [11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1] + self.stock_trade_ctrl = {} + self.seq = None + print('=== 三疯网格策略控制器初始化完成 ===\n') + + def startMarketData(self): + print('=== 三疯网格策略 启动市场数据接收 ===\n') + self.seq = xtdata.subscribe_whole_quote(['SH', 'SZ'], callback=self.onDataUpdate) + if self.seq == -1: + print('市场数据订阅失败\n') + else: + print(f'市场数据订阅成功, 订阅号={self.seq}\n') + + + def stopMarketData(self): + print('=== 三疯网格策略 停止市场数据接收 ===\n') + if self.seq is not None and self.seq > 0: + xtdata.unsubscribe_quote(self.seq) + + + def onDataUpdate(self, data): + # 遍历股池 + for target in self.instrument_pool: + stock_code = target.stock_code + # 如果存在对应的StockTradeController,则调用其onDataUpdate方法 + if stock_code not in self.stock_trade_ctrl: + print(f"股票代码 {stock_code} 未在交易控制器中找到,跳过处理。\n") + continue + if stock_code not in data: + print(f"股票代码 {stock_code} 未在行情数据中找到,跳过处理。\n") + continue + stock_controller: StockTradeController = self.stock_trade_ctrl[stock_code] + stock_controller.onDataUpdate(data) + + def add_trade_target(self, stock_code): + try: + stock_name = getInstrumentName(stock_code) + new_target = strategy_db.TradeTarget.create( + stock_name=stock_name, + stock_code=stock_code, + current_position=0, + grid_index=0, + last_trade_price=0.0, + current_buy_price=0.0, + current_sell_price=0.0 + ) + new_target.save() + print(f'新增交易标的 {stock_code} {stock_name}, {new_target.id}') + # 刷新标的持仓 + pos = getStockPosition(stock_code, self.xt_trader, self.account) + strategy_db.TradeTarget.update(current_position=pos).where(strategy_db.TradeTarget.stock_code == stock_code).execute() + # 更新标的池 + self.instrument_pool = strategy_db.TradeTarget.select() + # 添加交易控制器 + stockTradeController = StockTradeController(new_target, self.xt_trader, self.account, new_target.enabled) + self.stock_trade_ctrl[stock_code] = stockTradeController + + except Exception as e: + print(f'新增交易标的失败 {stock_code} {e}') + + def init_trader(self, path): + session_id = int(time.time()) + self.xt_trader = XtQuantTrader(path, session_id) + self.xt_trader.start() + self.xt_trader.connect() + print(f'交易对象初始化完成, {self.xt_trader.connected}') + + def init_trade_account(self, account_id, account_type): + self.account= StockAccount(account_id, account_type) + print(f'交易账号对象初始化完成, {self.account}') + + def init_instrument_pool(self): + self.instrument_pool = strategy_db.TradeTarget.select() + + for tradeTarget in self.instrument_pool: + stockTradeController = StockTradeController(tradeTarget, self.xt_trader, self.account, tradeTarget.enabled) + self.stock_trade_ctrl[tradeTarget.stock_code] = stockTradeController + + print(f'初始化标的池初始化完成 , 共 {len(self.instrument_pool)} 个标的\n') + self.print_pool() + + def print_pool(self): + print("\n=== 标的池信息 ===") + for i in range(len(self.instrument_pool)): + target: strategy_db.TradeTarget = self.instrument_pool[i] + status = "新建" if target.status == 0 else "已建初始仓" + print(f"[Index-{i}] 股票代码: {target.stock_code}-{target.stock_name} 当前持仓: {target.current_position} 网格索引: {target.grid_index+1} Price {self.grid_price[target.grid_index]} 状态: {status} 启用交易线程: {target.enabled}") + + def print_position_info(self): + positions = self.xt_trader.query_stock_positions(self.account) + if positions: + print("\n=== 持仓信息 ===") + for pos in positions: + if pos.m_nVolume <=0: + continue + print(f"股票代码: {pos.stock_code}-{getInstrumentName(pos.stock_code)}") + print(f"总持仓: {pos.m_nVolume}") + print(f"可用持仓: {pos.m_nCanUseVolume}") + print(f"持仓成本: {pos.avg_price}") + print("---") + else: + print("\n当前无持仓") + + def print_account_info(self): + account_info = self.xt_trader.query_stock_asset(self.account) + + print(f"\n=== 账户信息 {self.account.account_id} ===") + print(f"可用资金: {account_info.m_dCash}") + print(f"总资产: {account_info.m_dTotalAsset}") + print(f"证券市值: {account_info.m_dMarketValue}") + + def print_stock_orders(self): + orders = self.xt_trader.query_stock_orders(self.account, cancelable_only=True) + if orders: + print("\n=== 委托信息 ===") + for order in orders: + print(f"委托编号: {order.order_id}") + print(f"股票代码: {order.stock_code} {getInstrumentName(order.stock_code)}") + print(f"委托方向: {order.offset_flag} ") + print(f"委托价格: {order.price}") + print(f"委托数量: {order.order_volume}") + print(f"已成交数量: {order.traded_volume}") + print(f"委托状态: {order.order_status} ") + print("---") + else: + print("\n当前无委托记录") + + + # 初始化指定标的交易线程 + def start_stock_trade(self, index: int): + tradeTarget = self.instrument_pool[index] + # check existing thread + if tradeTarget.stock_code in self.stock_trade_ctrl: + tradeController: StockTradeController = self.stock_trade_ctrl[tradeTarget.stock_code] + if tradeController.isEnabled(): + print(f"标的交易控制器已存在且正在运行 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n") + else: + print(f"标的交易控制器已存在但未运行,重新启动 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n") + tradeController.enabledTrading(True) + else: + stockTradeController = StockTradeController(tradeTarget, self.xt_trader, self.account, tradeTarget.enabled) + self.stock_trade_ctrl[tradeTarget.stock_code] = stockTradeController + print(f"创建标的交易控制器 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n") + + + def pause_stock_trade(self, index: int): + tradeTarget = self.instrument_pool[index] + if tradeTarget.stock_code in self.stock_trade_ctrl: + tradeController: StockTradeController = self.stock_trade_ctrl[tradeTarget.stock_code] + if tradeController.isEnabled(): + print(f"暂停标的交易 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n") + tradeController.enabledTrading(False) + else: + print(f"标的交易已暂停 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n") + else: + print(f"标的交易控制器不存在 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n") + diff --git a/strategy_db.py b/strategy_db.py index dd9930b..e2b0a27 100644 --- a/strategy_db.py +++ b/strategy_db.py @@ -17,5 +17,12 @@ class TradeTarget(BaseModel): last_trade_price = FloatField() current_buy_price = FloatField() current_sell_price = FloatField() - status = IntegerField(default=0) # 1表示启用,0表示停止 - enabled = BooleanField(default=False) # 是否启用该标的 \ No newline at end of file + status = IntegerField(default=0) # 0表示新建,1表示已建初始仓 + enabled = BooleanField(default=False) # 是否启动交易线程 + +class TradeRecord(BaseModel): + stock_code = CharField() + trade_type = CharField() # 'buy' 或 'sell' + price = FloatField() + volume = IntegerField() + trade_time = CharField() # 可以存储为字符串格式的时间 \ No newline at end of file diff --git a/trade_thread.py b/trade_thread.py deleted file mode 100644 index bc7d610..0000000 --- a/trade_thread.py +++ /dev/null @@ -1,33 +0,0 @@ -import threading -import time -from strategy_db import TradeTarget -from util import getInstrumentName, getStockPosition -from xtquant import xttrader -from xtquant.xttype import StockAccount - -class StockTradeThread(threading.Thread): - def __init__(self, tradeTarget: TradeTarget, xt_trader: xttrader.XtQuantTrader, account: StockAccount): - super().__init__() #必须调用父类的初始化方法 - self.tradeTarget = tradeTarget - self.xt_trader = xt_trader - self.account = account - - def run(self) -> None: - print(f"启动标的交易线程 {self.tradeTarget.stock_code} {getInstrumentName(self.tradeTarget.stock_code)}\n") - while True: - print('{} is running >> {}'.format(threading.current_thread().name, self.tradeTarget.stock_code)) - time.sleep(2) - - - # Description: 程序启动后 - def check_stock_position(self): - volume = getStockPosition(self.stock_code, self.xt_trader, self.account) - pass - - # Description: 新标的,建基础仓 - def init_stock_position(self): - pass - - # Description: 双向下单 - def two_way_order(self): - pass \ No newline at end of file diff --git a/util.py b/util.py index dd475c0..41b374f 100644 --- a/util.py +++ b/util.py @@ -17,8 +17,3 @@ def getStockPosition(stock_code: str, xt_trader: xttrader.XtQuantTrader, account break return volume - -def interact(): - """执行后进入repl模式""" - import code - code.InteractiveConsole(locals=globals()).interact() \ No newline at end of file