This commit is contained in:
2025-11-17 18:10:41 +08:00
parent f499d9a413
commit f626545897
6 changed files with 425 additions and 25 deletions
+17 -10
View File
@@ -22,30 +22,37 @@ def get_config_path() -> Path:
return base_path / 'config.ini' return base_path / 'config.ini'
def create_default_config(): def save_config(miniQmtPath:str, account_no:str):
"""创建默认配置文件""" """创建默认配置文件"""
config = configparser.ConfigParser() config = configparser.ConfigParser()
config['config'] = { config['config'] = {
'miniQMTPath': r'D:/Programs/QMT/userdata_mini', 'miniQMTPath': miniQmtPath,
'account_no': '00000000', 'account_no': account_no
'log_level' : 'INFO'
} }
config_path = get_config_path() config_path = get_config_path()
with open(config_path, 'w') as configfile: with open(config_path, 'w') as configfile:
config.write(configfile) config.write(configfile)
print(f'已创建默认配置文件: {config_path}') print(f'已创建默认配置文件: {config_path}')
def initConfig(): def exist_config() -> bool:
global miniQMTPath, grid_price, grid_volume, account_no """检查配置文件是否存在"""
config_path = get_config_path()
return config_path.exists()
def initConfig() -> bool:
global miniQMTPath, account_no
# 获取配置文件路径 # 获取配置文件路径
config_path = get_config_path() config_path = get_config_path()
# 检查配置文件是否存在,不存在则创建
if not config_path.exists():
create_default_config()
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read(config_path, encoding='utf-8') config.read(config_path, encoding='utf-8')
miniQMTPath = config.get('config','miniQMTPath') miniQMTPath = config.get('config','miniQMTPath')
account_no = config.get('config','account_no') account_no = config.get('config','account_no')
# 判断miniQMTPath是否为空,并且目录是否存在
if not miniQMTPath or not Path(miniQMTPath).exists():
print('请先配置miniQMTPath')
return False
else:
return True
-9
View File
@@ -18,15 +18,6 @@ class MainWindow:
self.strategy_frames = {} self.strategy_frames = {}
# 日志面板可见性标志 # 日志面板可见性标志
self.log_visible = False self.log_visible = False
result:bool = qmtv.connect()
if not result:
messagebox.showinfo("提示", "QMT连接失败,请修改配置文件。")
self.root.destroy()
return
# 创建界面
print(f'创建界面')
self.create_ui() self.create_ui()
+259
View File
@@ -0,0 +1,259 @@
from kuanke.wizard import *
from jqdata import *
import pandas as pd
import numpy as np
# ==================== 初始化 ====================
def initialize(context):
set_params(context)
# 开启防未来函数
set_option('avoid_future_data', True)
# 用真实价格交易
set_option('use_real_price', True)
# 过滤order中低于error级别的日志
log.set_level('order', 'error')
log.set_level('system', 'error')
log.set_level('strategy', 'debug')
set_benchmark('000001.XSHG')
set_order_cost(OrderCost(open_tax=0, close_tax=0.001, open_commission=0.0002, close_commission=0.0002, min_commission=5), type='stock')
set_slippage(FixedSlippage(0.01))
run_daily(before_trading, '9:30')
# -------------------- 参数设置 --------------------
def set_params(context):
context.max_price = 6
context.min_price = 5.01
context.grid_base_min = 1 # 最小价格
context.grid_base_max = 5 # 建仓价格
context.grid_interval = 0.5 # 下跌n元加仓
context.profit_target = 0.5 # 上涨n元清仓
context.min_stocks = 10
context.max_stocks = 25
context.base_max_stocks = 25
context.max_layers = 7
context.base_position_pct = 0.15
context.max_position_pct = 0.15
context.target_usage = 0.98
context.reserve_ratio = 0.02
context.first_round_max = 10
context.add_batch_size = 3
context.add_cash_threshold = 0.4
g.stock_pool = []
g.grid_info = {}
g.monitoring_stocks = set()
g.first_round_done = False
# ==================== 盘前 ====================
def before_trading(context):
january_clear(context)
if context.current_dt.month == 1:
return
stock_pool = get_stock_pool(context)
g.stock_pool = stock_pool
g.monitoring_stocks.update([s for s in stock_pool if s not in g.grid_info])
g.first_round_done = len(g.grid_info) >= context.first_round_max
# -------------------- 股票池 --------------------
def get_stock_pool(context):
# 1. 全部 A 股(不含退市)
df_sec = get_all_securities(types=['stock'], date=context.previous_date)
codes = list(df_sec.index)
# 2. 过滤 ST、科创板、北交所
def is_valid(code):
name = df_sec.loc[code, 'display_name']
if 'ST' in name or '退' in name or 'st' in name:
return False
if code.startswith('688'): # 科创板
return False
if code.startswith('83') or code.startswith('87') or code.startswith('9'): # 北交所
return False
return True
codes = [c for c in codes if is_valid(c)]
if not codes:
return []
# 3. 过滤停牌 & 价格区间
try:
price_df = get_price(codes,
end_date=context.current_dt,
count=1,
fields=['pre_close'],
panel=False)
if price_df is None or price_df.empty:
return []
# 过滤价格区间
price_df = price_df[
(price_df['pre_close'].notna()) &
(price_df['pre_close'] >= context.min_price) &
(price_df['pre_close'] <= context.max_price)
]
valid_codes = price_df['code'].tolist()
except Exception as e:
log.error(f"获取价格数据失败: {e}")
return []
if not valid_codes:
return []
# 4. 过滤停牌(开盘价缺失)
try:
open_df = get_price(valid_codes,
end_date=context.current_dt,
count=1,
fields=['open'],
panel=False)
if open_df is None or open_df.empty:
return []
# 过滤掉开盘价为空的股票
open_df = open_df[open_df['open'].notna()]
final_codes = open_df['code'].tolist()
except Exception as e:
log.error(f"获取开盘价数据失败: {e}")
return []
return final_codes
# -------------------- 一月清仓 --------------------
def january_clear(context):
if context.current_dt.month == 1:
log.info("进入1月,执行年度清仓...")
for stock in list(context.portfolio.positions.keys()):
order_target(stock, 0)
if stock in g.grid_info:
del g.grid_info[stock]
g.monitoring_stocks.add(stock)
# ==================== 盘中 ====================
def handle_data(context, data):
if context.current_dt.month == 1:
return
manage_positions(context, data)
usage = (context.portfolio.total_value - context.portfolio.available_cash) / context.portfolio.total_value
dynamic_max = get_dynamic_max_stocks(context)
if len(g.grid_info) < dynamic_max and usage < context.target_usage:
try_build_new(context, data)
# -------------------- 动态上限 --------------------
def get_dynamic_max_stocks(context):
return context.max_stocks if g.first_round_done else context.first_round_max
# -------------------- 建仓 --------------------
def try_build_new(context, data):
position_pct = context.max_position_pct
dynamic_max = get_dynamic_max_stocks(context)
count = 0
for stock in list(g.monitoring_stocks):
if len(g.grid_info) >= dynamic_max or count >= 3:
break
price = data[stock].close
if context.grid_base_min <= price <= context.grid_base_max:
total_value = context.portfolio.total_value
stock_amount = total_value * position_pct
grid = GridInfo(price, stock_amount, context.max_layers, context.grid_interval, context.profit_target)
layer_amount = grid.get_layer_amount(0)
buy_amount = int(layer_amount / price / 100) * 100
if buy_amount > 0:
order(stock, buy_amount)
grid.add_position(price, buy_amount, 0)
g.grid_info[stock] = grid
g.monitoring_stocks.discard(stock)
count += 1
log.info(f"[建仓] {stock} 价格{price:.2f} 数量{buy_amount}")
# -------------------- 管理持仓 --------------------
def manage_positions(context, data):
for stock, grid in list(g.grid_info.items()):
price = data[stock].close
# 止盈
sellable = grid.get_sellable_positions(price)
if sellable:
for idx, pos in reversed(sellable):
order(stock, -pos['amount'])
grid.remove_position(idx)
profit = (price - pos['price']) * pos['amount']
log.info(f"[止盈] {stock} 盈利{profit:.2f}")
# 加仓
layer = grid.should_add_layer(price)
if layer is not None:
layer_amount = grid.get_layer_amount(layer)
buy_amount = int(layer_amount / price / 100) * 100
if buy_amount > 0:
order(stock, buy_amount)
grid.add_position(price, buy_amount, layer)
log.info(f"[加仓] {stock} 层级{layer} 数量{buy_amount}")
else:
log.info(f"[加仓失败] {stock} 层级{layer} 金额不足")
# 清仓
if len(grid.positions) == 0:
del g.grid_info[stock]
g.monitoring_stocks.add(stock)
log.info(f"[清仓] {stock}")
# ==================== 盘后 ====================
def after_trading_end(context):
log.info(f"持仓数:{len(g.grid_info)},监控数:{len(g.monitoring_stocks)}")
# ==================== 网格类 ====================
class GridInfo:
def __init__(self, base_price, total_amount, max_layers, interval, profit_target):
self.base_price = float(base_price)
self.total_amount = float(total_amount)
self.max_layers = int(max_layers)
self.interval = float(interval)
self.profit_target = float(profit_target)
self.layer_prices = {i: base_price - i * interval for i in range(self.max_layers)}
self.layer_weights = self._calc_weights()
self.positions = []
def _calc_weights(self):
weights = {i: 1.0 + 0.05 * i for i in range(self.max_layers)}
total = sum(list(weights.values()))
return {k: v / total for k, v in weights.items()}
def get_layer_amount(self, layer):
return self.total_amount * self.layer_weights[layer]
def add_position(self, price, amount, layer):
self.positions.append({'price': price, 'amount': amount, 'layer': layer})
def get_sellable_positions(self, current_price):
return [(i, p) for i, p in enumerate(self.positions) if current_price >= p['price'] + self.profit_target]
def remove_position(self, index):
return self.positions.pop(index)
def should_add_layer(self, current_price):
for layer in range(self.max_layers):
target = self.layer_prices[layer]
diff = abs(current_price - target)
# 获取该层级的所有持仓
layer_positions = [p for p in self.positions if p['layer'] == layer]
has_position = len(layer_positions) > 0
if diff <= 0.1 and not has_position:
return layer
return None
BIN
View File
Binary file not shown.

After

Width:  |  Height:  |  Size: 1.8 MiB

+145 -3
View File
@@ -1,11 +1,153 @@
# coding:utf-8 # coding:utf-8
from core.database import db import os
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
import configparser
from core.main_ui import MainWindow from core.main_ui import MainWindow
import config as sdConstants import config as sdConstants
from core.qmt import qmtv from core.qmt import qmtv
if __name__ == '__main__': class ConfigWindow:
sdConstants.initConfig() def __init__(self, root):
self.root = root
self.root.title("系统配置")
self.root.geometry("500x250")
self.root.resizable(False, False)
# 居中显示
self.root.withdraw() # 先隐藏窗口
self.root.update_idletasks()
x = (self.root.winfo_screenwidth() // 2) - (500 // 2)
y = (self.root.winfo_screenheight() // 2) - (250 // 2)
self.root.geometry(f"500x250+{x}+{y}")
self.root.deiconify() # 再显示窗口
self.miniQMTPath = tk.StringVar()
self.account_no = tk.StringVar()
self.create_widgets()
def create_widgets(self):
# 创建主框架
main_frame = ttk.Frame(self.root, padding="20")
main_frame.pack(fill=tk.BOTH, expand=True)
# miniQMT路径配置
path_frame = ttk.Frame(main_frame)
path_frame.pack(fill=tk.X, pady=5)
path_label = ttk.Label(path_frame, text="miniQMT路径:")
path_label.pack(side=tk.LEFT)
path_entry = ttk.Entry(path_frame, textvariable=self.miniQMTPath, width=40)
path_entry.pack(side=tk.LEFT, padx=(10, 5), fill=tk.X, expand=True)
browse_btn = ttk.Button(path_frame, text="浏览", command=self.browse_folder)
browse_btn.pack(side=tk.LEFT)
# 资金账号配置
account_frame = ttk.Frame(main_frame)
account_frame.pack(fill=tk.X, pady=5)
account_label = ttk.Label(account_frame, text="资金账号:")
account_label.pack(side=tk.LEFT)
account_entry = ttk.Entry(account_frame, textvariable=self.account_no, width=40)
account_entry.pack(side=tk.LEFT, padx=(10, 0))
# 说明文本
info_label = ttk.Label(
main_frame,
text="请配置miniQMT的userdata_mini路径和资金账号\n路径示例: D:/Programs/DTQMT/userdata_mini",
foreground="gray"
)
info_label.pack(pady=10)
# 按钮框架
button_frame = ttk.Frame(main_frame)
button_frame.pack(fill=tk.X, pady=10)
save_btn = ttk.Button(button_frame, text="保存配置", command=self.save_config)
save_btn.pack(side=tk.RIGHT)
cancel_btn = ttk.Button(button_frame, text="取消", command=self.root.destroy)
cancel_btn.pack(side=tk.RIGHT, padx=(0, 10))
def browse_folder(self):
folder_selected = filedialog.askdirectory()
if folder_selected:
self.miniQMTPath.set(folder_selected)
def save_config(self):
mini_qmt_path = self.miniQMTPath.get().strip()
account_number = self.account_no.get().strip()
# 检查miniQMT路径
if not mini_qmt_path:
messagebox.showerror("错误", "请选择miniQMT路径")
return
if not os.path.exists(mini_qmt_path):
messagebox.showerror("错误", "miniQMT路径不存在")
return
# 检查账号
if not account_number:
messagebox.showerror("错误", "请输入资金账号")
return
# 保存配置
config = configparser.ConfigParser()
config['config'] = {
'miniQMTPath': mini_qmt_path.replace('\\', '/'),
'account_no': account_number,
'log_level': 'INFO'
}
config_path = sdConstants.get_config_path()
try:
with open(config_path, 'w') as configfile:
config.write(configfile)
messagebox.showinfo("成功", "配置已保存")
self.root.destroy()
except Exception as e:
messagebox.showerror("错误", f"保存配置失败: {str(e)}")
def check_and_create_config():
"""检查配置文件,如果不存在则打开配置窗口"""
root = tk.Tk()
config_window = ConfigWindow(root)
root.mainloop()
def initialize_system():
"""初始化系统"""
try:
while True:
# 初始化配置
if sdConstants.exist_config() and sdConstants.initConfig():
# 初始化qmtv
qmtv.init_qmtv() qmtv.init_qmtv()
connected = qmtv.connect()
if connected:
# 连接成功,启动主窗口
window = MainWindow() window = MainWindow()
window.run() window.run()
break
else:
option = messagebox.askokcancel("连接失败", "QMT连接失败,请检查")
if option:
check_and_create_config()
else:
break
else:
option = messagebox.askokcancel("错误", "请检查配置")
if option:
check_and_create_config()
else:
break
except Exception as e:
messagebox.showerror("错误", f"系统初始化失败: {str(e)}")
if __name__ == '__main__':
initialize_system()
+2 -1
View File
@@ -21,7 +21,7 @@ exe = EXE(
a.binaries, a.binaries,
a.datas, a.datas,
[], [],
name='starter', name='神之一手',
debug=False, debug=False,
bootloader_ignore_signals=False, bootloader_ignore_signals=False,
strip=True, # 去除调试符号 strip=True, # 去除调试符号
@@ -34,4 +34,5 @@ exe = EXE(
target_arch=None, target_arch=None,
codesign_identity=None, codesign_identity=None,
entitlements_file=None, entitlements_file=None,
icon='logo.png' # 添加图标文件
) )