# Grid-trading strategy for the JoinQuant backtest platform.
#
# Stocks whose previous close lies in [min_price, max_price] are put on a
# watch list.  When a watched stock trades inside [grid_base_min,
# grid_base_max] a grid is opened at that price; further lots are bought as
# the price approaches lower grid layers, and each lot is sold once the price
# rises `profit_target` above that lot's own entry price.  All positions are
# liquidated in January and no trading happens during that month.
from kuanke.wizard import *
from jqdata import *
import pandas as pd
import numpy as np


# ==================== Initialisation ====================
def initialize(context):
    """Configure the backtest environment and schedule the daily pre-open job."""
    set_params(context)
    # Guard against look-ahead (future) data.
    set_option('avoid_future_data', True)
    # Trade at real (unadjusted) prices.
    set_option('use_real_price', True)
    # Suppress order/system logs below the error level.
    log.set_level('order', 'error')
    log.set_level('system', 'error')
    log.set_level('strategy', 'debug')
    set_benchmark('000001.XSHG')
    set_order_cost(
        OrderCost(
            open_tax=0,
            close_tax=0.001,
            open_commission=0.0002,
            close_commission=0.0002,
            min_commission=5,
        ),
        type='stock',
    )
    set_slippage(FixedSlippage(0.01))
    run_daily(before_trading, '9:30')


# -------------------- Parameters --------------------
def set_params(context):
    """Store strategy parameters on `context` and reset the global state in `g`."""
    # Previous-close window used to select watch-list candidates.
    context.max_price = 6
    context.min_price = 5.01

    context.grid_base_min = 1      # lowest price at which a grid may be opened
    context.grid_base_max = 5      # highest price at which a grid may be opened
    context.grid_interval = 0.5    # price drop between successive grid layers
    context.profit_target = 0.5    # price rise above a lot's entry that triggers its sale

    context.min_stocks = 10
    context.max_stocks = 25
    context.base_max_stocks = 25
    context.max_layers = 7
    context.base_position_pct = 0.15
    context.max_position_pct = 0.15
    context.target_usage = 0.98
    context.reserve_ratio = 0.02
    context.first_round_max = 10
    context.add_batch_size = 3     # max new grids opened per handle_data call
    context.add_cash_threshold = 0.4

    g.stock_pool = []
    g.grid_info = {}               # stock code -> GridInfo
    g.monitoring_stocks = set()    # watch list of stocks without an open grid
    g.first_round_done = False


# ==================== Pre-open ====================
def before_trading(context):
    """Daily 9:30 job: run the January liquidation, then refresh the watch list."""
    january_clear(context)
    if context.current_dt.month == 1:
        return

    stock_pool = get_stock_pool(context)
    g.stock_pool = stock_pool
    g.monitoring_stocks.update([s for s in stock_pool if s not in g.grid_info])
    g.first_round_done = len(g.grid_info) >= context.first_round_max


# -------------------- Stock pool --------------------
def get_stock_pool(context):
    """Return codes that pass the name/board filter, the pre-close window and have an open price.

    Returns an empty list when nothing qualifies or when a price query fails
    (the failure is logged rather than raised).
    """
    # 1. All stocks known as of the previous trading date.
    df_sec = get_all_securities(types=['stock'], date=context.previous_date)

    # 2. Drop names containing ST / st / 退 and codes with excluded prefixes
    #    (688: STAR Market; 83, 87, 9: excluded by the original filter, which
    #    labels them as Beijing Stock Exchange).
    excluded_prefixes = ('688', '83', '87', '9')

    def is_valid(code, name):
        if 'ST' in name or '退' in name or 'st' in name:
            return False
        return not code.startswith(excluded_prefixes)

    # One pass over the name column instead of a `.loc` lookup per code.
    codes = [
        code
        for code, name in zip(df_sec.index, df_sec['display_name'])
        if is_valid(code, name)
    ]
    if not codes:
        return []

    # 3. Keep codes whose previous close lies within [min_price, max_price].
    try:
        price_df = get_price(codes, end_date=context.current_dt, count=1,
                             fields=['pre_close'], panel=False)
        if price_df is None or price_df.empty:
            return []
        in_window = (
            price_df['pre_close'].notna()
            & (price_df['pre_close'] >= context.min_price)
            & (price_df['pre_close'] <= context.max_price)
        )
        valid_codes = price_df[in_window]['code'].tolist()
    except Exception as e:
        # Deliberate best-effort: a data failure yields an empty pool for the day.
        log.error(f"获取价格数据失败: {e}")
        return []
    if not valid_codes:
        return []

    # 4. Drop suspended stocks, detected by a missing open price.
    try:
        open_df = get_price(valid_codes, end_date=context.current_dt, count=1,
                            fields=['open'], panel=False)
        if open_df is None or open_df.empty:
            return []
        final_codes = open_df[open_df['open'].notna()]['code'].tolist()
    except Exception as e:
        log.error(f"获取开盘价数据失败: {e}")
        return []
    return final_codes


# -------------------- January liquidation --------------------
def january_clear(context):
    """During January, sell every holding and move its grid back to the watch list."""
    if context.current_dt.month == 1:
        log.info("进入1月,执行年度清仓...")
        for stock in list(context.portfolio.positions.keys()):
            order_target(stock, 0)
            if stock in g.grid_info:
                del g.grid_info[stock]
                g.monitoring_stocks.add(stock)


# ==================== Intraday ====================
def handle_data(context, data):
    """Per-bar entry point: manage open grids, then open new ones if capacity allows."""
    if context.current_dt.month == 1:
        return

    manage_positions(context, data)

    total_value = context.portfolio.total_value
    if total_value <= 0:
        # Capital usage is undefined without portfolio value; avoid dividing by zero.
        return
    usage = (total_value - context.portfolio.available_cash) / total_value
    dynamic_max = get_dynamic_max_stocks(context)
    if len(g.grid_info) < dynamic_max and usage < context.target_usage:
        try_build_new(context, data)


# -------------------- Dynamic cap --------------------
def get_dynamic_max_stocks(context):
    """Return the grid-count cap: `first_round_max` until the first round is done, then `max_stocks`."""
    return context.max_stocks if g.first_round_done else context.first_round_max


# -------------------- Open new grids --------------------
def try_build_new(context, data):
    """Open grids for watched stocks priced inside [grid_base_min, grid_base_max].

    At most `context.add_batch_size` grids are opened per call and the total
    number of grids never exceeds the dynamic cap.  A grid is recorded only
    when the buy order was actually created.
    """
    position_pct = context.max_position_pct
    dynamic_max = get_dynamic_max_stocks(context)
    batch_limit = context.add_batch_size
    count = 0

    for stock in list(g.monitoring_stocks):
        if len(g.grid_info) >= dynamic_max or count >= batch_limit:
            break

        price = data[stock].close
        # A NaN price fails this chained comparison and is skipped.
        if not (context.grid_base_min <= price <= context.grid_base_max):
            continue

        stock_amount = context.portfolio.total_value * position_pct
        grid = GridInfo(price, stock_amount, context.max_layers,
                        context.grid_interval, context.profit_target)
        layer_amount = grid.get_layer_amount(0)
        # Round down to whole lots of 100 shares.
        buy_amount = int(layer_amount / price / 100) * 100
        if buy_amount <= 0:
            continue

        # `order` returns None when the order could not be created; recording
        # the lot anyway would desynchronise the grid from the real portfolio.
        if order(stock, buy_amount) is None:
            log.error(f"[建仓失败] {stock} 下单未成功")
            continue

        grid.add_position(price, buy_amount, 0)
        g.grid_info[stock] = grid
        g.monitoring_stocks.discard(stock)
        count += 1
        log.info(f"[建仓] {stock} 价格{price:.2f} 数量{buy_amount}")


# -------------------- Manage open grids --------------------
def manage_positions(context, data):
    """For every open grid: take profit on eligible lots, add a layer if due, drop empty grids."""
    for stock, grid in list(g.grid_info.items()):
        price = data[stock].close
        # Skip bars without a usable price; NaN or zero would break the lot-size arithmetic.
        if price is None or not (price > 0):
            continue

        # Take profit.  Iterate from the highest index down so that popping a
        # lot does not shift the indices of lots still to be processed.
        for idx, pos in reversed(grid.get_sellable_positions(price)):
            if order(stock, -pos['amount']) is None:
                # Keep the lot in the grid so it is retried on a later bar.
                log.error(f"[止盈失败] {stock} 下单未成功")
                continue
            grid.remove_position(idx)
            profit = (price - pos['price']) * pos['amount']
            log.info(f"[止盈] {stock} 盈利{profit:.2f}")

        # Add a layer.
        layer = grid.should_add_layer(price)
        if layer is not None:
            layer_amount = grid.get_layer_amount(layer)
            buy_amount = int(layer_amount / price / 100) * 100
            if buy_amount > 0:
                if order(stock, buy_amount) is not None:
                    grid.add_position(price, buy_amount, layer)
                    log.info(f"[加仓] {stock} 层级{layer} 数量{buy_amount}")
                else:
                    log.error(f"[加仓失败] {stock} 层级{layer} 下单未成功")
            else:
                log.info(f"[加仓失败] {stock} 层级{layer} 金额不足")

        # Grid fully sold: drop it and put the stock back on the watch list.
        if not grid.positions:
            del g.grid_info[stock]
            g.monitoring_stocks.add(stock)
            log.info(f"[清仓] {stock}")


# ==================== After close ====================
def after_trading_end(context):
    """Log the number of open grids and watched stocks."""
    log.info(f"持仓数:{len(g.grid_info)},监控数:{len(g.monitoring_stocks)}")


# ==================== Grid bookkeeping ====================
class GridInfo:
    """Bookkeeping for one stock's price grid.

    Layer ``i`` sits at ``base_price - i * interval``.  Capital is split
    across layers with weights proportional to ``1 + 0.05 * i``, so lower
    layers receive slightly more.  Each purchase is kept as a separate lot
    (a dict with keys ``price``, ``amount`` and ``layer``) in ``positions``.
    """

    # Absorbs float rounding at the tolerance boundary (3.6 - 3.5 > 0.1 in binary floats).
    _EPS = 1e-9

    def __init__(self, base_price, total_amount, max_layers, interval,
                 profit_target, tolerance=0.1):
        """Create a grid.

        Args:
            base_price: Price of layer 0.
            total_amount: Total capital budgeted for this stock.
            max_layers: Number of grid layers.
            interval: Price distance between adjacent layers.
            profit_target: Rise above a lot's entry price at which it is sold.
            tolerance: Maximum distance between the current price and a
                layer's price for that layer to be bought.
        """
        self.base_price = float(base_price)
        self.total_amount = float(total_amount)
        self.max_layers = int(max_layers)
        self.interval = float(interval)
        self.profit_target = float(profit_target)
        self.tolerance = float(tolerance)
        # Built from the coerced values so the grid is consistent with the attributes above.
        self.layer_prices = {
            i: self.base_price - i * self.interval
            for i in range(self.max_layers)
        }
        self.layer_weights = self._calc_weights()
        self.positions = []

    def _calc_weights(self):
        """Return normalised per-layer capital weights (they sum to 1)."""
        weights = {i: 1.0 + 0.05 * i for i in range(self.max_layers)}
        total = sum(weights.values())
        return {k: v / total for k, v in weights.items()}

    def get_layer_amount(self, layer):
        """Return the capital allotted to `layer`."""
        return self.total_amount * self.layer_weights[layer]

    def add_position(self, price, amount, layer):
        """Record a purchased lot."""
        self.positions.append({'price': price, 'amount': amount, 'layer': layer})

    def get_sellable_positions(self, current_price):
        """Return ``(index, lot)`` pairs for lots whose profit target has been reached."""
        return [
            (i, p)
            for i, p in enumerate(self.positions)
            if current_price >= p['price'] + self.profit_target
        ]

    def remove_position(self, index):
        """Remove and return the lot at `index`."""
        return self.positions.pop(index)

    def should_add_layer(self, current_price):
        """Return the first layer that holds no lot and lies within tolerance of the price.

        Returns None when no layer qualifies.
        """
        occupied = {p['layer'] for p in self.positions}
        for layer in range(self.max_layers):
            if layer in occupied:
                continue
            diff = abs(current_price - self.layer_prices[layer])
            if diff <= self.tolerance + self._EPS:
                return layer
        return None