# -*- coding: utf-8 -*- """ 掘金量化策略示例 包含多种经典量化策略的实现 """ from gm.api import * import numpy as np import pandas as pd from datetime import datetime # 设置token(需要替换为实际的token) set_token('your_token_here') # 策略1: 双均线策略 def init(context): """初始化函数""" # 设置标的股票 context.symbol = 'SZSE.000001' # 平安银行 # 设置均线周期 context.short_period = 5 context.long_period = 20 # 设置交易参数 context.order_size = 100 # 每次交易股数 print('策略初始化完成') def on_bar(context, bars): """K线触发函数""" # 获取历史数据 recent_bars = history(context.symbol, frequency='1d', count=context.long_period + 1, end_time=bars[0].bob, df=True) if len(recent_bars) < context.long_period + 1: return # 计算均线 short_ma = recent_bars['close'].iloc[-context.short_period:].mean() long_ma = recent_bars['close'].iloc[-context.long_period:].mean() # 获取当前持仓 position = context.account().position(symbol=context.symbol, side=PositionSide_Long) # 交易逻辑 if short_ma > long_ma and position is None: # 金叉,买入 order_volume(symbol=context.symbol, volume=context.order_size, side=OrderSide_Buy, order_type=OrderType_Market, position_effect=PositionEffect_Open) print(f'买入信号: 短期均线{short_ma:.2f} > 长期均线{long_ma:.2f}') elif short_ma < long_ma and position is not None: # 死叉,卖出 order_volume(symbol=context.symbol, volume=position.volume, side=OrderSide_Sell, order_type=OrderType_Market, position_effect=PositionEffect_Close) print(f'卖出信号: 短期均线{short_ma:.2f} < 长期均线{long_ma:.2f}') # 策略2: 布林带均值回归策略 class BollingerBandStrategy: def __init__(self, symbol, period=20, std_dev=2, order_size=100): self.symbol = symbol self.period = period self.std_dev = std_dev self.order_size = order_size def calculate_bollinger_bands(self, data): """计算布林带""" sma = data['close'].rolling(window=self.period).mean() std = data['close'].rolling(window=self.period).std() upper_band = sma + (std * self.std_dev) lower_band = sma - (std * self.std_dev) return upper_band, lower_band, sma def generate_signals(self, context, bars): """生成交易信号""" # 获取历史数据 data = history(self.symbol, frequency='1d', count=self.period + 1, end_time=bars[0].bob, df=True) if len(data) < self.period + 1: return # 计算布林带 upper_band, lower_band, sma = self.calculate_bollinger_bands(data) # 获取当前价格 current_price = bars[0].close # 获取当前持仓 position = context.account().position(symbol=self.symbol, side=PositionSide_Long) # 交易信号 if current_price <= lower_band.iloc[-1] and position is None: # 价格触及下轨,买入 order_volume(symbol=self.symbol, volume=self.order_size, side=OrderSide_Buy, order_type=OrderType_Market, position_effect=PositionEffect_Open) print(f'布林带买入: 价格{current_price:.2f} <= 下轨{lower_band.iloc[-1]:.2f}') elif current_price >= upper_band.iloc[-1] and position is not None: # 价格触及上轨,卖出 order_volume(symbol=self.symbol, volume=position.volume, side=OrderSide_Sell, order_type=OrderType_Market, position_effect=PositionEffect_Close) print(f'布林带卖出: 价格{current_price:.2f} >= 上轨{upper_band.iloc[-1]:.2f}') # 策略3: RSI超买超卖策略 class RSIStrategy: def __init__(self, symbol, period=14, overbought=70, oversold=30, order_size=100): self.symbol = symbol self.period = period self.overbought = overbought self.oversold = oversold self.order_size = order_size def calculate_rsi(self, data): """计算RSI指标""" delta = data['close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=self.period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=self.period).mean() rs = gain / loss rsi = 100 - (100 / (1 + rs)) return rsi def generate_signals(self, context, bars): """生成交易信号""" # 获取历史数据 data = history(self.symbol, frequency='1d', count=self.period + 1, end_time=bars[0].bob, df=True) if len(data) < self.period + 1: return # 计算RSI rsi = self.calculate_rsi(data) current_rsi = rsi.iloc[-1] # 获取当前持仓 position = context.account().position(symbol=self.symbol, side=PositionSide_Long) # 交易信号 if current_rsi <= self.oversold and position is None: # RSI超卖,买入 order_volume(symbol=self.symbol, volume=self.order_size, side=OrderSide_Buy, order_type=OrderType_Market, position_effect=PositionEffect_Open) print(f'RSI买入: RSI={current_rsi:.2f} <= 超卖线{self.oversold}') elif current_rsi >= self.overbought and position is not None: # RSI超买,卖出 order_volume(symbol=self.symbol, volume=position.volume, side=OrderSide_Sell, order_type=OrderType_Market, position_effect=PositionEffect_Close) print(f'RSI卖出: RSI={current_rsi:.2f} >= 超买线{self.overbought}') # 策略4: 多因子选股策略 class MultiFactorStrategy: def __init__(self, symbols, factors, order_size=100): self.symbols = symbols self.factors = factors self.order_size = order_size self.position_dict = {} def get_fundamental_data(self, symbols, date): """获取基本面数据""" # 这里需要根据实际的财务数据API来获取数据 # 示例返回模拟数据 fundamental_data = {} for symbol in symbols: fundamental_data[symbol] = { 'pe': np.random.uniform(10, 30), # 市盈率 'pb': np.random.uniform(1, 5), # 市净率 'roe': np.random.uniform(5, 20), # 净资产收益率 'growth': np.random.uniform(0, 50) # 营收增长率 } return fundamental_data def calculate_factor_scores(self, fundamental_data): """计算因子得分""" scores = {} for symbol, data in fundamental_data.items(): score = 0 # PE因子得分(越低越好) if data['pe'] < 15: score += 25 elif data['pe'] < 25: score += 15 else: score += 5 # PB因子得分(越低越好) if data['pb'] < 2: score += 25 elif data['pb'] < 3: score += 15 else: score += 5 # ROE因子得分(越高越好) if data['roe'] > 15: score += 25 elif data['roe'] > 10: score += 15 else: score += 5 # 增长率因子得分(越高越好) if data['growth'] > 30: score += 25 elif data['growth'] > 15: score += 15 else: score += 5 scores[symbol] = score return scores def select_stocks(self, context, date): """选股逻辑""" # 获取基本面数据 fundamental_data = self.get_fundamental_data(self.symbols, date) # 计算因子得分 scores = self.calculate_factor_scores(fundamental_data) # 排序并选择得分最高的股票 sorted_stocks = sorted(scores.items(), key=lambda x: x[1], reverse=True) selected_stocks = [stock[0] for stock in sorted_stocks[:5]] # 选择前5名 return selected_stocks def rebalance_portfolio(self, context, selected_stocks): """调仓""" # 获取当前持仓 current_positions = context.account().positions() # 卖出不在选中列表中的股票 for position in current_positions: if position.symbol not in selected_stocks: order_volume(symbol=position.symbol, volume=position.volume, side=OrderSide_Sell, order_type=OrderType_Market, position_effect=PositionEffect_Close) print(f'卖出: {position.symbol}') # 买入选中的股票 for symbol in selected_stocks: position = context.account().position(symbol=symbol, side=PositionSide_Long) if position is None: order_volume(symbol=symbol, volume=self.order_size, side=OrderSide_Buy, order_type=OrderType_Market, position_effect=PositionEffect_Open) print(f'买入: {symbol}') # 策略5: 网格交易策略 class GridTradingStrategy: def __init__(self, symbol, grid_size=0.02, grid_levels=10, order_size=100): self.symbol = symbol self.grid_size = grid_size # 网格大小(百分比) self.grid_levels = grid_levels # 网格层数 self.order_size = order_size self.grids = [] # 存储网格价格 self.positions = {} # 存储网格持仓 def initialize_grids(self, current_price): """初始化网格""" self.grids = [] for i in range(-self.grid_levels // 2, self.grid_levels // 2 + 1): grid_price = current_price * (1 + i * self.grid_size) self.grids.append(grid_price) print(f'网格初始化完成,价格范围: {min(self.grids):.2f} - {max(self.grids):.2f}') def generate_signals(self, context, bars): """生成网格交易信号""" current_price = bars[0].close # 如果网格未初始化,则初始化 if not self.grids: self.initialize_grids(current_price) return # 检查每个网格 for i, grid_price in enumerate(self.grids): # 检查是否触发买入信号(价格跌破网格) if current_price <= grid_price * (1 - self.grid_size / 2): if i not in self.positions: order_volume(symbol=self.symbol, volume=self.order_size, side=OrderSide_Buy, order_type=OrderType_Market, position_effect=PositionEffect_Open) self.positions[i] = self.order_size print(f'网格买入: 价格={current_price:.2f}, 网格={grid_price:.2f}') # 检查是否触发卖出信号(价格上涨到上一网格) elif current_price >= grid_price * (1 + self.grid_size / 2): if i in self.positions: order_volume(symbol=self.symbol, volume=self.positions[i], side=OrderSide_Sell, order_type=OrderType_Market, position_effect=PositionEffect_Close) del self.positions[i] print(f'网格卖出: 价格={current_price:.2f}, 网格={grid_price:.2f}') # 主函数:运行策略 if __name__ == '__main__': ''' strategy_id策略ID, 由系统生成 filename文件名, 请与本文件名保持一致 mode运行模式, 实时模式:MODE_LIVE回测模式:MODE_BACKTEST token绑定计算机的ID, 可在系统设置-密钥管理中生成 backtest_start_time回测开始时间 backtest_end_time回测结束时间 backtest_adjust股票复权方式, 不复权:ADJUST_NONE前复权:ADJUST_PREV后复权:ADJUST_POST backtest_initial_cash回测初始资金 backtest_commission_ratio回测佣金比例 backtest_slippage_ratio回测滑点比例 ''' run(strategy_id='your_strategy_id', filename='策略示例.py', mode=MODE_BACKTEST, token='your_token_here', backtest_start_time='2020-01-01 08:00:00', backtest_end_time='2020-12-31 16:00:00', backtest_adjust=ADJUST_PREV, backtest_initial_cash=100000, backtest_commission_ratio=0.0001, backtest_slippage_ratio=0.0001)