diff --git a/czsc/__init__.py b/czsc/__init__.py index 30ea87823..131422ce7 100644 --- a/czsc/__init__.py +++ b/czsc/__init__.py @@ -113,6 +113,8 @@ feture_cross_layering, rolling_rank, rolling_norm, + rolling_qcut, + find_most_similarity, ) __version__ = "0.9.39" diff --git a/czsc/connectors/cooperation.py b/czsc/connectors/cooperation.py index 818115430..d6840523d 100644 --- a/czsc/connectors/cooperation.py +++ b/czsc/connectors/cooperation.py @@ -4,6 +4,8 @@ email: zeng_bin8888@163.com create_dt: 2023/11/15 20:45 describe: CZSC开源协作团队内部使用数据接口 + +接口说明:https://s0cqcxuy3p.feishu.cn/wiki/F3HGw9vDPisWtSkJr1ac5DEcnNh """ import os import czsc diff --git a/czsc/utils/data_client.py b/czsc/utils/data_client.py index 93da982f7..f91e959fa 100644 --- a/czsc/utils/data_client.py +++ b/czsc/utils/data_client.py @@ -29,6 +29,10 @@ def get_url_token(url): if file_token.exists(): return open(file_token, 'r', encoding='utf-8').read() logger.warning(f"请设置 {url} 的访问凭证码,如果没有请联系管理员申请") + token = input(f"请输入 {url} 的访问凭证码(token):") + if token: + set_url_token(token, url) + return token return None diff --git a/czsc/utils/features.py b/czsc/utils/features.py index 0fdb976ef..ae6d505bc 100644 --- a/czsc/utils/features.py +++ b/czsc/utils/features.py @@ -176,12 +176,13 @@ def rolling_rank(df: pd.DataFrame, col, n=None, new_col=None, **kwargs): min_periods: int 最小计算周期 """ - min_periods = kwargs.get('min_periods', 1) + min_periods = kwargs.get('min_periods', 2) new_col = new_col if new_col else f'{col}_rank' if n is None: df[new_col] = df[col].expanding(min_periods=min_periods).rank() else: df[new_col] = df[col].rolling(window=n, min_periods=min_periods).rank() + df[new_col] = df[new_col].fillna(0) def rolling_norm(df: pd.DataFrame, col, n=None, new_col=None, **kwargs): @@ -199,10 +200,71 @@ def rolling_norm(df: pd.DataFrame, col, n=None, new_col=None, **kwargs): min_periods: int 最小计算周期 """ - min_periods = kwargs.get('min_periods', 1) + min_periods = kwargs.get('min_periods', 2) new_col = 
def rolling_qcut(df: pd.DataFrame, col, n=None, new_col=None, **kwargs):
    """Compute the rolling (or expanding) quantile bucket of the latest value.

    For every row, the values of the current window are cut into ``q``
    quantile bins with ``pd.qcut`` and the bin index of the window's last
    value is stored in ``new_col``. The DataFrame is modified in place and
    nothing is returned.

    :param df: pd.DataFrame
        Data to compute on; the result column is added to it.
    :param col: str
        Name of the column to bucket.
    :param n: int
        Rolling window size. ``None`` (default) uses an expanding window.
    :param new_col: str
        Name of the result column; defaults to f'{col}_qcut'.
    :param kwargs:

        - min_periods: int, minimum number of observations; defaults to ``q``
        - q: int, number of quantile bins; defaults to 10
    """
    q = kwargs.get('q', 10)
    min_periods = kwargs.get('min_periods', q)
    target = new_col if new_col else f'{col}_qcut'

    def _last_bucket(window):
        # labels=False yields integer bin codes; only the newest value's code is kept.
        return pd.qcut(window, q=q, labels=False, duplicates='drop')[-1]

    if n is None:
        windows = df[col].expanding(min_periods=min_periods)
    else:
        windows = df[col].rolling(window=n, min_periods=min_periods)

    # Rows without a result (e.g. fewer than min_periods observations) are marked with -1.
    df[target] = windows.apply(_last_bucket, raw=True).fillna(-1)


def find_most_similarity(vector: pd.Series, matrix: pd.DataFrame, n=10, metric='cosine', **kwargs):
    """Find the n vectors in a matrix that are most similar to a given vector.

    Similarity is measured with ``sklearn.metrics.pairwise.pairwise_distances``,
    which returns distances, so the most similar vectors are the ones with the
    smallest values.

    :param vector: 1-D vector as a pd.Series.
    :param matrix: 2-D matrix as a pd.DataFrame; every column is one vector
        and the column name is that vector's label.
    :param n: int, number of most similar vectors to return.
    :param metric: str, distance metric passed to ``pairwise_distances``.

        - From scikit-learn: ['cityblock', 'cosine', 'euclidean', 'l1', 'l2',
          'manhattan']. These metrics support sparse matrix inputs.
          ['nan_euclidean'] but it does not yet support sparse matrices.

        - From scipy.spatial.distance: ['braycurtis', 'canberra', 'chebyshev',
          'correlation', 'dice', 'hamming', 'jaccard', 'kulsinski',
          'mahalanobis', 'minkowski', 'rogerstanimoto', 'russellrao',
          'seuclidean', 'sokalmichener', 'sokalsneath', 'sqeuclidean', 'yule'].
          See the documentation for scipy.spatial.distance for details on
          these metrics. These metrics do not support sparse matrix inputs.

    :param kwargs: accepted for backward compatibility; currently unused.
    :return: pd.Series of distances indexed by column label, sorted ascending
        (most similar first), with at most ``n`` entries.
    """
    from sklearn.metrics.pairwise import pairwise_distances

    # Use the caller's ``metric`` argument. It is a named parameter, so it can
    # never appear in ``kwargs``; looking it up there always fell back to 'cosine'.
    query = vector.values.reshape(1, -1)
    distances = pairwise_distances(query, matrix.T, metric=metric).reshape(-1)
    ranked = pd.Series(distances, index=matrix.columns)

    # Smaller distance means more similar, so sort ascending before truncating.
    return ranked.sort_values(ascending=True).head(n)
def test_rolling_qcut():
    """rolling_qcut adds a bucket column in place with values in [-1, q)."""
    from czsc.utils.features import rolling_qcut

    frame = pd.DataFrame({'col1': np.random.rand(100)})
    rolling_qcut(frame, 'col1', n=10, new_col='col1_qcut', q=5, min_periods=5)

    # The new column exists and is aligned with the source column.
    assert 'col1_qcut' in frame.columns
    assert len(frame['col1_qcut']) == len(frame['col1'])

    # Every value lies between -1 and q - 1 inclusive.
    buckets = frame['col1_qcut'].dropna()
    assert all(-1 <= value < 5 for value in buckets)


def test_rolling_norm():
    """rolling_norm adds a z-score column in place with values within [-3, 3]."""
    from czsc.utils.features import rolling_norm

    frame = pd.DataFrame({'col1': np.random.rand(100)})
    rolling_norm(frame, 'col1', n=10, new_col='col1_norm')

    # The new column exists and is aligned with the source column.
    assert 'col1_norm' in frame.columns
    assert len(frame['col1_norm']) == len(frame['col1'])

    # Normalised values stay inside the expected range.
    scores = frame['col1_norm'].dropna()
    assert all(-3 <= value <= 3 for value in scores)


def test_rolling_rank():
    """rolling_rank adds a rank column in place with values within [0, 100]."""
    from czsc.utils.features import rolling_rank

    frame = pd.DataFrame({'col1': np.random.rand(100)})
    rolling_rank(frame, 'col1', n=10, new_col='col1_rank')

    # The new column exists and is aligned with the source column.
    assert 'col1_rank' in frame.columns
    assert len(frame['col1_rank']) == len(frame['col1'])

    # Ranks stay inside the expected range.
    ranks = frame['col1_rank'].dropna()
    assert all(0 <= value <= 100 for value in ranks)