def cross_sectional_strategy(df, factor, **kwargs):
    """Build a long/short portfolio from cross-sectional factor values.

    For every distinct ``dt`` the rows are ranked by ``factor``. The
    ``long_num`` rows with the highest values receive ``+1 / long_num`` and the
    ``short_num`` rows with the lowest values receive ``-1 / short_num``; every
    other row keeps a weight of ``0.0``. A cross section that has fewer usable
    (non-NaN) factor values than ``long_num + short_num`` is skipped with a
    warning and all of its rows keep weight ``0.0``.

    :param df: pd.DataFrame holding at least the ``dt`` column and the factor
        column. The input frame is not modified; its index is preserved in the
        result and does not need to be unique.
    :param factor: str, name of the factor column
    :param kwargs:

        - factor_direction: str, ``positive`` (default) ranks high values as
          longs; ``negative`` flips the sign of the factor first
        - long_num: int, number of long positions per cross section, default 5;
          0 disables the long leg
        - short_num: int, number of short positions per cross section,
          default 5; 0 disables the short leg
        - logger: object with a ``warning`` method; defaults to ``loguru.logger``

    :return: pd.DataFrame, a copy of ``df`` with a float ``weight`` column
        appended. NOTE: when ``factor_direction`` is ``negative`` the factor
        column in the returned frame holds the sign-flipped values.
    :raises AssertionError: if ``factor`` is not a column of ``df`` or
        ``factor_direction`` is not ``positive`` / ``negative``
    :raises ValueError: if ``long_num`` or ``short_num`` is negative
    """
    factor_direction = kwargs.get("factor_direction", "positive")
    long_num = kwargs.get("long_num", 5)
    short_num = kwargs.get("short_num", 5)

    logger = kwargs.get("logger")
    if logger is None:
        # Resolve the default lazily: a caller-supplied logger never touches loguru.
        import loguru

        logger = loguru.logger

    assert factor in df.columns, f"{factor} 不在 df 中"
    assert factor_direction in ["positive", "negative"], "factor_direction 参数错误"
    if long_num < 0 or short_num < 0:
        raise ValueError(f"long_num and short_num must be >= 0, got {long_num} and {short_num}")

    df = df.copy()
    if factor_direction == "negative":
        df[factor] = -df[factor]

    # Float from the start: writing 1 / n into an int column is a deprecated upcast.
    df["weight"] = 0.0
    weight_col = df.columns.get_loc("weight")
    required = long_num + short_num

    # Rank on a positionally indexed view so that duplicate index labels in the
    # caller's frame can never route a weight to a row of another cross section.
    ranking = df[["dt", factor]].reset_index(drop=True)
    for dt, dfg in ranking.groupby("dt"):
        # NaN factors cannot be ranked; they neither count nor receive a weight.
        scores = dfg[factor].dropna()
        if len(scores) < required:
            logger.warning(f"{dt} 截面数据量过小,跳过;仅有 {len(scores)} 条数据,需要 {required} 条数据")
            continue

        # One descending sort; longs come from the head and shorts from the
        # tail, so the two legs stay disjoint even when factor values are tied.
        ordered = scores.sort_values(ascending=False, kind="mergesort").index.to_numpy()
        if long_num:
            df.iloc[ordered[:long_num], weight_col] = 1 / long_num
        if short_num:
            df.iloc[ordered[len(ordered) - short_num:], weight_col] = -1 / short_num

    return df