完成基础框架

This commit is contained in:
2025-10-28 15:19:12 +08:00
parent be7e7dda48
commit 26cf98ce17
6 changed files with 269 additions and 142 deletions
+36 -102
View File
@@ -1,120 +1,54 @@
# coding:utf-8 # coding:utf-8
import threading import sys
import time, sys
sys.stdout.reconfigure(encoding='utf-8') # 设置标准输出编码为UTF-8 sys.stdout.reconfigure(encoding='utf-8') # 设置标准输出编码为UTF-8
import strategy_db from strategy_controller import SFGridController
from trade_thread import StockTradeThread
from util import getInstrumentName, interact
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount
ctrl:SFGridController
class SFGridController: def interact():
def __init__(self, account_no: str = '99082560'): """执行后进入repl模式"""
print('\n=== 数据库模块初始化 ===\n') import code
strategy_db.db.connect() code.InteractiveConsole(locals=globals()).interact()
strategy_db.db.create_tables([strategy_db.TradeTarget])
def startMarketData():
ctrl.startMarketData()
print('\n=== 三疯网络策略控制器初始化 ===\n') def stopMarketData():
self.init_instrument_pool() ctrl.stopMarketData()
self.init_trader(r'D:\\Programs\\DTQMT\\userdata_mini')
self.init_trade_account(account_no, 'STOCK')
self.grid_price = [11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
print('\n=== 三疯网络策略控制器初始化完成 ===\n')
def add_trade_target(self, stock_code):
try:
stock_name = getInstrumentName(stock_code)
new_target = strategy_db.TradeTarget.create(
stock_name=stock_name,
stock_code=stock_code,
current_position=0,
grid_index=0,
last_trade_price=0.0,
current_buy_price=0.0,
current_sell_price=0.0
)
new_target.save()
print(f'新增交易标的 {stock_code} {stock_name}, {new_target.id}')
except Exception as e:
print(f'新增交易标的失败 {stock_code} {e}')
def init_instrument_pool(self): def targetPool():
self.instrument_pool = strategy_db.TradeTarget.select() ctrl.print_pool()
print(f'初始化标的池初始化完成, {self.instrument_pool}')
def init_trader(self, path):
session_id = int(time.time())
self.xt_trader = XtQuantTrader(path, session_id)
self.xt_trader.start()
self.xt_trader.connect()
print(f'交易对象初始化完成, {self.xt_trader.connected}',)
def init_trade_account(self, account_id, account_type): def addTradeTarget(stock_code):
self.account= StockAccount(account_id, account_type) ctrl.add_trade_target(stock_code)
print(f'交易账号对象初始化完成, {self.account}')
def print_account_info(self): def accountInfo():
account_info = self.xt_trader.query_stock_asset(self.account) ctrl.print_account_info()
print(f"\n=== 账户信息 {self.account.account_id} ===")
print(f"可用资金: {account_info.m_dCash}")
print(f"总资产: {account_info.m_dTotalAsset}")
print(f"证券市值: {account_info.m_dMarketValue}")
positions = self.xt_trader.query_stock_positions(self.account) def positionInfo():
if positions: ctrl.print_position_info()
print("\n=== 持仓信息 ===")
for pos in positions:
if pos.m_nVolume <=0:
continue
print(f"股票代码: {pos.stock_code}-{getInstrumentName(pos.stock_code)}")
print(f"总持仓: {pos.m_nVolume}")
print(f"可用持仓: {pos.m_nCanUseVolume}")
print(f"持仓成本: {pos.avg_price}")
print("---")
else:
print("\n当前无持仓")
def print_stock_orders(self):
orders = self.xt_trader.query_stock_orders(self.account, cancelable_only=True)
if orders:
print("\n=== 委托信息 ===")
for order in orders:
print(f"委托编号: {order.order_id}")
print(f"股票代码: {order.stock_code} {getInstrumentName(order.stock_code)}")
print(f"委托方向: {order.offset_flag} ")
print(f"委托价格: {order.price}")
print(f"委托数量: {order.order_volume}")
print(f"已成交数量: {order.traded_volume}")
print(f"委托状态: {order.order_status} ")
print("---")
else:
print("\n当前无委托记录")
def startTrade(index:int):
ctrl.start_stock_trade(index)
def init_stock_trade_threads(self): def pauseTrade(index:int):
for stock_code in self.instrument_pool: ctrl.pause_stock_trade(index)
new_job = StockTradeThread(stock_code)
new_job.name = f"StockTradeThread-{stock_code}"
new_job.start()
def start_stock_trade_thread(self, stock: strategy_db.TradeTarget):
stock.name = f"StockTradeThread-{stock.stock_code}"
stock.start()
print(f"启动标的交易线程 {stock.stock_code} {getInstrumentName(stock.stock_code)}\n")
threading.Event().wait(2) # 模拟交易操作的等待时间
def help():
print("可用命令:")
print(" ===================================================")
print(" startMarketData() - 启动市场数据接收")
print(" stopMarketData() - 停止市场数据接收")
print(" addTradeTarget(stock_code) - 添加交易标的")
print(" accountInfo() - 打印账户信息")
print(" positionInfo() - 打印持仓信息")
print(" targetPool() - 打印标的池信息")
print(" startTrade(index) - 启动标的交易线程")
print(" pauseTrade(index) - 暂停标的交易线程")
print(" ===================================================")
print(" ctrl - 访问控制器实例")
if __name__ == '__main__': if __name__ == '__main__':
ctrl = SFGridController('99082560') ctrl = SFGridController('99082560')
ctrl.print_account_info()
ctrl.print_stock_orders()
ctrl.init_stock_trade_threads()
# 交互阻塞 # 交互阻塞
interact() interact()
+48
View File
@@ -0,0 +1,48 @@
import threading
import time
from strategy_db import TradeTarget
import strategy_db
from util import getInstrumentName, getStockPosition
from xtquant import xttrader, xtdata
from xtquant.xttype import StockAccount
class StockTradeController:
def __init__(self, tradeTarget: TradeTarget, xt_trader: xttrader.XtQuantTrader, account: StockAccount, enabled: bool = False):
super().__init__() #必须调用父类的初始化方法
self.tradeTarget = tradeTarget
self.xt_trader = xt_trader
self.account = account
self.currentPosition = getStockPosition(self.tradeTarget.stock_code, self.xt_trader, self.account)
self.tradeRecords = strategy_db.TradeRecord.select().where(strategy_db.TradeRecord.stock_code == self.tradeTarget.stock_code).order_by(strategy_db.TradeRecord.trade_time.desc())
print(f"创建标的交易线程 {self.tradeTarget.stock_code} {getInstrumentName(self.tradeTarget.stock_code)} {tradeTarget}\n")
xtdata.subscribe_quote(tradeTarget.stock_code, period='tick', start_time='', end_time='', count=0, callback=self.onDataUpdate)
def enabledTrading(self, enabled: bool):
self.tradeTarget.enabled = enabled
self.tradeTarget.save()
def isEnabled(self) -> bool:
return self.tradeTarget.enabled
def onDataUpdate(self, data):
if self.isEnabled():
print(f"标的{self.tradeTarget.stock_code}-{self.tradeTarget.stock_name} 行情数据更新 {data[self.tradeTarget.stock_code]}\n")
# Description: 程序启动后
def check_stock_position(self):
volume = getStockPosition(self.tradeTarget.stock_code, self.xt_trader, self.account)
pass
# Description: 新标的,建基础仓
def init_stock_position(self):
pass
# Description: 双向下单
def two_way_order(self):
pass
+176
View File
@@ -0,0 +1,176 @@
# coding:utf-8
import time, sys
sys.stdout.reconfigure(encoding='utf-8') # 设置标准输出编码为UTF-8
import strategy_db
from stock_trade_controller import StockTradeController
from util import getInstrumentName, getStockPosition
from xtquant.xttrader import XtQuantTrader
from xtquant.xttype import StockAccount
from xtquant import xtdata
class SFGridController:
def __init__(self, account_no: str = '99082560'):
print('=== 数据库模块初始化 ===\n')
strategy_db.db.connect()
strategy_db.db.create_tables([strategy_db.TradeTarget])
print('=== 三疯网格策略控制器初始化 ===\n')
self.init_instrument_pool()
self.init_trader(r'D:\\Programs\\DTQMT\\userdata_mini')
self.init_trade_account(account_no, 'STOCK')
self.grid_price = [11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
self.stock_trade_ctrl = {}
self.seq = None
print('=== 三疯网格策略控制器初始化完成 ===\n')
def startMarketData(self):
print('=== 三疯网格策略 启动市场数据接收 ===\n')
self.seq = xtdata.subscribe_whole_quote(['SH', 'SZ'], callback=self.onDataUpdate)
if self.seq == -1:
print('市场数据订阅失败\n')
else:
print(f'市场数据订阅成功, 订阅号={self.seq}\n')
def stopMarketData(self):
print('=== 三疯网格策略 停止市场数据接收 ===\n')
if self.seq is not None and self.seq > 0:
xtdata.unsubscribe_quote(self.seq)
def onDataUpdate(self, data):
# 遍历股池
for target in self.instrument_pool:
stock_code = target.stock_code
# 如果存在对应的StockTradeController,则调用其onDataUpdate方法
if stock_code not in self.stock_trade_ctrl:
print(f"股票代码 {stock_code} 未在交易控制器中找到,跳过处理。\n")
continue
if stock_code not in data:
print(f"股票代码 {stock_code} 未在行情数据中找到,跳过处理。\n")
continue
stock_controller: StockTradeController = self.stock_trade_ctrl[stock_code]
stock_controller.onDataUpdate(data)
def add_trade_target(self, stock_code):
try:
stock_name = getInstrumentName(stock_code)
new_target = strategy_db.TradeTarget.create(
stock_name=stock_name,
stock_code=stock_code,
current_position=0,
grid_index=0,
last_trade_price=0.0,
current_buy_price=0.0,
current_sell_price=0.0
)
new_target.save()
print(f'新增交易标的 {stock_code} {stock_name}, {new_target.id}')
# 刷新标的持仓
pos = getStockPosition(stock_code, self.xt_trader, self.account)
strategy_db.TradeTarget.update(current_position=pos).where(strategy_db.TradeTarget.stock_code == stock_code).execute()
# 更新标的池
self.instrument_pool = strategy_db.TradeTarget.select()
# 添加交易控制器
stockTradeController = StockTradeController(new_target, self.xt_trader, self.account, new_target.enabled)
self.stock_trade_ctrl[stock_code] = stockTradeController
except Exception as e:
print(f'新增交易标的失败 {stock_code} {e}')
def init_trader(self, path):
session_id = int(time.time())
self.xt_trader = XtQuantTrader(path, session_id)
self.xt_trader.start()
self.xt_trader.connect()
print(f'交易对象初始化完成, {self.xt_trader.connected}')
def init_trade_account(self, account_id, account_type):
self.account= StockAccount(account_id, account_type)
print(f'交易账号对象初始化完成, {self.account}')
def init_instrument_pool(self):
self.instrument_pool = strategy_db.TradeTarget.select()
for tradeTarget in self.instrument_pool:
stockTradeController = StockTradeController(tradeTarget, self.xt_trader, self.account, tradeTarget.enabled)
self.stock_trade_ctrl[tradeTarget.stock_code] = stockTradeController
print(f'初始化标的池初始化完成 , 共 {len(self.instrument_pool)} 个标的\n')
self.print_pool()
def print_pool(self):
print("\n=== 标的池信息 ===")
for i in range(len(self.instrument_pool)):
target: strategy_db.TradeTarget = self.instrument_pool[i]
status = "新建" if target.status == 0 else "已建初始仓"
print(f"[Index-{i}] 股票代码: {target.stock_code}-{target.stock_name} 当前持仓: {target.current_position} 网格索引: {target.grid_index+1} Price {self.grid_price[target.grid_index]} 状态: {status} 启用交易线程: {target.enabled}")
def print_position_info(self):
positions = self.xt_trader.query_stock_positions(self.account)
if positions:
print("\n=== 持仓信息 ===")
for pos in positions:
if pos.m_nVolume <=0:
continue
print(f"股票代码: {pos.stock_code}-{getInstrumentName(pos.stock_code)}")
print(f"总持仓: {pos.m_nVolume}")
print(f"可用持仓: {pos.m_nCanUseVolume}")
print(f"持仓成本: {pos.avg_price}")
print("---")
else:
print("\n当前无持仓")
def print_account_info(self):
account_info = self.xt_trader.query_stock_asset(self.account)
print(f"\n=== 账户信息 {self.account.account_id} ===")
print(f"可用资金: {account_info.m_dCash}")
print(f"总资产: {account_info.m_dTotalAsset}")
print(f"证券市值: {account_info.m_dMarketValue}")
def print_stock_orders(self):
orders = self.xt_trader.query_stock_orders(self.account, cancelable_only=True)
if orders:
print("\n=== 委托信息 ===")
for order in orders:
print(f"委托编号: {order.order_id}")
print(f"股票代码: {order.stock_code} {getInstrumentName(order.stock_code)}")
print(f"委托方向: {order.offset_flag} ")
print(f"委托价格: {order.price}")
print(f"委托数量: {order.order_volume}")
print(f"已成交数量: {order.traded_volume}")
print(f"委托状态: {order.order_status} ")
print("---")
else:
print("\n当前无委托记录")
# 初始化指定标的交易线程
def start_stock_trade(self, index: int):
tradeTarget = self.instrument_pool[index]
# check existing thread
if tradeTarget.stock_code in self.stock_trade_ctrl:
tradeController: StockTradeController = self.stock_trade_ctrl[tradeTarget.stock_code]
if tradeController.isEnabled():
print(f"标的交易控制器已存在且正在运行 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")
else:
print(f"标的交易控制器已存在但未运行,重新启动 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")
tradeController.enabledTrading(True)
else:
stockTradeController = StockTradeController(tradeTarget, self.xt_trader, self.account, tradeTarget.enabled)
self.stock_trade_ctrl[tradeTarget.stock_code] = stockTradeController
print(f"创建标的交易控制器 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")
def pause_stock_trade(self, index: int):
tradeTarget = self.instrument_pool[index]
if tradeTarget.stock_code in self.stock_trade_ctrl:
tradeController: StockTradeController = self.stock_trade_ctrl[tradeTarget.stock_code]
if tradeController.isEnabled():
print(f"暂停标的交易 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")
tradeController.enabledTrading(False)
else:
print(f"标的交易已暂停 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")
else:
print(f"标的交易控制器不存在 {tradeTarget.stock_code} {getInstrumentName(tradeTarget.stock_code)}\n")
+9 -2
View File
@@ -17,5 +17,12 @@ class TradeTarget(BaseModel):
last_trade_price = FloatField() last_trade_price = FloatField()
current_buy_price = FloatField() current_buy_price = FloatField()
current_sell_price = FloatField() current_sell_price = FloatField()
status = IntegerField(default=0) # 1表示启用,0表示停止 status = IntegerField(default=0) # 0表示新建,1表示已建初始仓
enabled = BooleanField(default=False) # 是否启用该标的 enabled = BooleanField(default=False) # 是否启动交易线程
class TradeRecord(BaseModel):
stock_code = CharField()
trade_type = CharField() # 'buy' 或 'sell'
price = FloatField()
volume = IntegerField()
trade_time = CharField() # 可以存储为字符串格式的时间
-33
View File
@@ -1,33 +0,0 @@
import threading
import time
from strategy_db import TradeTarget
from util import getInstrumentName, getStockPosition
from xtquant import xttrader
from xtquant.xttype import StockAccount
class StockTradeThread(threading.Thread):
def __init__(self, tradeTarget: TradeTarget, xt_trader: xttrader.XtQuantTrader, account: StockAccount):
super().__init__() #必须调用父类的初始化方法
self.tradeTarget = tradeTarget
self.xt_trader = xt_trader
self.account = account
def run(self) -> None:
print(f"启动标的交易线程 {self.tradeTarget.stock_code} {getInstrumentName(self.tradeTarget.stock_code)}\n")
while True:
print('{} is running >> {}'.format(threading.current_thread().name, self.tradeTarget.stock_code))
time.sleep(2)
# Description: 程序启动后
def check_stock_position(self):
volume = getStockPosition(self.stock_code, self.xt_trader, self.account)
pass
# Description: 新标的,建基础仓
def init_stock_position(self):
pass
# Description: 双向下单
def two_way_order(self):
pass
-5
View File
@@ -17,8 +17,3 @@ def getStockPosition(stock_code: str, xt_trader: xttrader.XtQuantTrader, account
break break
return volume return volume
def interact():
"""执行后进入repl模式"""
import code
code.InteractiveConsole(locals=globals()).interact()