From 1593279b7d01bb4b385b7c344076ff09d1beea56 Mon Sep 17 00:00:00 2001 From: zengbin93 Date: Sat, 2 Mar 2024 08:01:12 +0800 Subject: [PATCH] =?UTF-8?q?V0.9.43=20=E6=9B=B4=E6=96=B0=E4=B8=80=E6=89=B9?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=20(#187)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 0.9.43 start coding * 0.9.43 新增心跳时间获取 * 0.9.43 rwc 测试代码完善 * 0.9.43 update * 0.9.43 新增 holds_performance * 0.9.43 update * 0.9.43 新增按持仓方向进行止损分析 * 0.9.43 update * 0.9.43 新增压力支撑信号 * 0.9.43 fix bug * update * update --- .github/workflows/pythonpackage.yml | 2 +- README.md | 16 ++- czsc/__init__.py | 14 ++- czsc/connectors/cooperation.py | 4 +- czsc/signals/__init__.py | 1 + czsc/signals/zdy.py | 61 +++++++++- czsc/traders/__init__.py | 4 +- czsc/traders/rwc.py | 114 ++++++++++++++++-- czsc/traders/weight_backtest.py | 56 +++++++++ czsc/utils/__init__.py | 2 +- czsc/utils/cache.py | 5 +- czsc/utils/st_components.py | 76 ++++++++++-- czsc/utils/stats.py | 42 ++++++- .../signals_dev/pressure_support_V240222.py | 77 ++++++++++++ examples/signals_dev/signal_match.py | 2 +- examples/test_offline/test_rwc.py | 41 ------- examples/test_offline/test_rwc_v231112.py | 51 +++++++- setup.py | 1 + 18 files changed, 492 insertions(+), 77 deletions(-) create mode 100644 examples/signals_dev/pressure_support_V240222.py delete mode 100644 examples/test_offline/test_rwc.py diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml index 2872d9dd9..7e0256890 100644 --- a/.github/workflows/pythonpackage.yml +++ b/.github/workflows/pythonpackage.yml @@ -5,7 +5,7 @@ name: Python package on: push: - branches: [ master, V0.9.42 ] + branches: [ master, V0.9.43 ] pull_request: branches: [ master ] diff --git a/README.md b/README.md index 799014a87..a7644f9a6 100644 --- a/README.md +++ b/README.md @@ -19,10 +19,24 @@ * 已经开始用czsc库进行量化研究的朋友,欢迎[加入飞书群](https://applink.feishu.cn/client/chat/chatter/add_by_link?link_token=0bak668e-7617-452c-b935-94d2c209e6cf),快点击加入吧! * [B站视频教程合集(持续更新...)](https://space.bilibili.com/243682308/channel/series) -* [CZSC小圈子](https://s0cqcxuy3p.feishu.cn/wiki/wikcnwXSk9mWnki1b6URPhLA2Hc) * [CZSC代码库QA](https://zbczsc.streamlit.app/) +## 知识星球 + +* [CZSC小圈子(缠论、量化、专享案例)](https://s0cqcxuy3p.feishu.cn/wiki/wikcnwXSk9mWnki1b6URPhLA2Hc) + +* 链接:https://wx.zsxq.com/dweb2/index/group/88851448582512 +* 加入:https://t.zsxq.com/0aMSAqcgO +* 费用:100元 + +> **知识星球【CZSC小圈子】的定位是什么?** +> - 为仔细研读过禅师原文并且愿意使用 CZSC 库进行量化投研的朋友提供一个深入交流的平台。 +> - 寻找一群有能力、有兴趣、有主见的朋友共同进行量化策略研究讨论交流。 +> - 对于刚接触缠论和量化交易的新朋友,给出一些力所能及的帮助(可以在圈子中提问,必回复)。 +> - 2024年,小圈子将提供一些专享内容,主要是使用 czsc 构建量化策略的优质案例。 + + ## 项目贡献 * [择时策略研究框架](https://s0cqcxuy3p.feishu.cn/wiki/wikcnhizrtIOQakwVcZLMKJNaib) diff --git a/czsc/__init__.py b/czsc/__init__.py index 98f5dae6c..ef0740dfb 100644 --- a/czsc/__init__.py +++ b/czsc/__init__.py @@ -29,10 +29,16 @@ SignalsParser, get_signals_config, get_signals_freqs, + WeightBacktest, + stoploss_by_direction, get_ensemble_weight, long_short_equity, + RedisWeightsClient, + get_strategy_mates, + get_heartbeat_time, + OpensOptimize, ExitsOptimize, ) @@ -70,6 +76,7 @@ SignalPerformance, daily_performance, weekly_performance, + holds_performance, net_value_stats, subtract_fee, @@ -109,6 +116,7 @@ show_weight_backtest, show_ts_rolling_corr, show_ts_self_corr, + show_stoploss_by_direction, ) from czsc.utils.bi_info import ( @@ -131,10 +139,10 @@ is_event_feature, ) -__version__ = "0.9.42" +__version__ = "0.9.43" __author__ = "zengbin93" __email__ = "zeng_bin8888@163.com" -__date__ = "20240121" +__date__ = "20240222" def welcome(): @@ -154,4 +162,4 @@ def welcome(): if get_dir_size(home_path) > pow(1024, 3): - print(f"{home_path} 目录缓存超过1GB,请适当清理。调用 czsc.empty_cache_path 可以直接清空缓存") + print(f"{home_path} 目录缓存超过1GB,请适当清理。调用 czsc.empty_cache_path() 可以直接清空缓存") diff --git a/czsc/connectors/cooperation.py b/czsc/connectors/cooperation.py index 0c1120cb8..c2abcaa9e 100644 --- a/czsc/connectors/cooperation.py +++ b/czsc/connectors/cooperation.py @@ -87,7 +87,9 @@ def get_symbols(name, **kwargs): def get_min_future_klines(code, sdt, edt, freq='1m'): """分段获取期货1分钟K线后合并""" - dates = pd.date_range(start=sdt, end=edt, freq='1M') + # dates = pd.date_range(start=sdt, end=edt, freq='1M') + dates = pd.date_range(start=sdt, end=edt, freq='30D') + dates = [d.strftime('%Y%m%d') for d in dates] + [sdt, edt] dates = sorted(list(set(dates))) diff --git a/czsc/signals/__init__.py b/czsc/signals/__init__.py index 79d2290ce..15b1ce6d8 100644 --- a/czsc/signals/__init__.py +++ b/czsc/signals/__init__.py @@ -261,4 +261,5 @@ zdy_macd_V230527, zdy_dif_V230527, zdy_dif_V230528, + pressure_support_V240222, ) diff --git a/czsc/signals/zdy.py b/czsc/signals/zdy.py index 31620c5c5..abb46e0e4 100644 --- a/czsc/signals/zdy.py +++ b/czsc/signals/zdy.py @@ -371,7 +371,7 @@ def zdy_take_profit_V230406(cat: CzscTrader, **kwargs) -> OrderedDict: **信号逻辑:** 多头止盈逻辑如下,反之为空头止盈逻辑: - + 1. 任何一笔在持仓状态下的上升笔结束,若升破前一下跌笔高点,继续持仓,如没有升破前一下跌笔高点,止盈走人。需要用两次停顿分型判断。 **信号列表:** @@ -489,7 +489,7 @@ def zdy_zs_V230423(c: CZSC, **kwargs): :param kwargs: 其他参数 - di: 倒数第 di 根 K 线 - + :return: 信号字典 """ di = int(kwargs.get('di', 1)) @@ -1179,3 +1179,60 @@ def _find_peaks_valleys(data): v1 = '空头远离' return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + +def pressure_support_V240222(c: CZSC, **kwargs) -> OrderedDict: + """支撑压力线辅助V240222 + + 参数模板:"{freq}_D{di}高低点验证_支撑压力V240222" + + **信号逻辑:** + + 给定窗口内,当前价格与前高前低的关系,判断当前价格的压力和支撑。以高点验证压力位为例: + + 1. 当前高点与前高的差值在 x 个标准差以内 + 2. 当前高点与前高分别在窗口的两端 + 3. 中间的最低价与高点的差值在 y 个标准差以外 + + **信号列表:** + + - Signal('60分钟_D1高低点验证_支撑压力V240222_压力位_任意_任意_0') + - Signal('60分钟_D1高低点验证_支撑压力V240222_支撑位_任意_任意_0') + + :param c: CZSC对象 + :param kwargs: 无 + :return: 信号识别结果 + """ + di = int(kwargs.get('di', 1)) + w = int(kwargs.get('w', 20)) + assert w > 10, "参数 w 必须大于10" + + freq = c.freq.value + k1, k2, k3 = f"{freq}_D{di}高低点验证_支撑压力V240222".split('_') + v1 = '其他' + if len(c.bars_raw) < w + 10: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + bars = get_sub_elements(c.bars_raw, di=di, n=w) + max_high = max([x.high for x in bars]) + min_low = min([x.low for x in bars]) + + n = int(len(bars) * 0.2) + left_bars = bars[:n] + right_bars = bars[-n:] + gap = np.std([abs(x.high - x.low) for x in bars]) + + if max_high - min_low < gap * 0.3 * w: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + left_high = max([x.high for x in left_bars]) + right_high = max([x.high for x in right_bars]) + if max_high == max(left_high, right_high) and max_high - min(left_high, right_high) < gap: + v1 = '压力位' + + left_low = min([x.low for x in left_bars]) + right_low = min([x.low for x in right_bars]) + if min_low == min(left_low, right_low) and max(left_low, right_low) - min_low < gap: + v1 = '支撑位' + + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) diff --git a/czsc/traders/__init__.py b/czsc/traders/__init__.py index f4dd351e9..988388935 100644 --- a/czsc/traders/__init__.py +++ b/czsc/traders/__init__.py @@ -14,6 +14,6 @@ ) from czsc.traders.dummy import DummyBacktest from czsc.traders.sig_parse import SignalsParser, get_signals_config, get_signals_freqs -from czsc.traders.weight_backtest import WeightBacktest, get_ensemble_weight, long_short_equity -from czsc.traders.rwc import RedisWeightsClient +from czsc.traders.weight_backtest import WeightBacktest, get_ensemble_weight, long_short_equity, stoploss_by_direction +from czsc.traders.rwc import RedisWeightsClient, get_strategy_mates, get_heartbeat_time from czsc.traders.optimize import OpensOptimize, ExitsOptimize diff --git a/czsc/traders/rwc.py b/czsc/traders/rwc.py index 9109f34df..142eb3279 100644 --- a/czsc/traders/rwc.py +++ b/czsc/traders/rwc.py @@ -18,9 +18,9 @@ class RedisWeightsClient: """策略持仓权重收发客户端""" - version = "V231112" + version = "V240225" - def __init__(self, strategy_name, redis_url=None, send_heartbeat=True, **kwargs): + def __init__(self, strategy_name, redis_url=None, connection_pool=None, send_heartbeat=True, **kwargs): """ :param strategy_name: str, 策略名 :param redis_url: str, redis连接字符串, 默认为None, 即从环境变量 RWC_REDIS_URL 中读取 @@ -39,7 +39,11 @@ def __init__(self, strategy_name, redis_url=None, send_heartbeat=True, **kwargs) - ``unix://``: creates a Unix Domain Socket connection. - :param send_heartbeat: boolean, 是否发送心跳 + :param connection_pool: redis.BlockingConnectionPool, redis连接池,默认为None + + 如果传入了 redis_url,则会自动创建一个连接池,否则需要传入一个连接池;如果传入了连接池,则会忽略 redis_url。 + + :param send_heartbeat: boolean, 是否发送心跳,默认为True 如果为True,会在后台启动一个线程,每15秒向redis发送一次心跳,用于检测策略是否存活。 推荐在写入数据时设置为True,读取数据时设置为False,避免无用的心跳。 @@ -50,16 +54,25 @@ def __init__(self, strategy_name, redis_url=None, send_heartbeat=True, **kwargs) - heartbeat_prefix: str, 心跳key的前缀,默认为 heartbeat """ self.strategy_name = strategy_name - self.redis_url = redis_url if redis_url else os.getenv("RWC_REDIS_URL") self.key_prefix = kwargs.get("key_prefix", "Weights") - thread_safe_pool = redis.BlockingConnectionPool.from_url(self.redis_url, decode_responses=True) + if connection_pool: + thread_safe_pool = connection_pool + self.redis_url = connection_pool.connection_kwargs.get("url") + logger.info(f"{strategy_name} {self.key_prefix}: 使用传入的 redis 连接池") + else: + self.redis_url = redis_url if redis_url else os.getenv("RWC_REDIS_URL") + thread_safe_pool = redis.BlockingConnectionPool.from_url(self.redis_url, decode_responses=True) + logger.info(f"{strategy_name} {self.key_prefix}: 使用 REDIS_URL 创建 redis 连接池") + + assert isinstance(thread_safe_pool, redis.BlockingConnectionPool), "redis连接池创建失败" + self.r = redis.Redis(connection_pool=thread_safe_pool) self.lua_publish = RedisWeightsClient.register_lua_publish(self.r) + self.heartbeat_prefix = kwargs.get("heartbeat_prefix", "heartbeat") if send_heartbeat: - self.heartbeat_client = redis.from_url(self.redis_url, decode_responses=True) - self.heartbeat_prefix = kwargs.get("heartbeat_prefix", "heartbeat") + self.heartbeat_client = redis.Redis(connection_pool=thread_safe_pool) self.heartbeat_thread = threading.Thread(target=self.__heartbeat, daemon=True) self.heartbeat_thread.start() @@ -87,6 +100,12 @@ def metadata(self): key = f'{self.key_prefix}:META:{self.strategy_name}' return self.r.hgetall(key) + @property + def heartbeat_time(self): + """获取策略的最近一次心跳时间""" + key = f'{self.key_prefix}:{self.heartbeat_prefix}:{self.strategy_name}' + return pd.to_datetime(self.r.get(key)) + def get_last_times(self, symbols=None): """获取所有品种上策略最近一次发布信号的时间 @@ -380,3 +399,84 @@ def get_all_weights(self, sdt=None, edt=None, **kwargs) -> pd.DataFrame: df1 = df1[df1['dt'] <= pd.to_datetime(edt)].reset_index(drop=True) df1 = df1.sort_values(['dt', 'symbol']).reset_index(drop=True) return df1 + + +def get_strategy_mates(redis_url=None, connection_pool=None, key_pattern="Weights:META:*", **kwargs): + """获取Redis中的策略元数据 + + :param redis_url: str, redis连接字符串, 默认为None, 即从环境变量 RWC_REDIS_URL 中读取 + :param connection_pool: redis.ConnectionPool, redis连接池 + :param key_pattern: str, redis中key的pattern,默认为 Weights:META:* + :param kwargs: dict, 其他参数 + :return: pd.DataFrame + """ + heartbeat_prefix = kwargs.get("heartbeat_prefix", "heartbeat") + + if connection_pool: + r = redis.Redis(connection_pool=connection_pool) + else: + redis_url = redis_url if redis_url else os.getenv("RWC_REDIS_URL") + r = redis.Redis.from_url(redis_url, decode_responses=True) + + rows = [] + for key in r.keys(key_pattern): + meta = r.hgetall(key) + if not meta: + logger.warning(f"{key} 没有策略元数据") + continue + + meta['heartbeat_time'] = r.get(f"{meta['key_prefix']}:{heartbeat_prefix}:{meta['name']}") + rows.append(meta) + + if len(rows) == 0: + logger.warning(f"{key_pattern} 下没有策略元数据") + return pd.DataFrame() + + df = pd.DataFrame(rows) + df['update_time'] = pd.to_datetime(df['update_time']) + df['heartbeat_time'] = pd.to_datetime(df['heartbeat_time']) + df = df.sort_values('name').reset_index(drop=True) + + r.close() + return df + + +def get_heartbeat_time(strategy_name=None, redis_url=None, connection_pool=None, key_prefix="Weights", **kwargs): + """获取策略的最近一次心跳时间 + + :param strategy_name: str, 策略名,默认为None, 即获取所有策略的心跳时间 + :param redis_url: str, redis连接字符串, 默认为None, 即从环境变量 RWC_REDIS_URL 中读取 + :param connection_pool: redis.ConnectionPool, redis连接池 + :param key_prefix: str, redis中key的前缀,默认为 Weights + :param kwargs: dict, 其他参数 + + - heartbeat_prefix: str, 心跳key的前缀,默认为 heartbeat + + :return: str, 最近一次心跳时间 + """ + if connection_pool: + r = redis.Redis(connection_pool=connection_pool) + else: + redis_url = redis_url if redis_url else os.getenv("RWC_REDIS_URL") + r = redis.Redis.from_url(redis_url, decode_responses=True) + + if not strategy_name: + dfm = get_strategy_mates(redis_url=redis_url, connection_pool=connection_pool, key_pattern=f"{key_prefix}:META:*") + if len(dfm) == 0: + logger.warning(f"{key_prefix} 下没有策略元数据") + return None + strategy_names = dfm['name'].unique().tolist() + else: + strategy_names = [strategy_name] + + heartbeat_prefix = kwargs.get("heartbeat_prefix", "heartbeat") + res = {} + for sn in strategy_names: + hdt = r.get(f'{key_prefix}:{heartbeat_prefix}:{sn}') + if hdt: + res[sn] = pd.to_datetime(hdt) + else: + res[sn] = None + logger.warning(f"{sn} 没有心跳时间") + r.close() + return res diff --git a/czsc/traders/weight_backtest.py b/czsc/traders/weight_backtest.py index ea989c8cb..9ac3b4827 100644 --- a/czsc/traders/weight_backtest.py +++ b/czsc/traders/weight_backtest.py @@ -156,6 +156,62 @@ def get_ensemble_weight(trader: CzscTrader, method: Union[AnyStr, Callable] = 'm return dfp[['dt', 'symbol', 'weight', 'price']].copy() +def stoploss_by_direction(dfw, stoploss=0.03, **kwargs): + """按持仓方向进行止损 + + :param dfw: pd.DataFrame, columns = ['dt', 'symbol', 'weight', 'price'], 持仓权重数据,其中 + + dt 为K线结束时间,必须是连续的交易时间序列,不允许有时间断层 + symbol 为合约代码, + weight 为K线结束时间对应的持仓权重,品种之间的权重是独立的,不会互相影响 + price 为结束时间对应的交易价格,可以是当前K线的收盘价,或者下一根K线的开盘价,或者未来N根K线的TWAP、VWAP等 + + 数据样例如下: + =================== ======== ======== ======= + dt symbol weight price + =================== ======== ======== ======= + 2019-01-02 09:01:00 DLi9001 0.5 961.695 + 2019-01-02 09:02:00 DLi9001 0.25 960.72 + 2019-01-02 09:03:00 DLi9001 0.25 962.669 + 2019-01-02 09:04:00 DLi9001 0.25 960.72 + 2019-01-02 09:05:00 DLi9001 0.25 961.695 + =================== ======== ======== ======= + + :param stoploss: 止损比例 + :param kwargs: 其他参数 + :return: pd.DataFrame, + columns = ['dt', 'symbol', 'weight', 'raw_weight', 'price', 'returns', + 'hold_returns', 'min_hold_returns', 'order_id', 'is_stop'] + """ + dfw = dfw.copy() + dfw['direction'] = np.sign(dfw['weight']) + dfw['raw_weight'] = dfw['weight'].copy() + assert stoploss > 0, "止损比例必须大于0" + + rows = [] + for _, dfg in dfw.groupby('symbol'): + assert isinstance(dfg, pd.DataFrame) + assert dfg['dt'].is_monotonic_increasing, "dt 必须是递增的时间序列" + dfg = dfg.sort_values('dt', ascending=True) + + # 按交易方向设置订单号 + dfg['order_id'] = dfg.groupby((dfg['direction'] != dfg['direction'].shift()).cumsum()).ngroup() + + # 按持仓权重计算收益 + dfg['n1b'] = dfg['price'].shift(-1) / dfg['price'] - 1 + dfg['returns'] = dfg['n1b'] * dfg['weight'] + dfg['hold_returns'] = dfg['returns'].groupby(dfg['order_id']).cumsum() + dfg['min_hold_returns'] = dfg.groupby('order_id')['hold_returns'].cummin() + + # 止损:同一个订单下,min_hold_returns < -stoploss时,后续weight置为0 + dfg['is_stop'] = (dfg['min_hold_returns'] < -stoploss) & (dfg['order_id'] == dfg['order_id'].shift(1)) + dfg['weight'] = np.where((dfg['is_stop'].shift(1)) & (dfg['order_id'] == dfg['order_id'].shift(1)), 0, dfg['weight']) + rows.append(dfg.copy()) + + dfw1 = pd.concat(rows, ignore_index=True) + return dfw1 + + class WeightBacktest: """持仓权重回测 diff --git a/czsc/utils/__init__.py b/czsc/utils/__init__.py index baf2140a6..1425c04e6 100644 --- a/czsc/utils/__init__.py +++ b/czsc/utils/__init__.py @@ -18,7 +18,7 @@ from .plotly_plot import KlineChart from .trade import cal_trade_price, update_nbars, update_bbars, update_tbars, risk_free_returns, resample_to_daily from .cross import CrossSectionalPerformance, cross_sectional_ranker -from .stats import daily_performance, net_value_stats, subtract_fee, weekly_performance +from .stats import daily_performance, net_value_stats, subtract_fee, weekly_performance, holds_performance from .signal_analyzer import SignalAnalyzer, SignalPerformance from .cache import home_path, get_dir_size, empty_cache_path, DiskCache, disk_cache from .index_composition import index_composition diff --git a/czsc/utils/cache.py b/czsc/utils/cache.py index 24a3ca095..e74ac0759 100644 --- a/czsc/utils/cache.py +++ b/czsc/utils/cache.py @@ -61,7 +61,7 @@ def is_found(self, k: str, suffix: str = "pkl", ttl=-1) -> bool: """ file = self.path / f"{k}.{suffix}" if not file.exists(): - logger.info(f"文件不存在, {file}") + logger.info(f"缓存文件不存在, {file}") return False if ttl > 0: @@ -70,7 +70,8 @@ def is_found(self, k: str, suffix: str = "pkl", ttl=-1) -> bool: logger.info(f"缓存文件已过期, {file}") return False - return file.exists() + logger.info(f"缓存文件已找到, {file}") + return True def get(self, k: str, suffix: str = "pkl") -> Any: """读取缓存文件 diff --git a/czsc/utils/st_components.py b/czsc/utils/st_components.py index ac8712deb..70fa5b408 100644 --- a/czsc/utils/st_components.py +++ b/czsc/utils/st_components.py @@ -18,6 +18,7 @@ def show_daily_return(df, **kwargs): - title: str,标题 - stat_hold_days: bool,是否展示持有日绩效指标,默认为 True - legend_only_cols: list,仅在图例中展示的列名 + - use_st_table: bool,是否使用 st.table 展示绩效指标,默认为 False """ if not df.index.dtype == 'datetime64[ns]': @@ -39,10 +40,8 @@ def _stats(df_, type_='持有日'): col_stats = czsc.daily_performance(df_[col]) col_stats['日收益名称'] = col stats.append(col_stats) - stats = pd.DataFrame(stats).set_index('日收益名称') - # fmt_cols = ['年化', '夏普', '最大回撤', '卡玛', '年化波动率', '非零覆盖', '日胜率', '盈亏平衡点', '新高间隔', '新高占比'] - # stats = stats.style.background_gradient(cmap='RdYlGn_r', axis=None, subset=fmt_cols).format('{:.4f}') + stats = pd.DataFrame(stats).set_index('日收益名称') stats = stats.style.background_gradient(cmap='RdYlGn_r', axis=None, subset=['年化']) stats = stats.background_gradient(cmap='RdYlGn_r', axis=None, subset=['绝对收益']) stats = stats.background_gradient(cmap='RdYlGn_r', axis=None, subset=['夏普']) @@ -54,7 +53,6 @@ def _stats(df_, type_='持有日'): stats = stats.background_gradient(cmap='RdYlGn_r', axis=None, subset=['非零覆盖']) stats = stats.background_gradient(cmap='RdYlGn', axis=None, subset=['新高间隔']) stats = stats.background_gradient(cmap='RdYlGn_r', axis=None, subset=['新高占比']) - stats = stats.format( { '盈亏平衡点': '{:.2f}', @@ -72,6 +70,8 @@ def _stats(df_, type_='持有日'): ) return stats + use_st_table = kwargs.get("use_st_table", False) + with st.container(): title = kwargs.get("title", "") if title: @@ -79,11 +79,15 @@ def _stats(df_, type_='持有日'): st.divider() st.write("交易日绩效指标") - st.dataframe(_stats(df, type_='交易日'), use_container_width=True) + # with st.expander("交易日绩效指标", expanded=True): + if use_st_table: + st.table(_stats(df, type_='交易日')) + else: + st.dataframe(_stats(df, type_='交易日'), use_container_width=True) if kwargs.get("stat_hold_days", True): - st.write("持有日绩效指标") - st.dataframe(_stats(df, type_='持有日'), use_container_width=True) + with st.expander("持有日绩效指标", expanded=False): + st.dataframe(_stats(df, type_='持有日'), use_container_width=True) df = df.cumsum() fig = px.line(df, y=df.columns.to_list(), title="日收益累计曲线") @@ -140,12 +144,21 @@ def show_correlation(df, cols=None, method='pearson', **kwargs): :param df: pd.DataFrame,数据源 :param cols: list,分析相关性的字段 :param method: str,计算相关性的方法,可选 pearson 和 spearman + :param kwargs: + + - use_st_table: bool,是否使用 st.table 展示相关性,默认为 False + - use_container_width: bool,是否使用容器宽度,默认为 True + """ cols = cols or df.columns.to_list() dfr = df[cols].corr(method=method) dfr['average'] = (dfr.sum(axis=1) - 1) / (len(cols) - 1) dfr = dfr.style.background_gradient(cmap='RdYlGn_r', axis=None).format('{:.4f}', na_rep='MISS') - st.dataframe(dfr, use_container_width=kwargs.get("use_container_width", True)) + + if kwargs.get("use_st_table", False): + st.table(dfr) + else: + st.dataframe(dfr, use_container_width=kwargs.get("use_container_width", True)) def show_sectional_ic(df, x_col, y_col, method='pearson', **kwargs): @@ -412,6 +425,7 @@ def show_splited_daily(df, ret_col, **kwargs): "过去6月": last_dt - pd.Timedelta(days=180), "过去1年": last_dt - pd.Timedelta(days=365), "今年以来": pd.to_datetime(f"{last_dt.year}-01-01"), + "成立以来": df.index[0], } rows = [] @@ -627,3 +641,49 @@ def show_ts_self_corr(df, col, **kwargs): df.dropna(subset=[f"{col}_lag{n}"], inplace=True) show_ts_rolling_corr(df, col, f"{col}_lag{n}", min_periods=min_periods, window=window, corr_method=corr_method) + + +def show_stoploss_by_direction(dfw, **kwargs): + """按方向止损分析的展示 + + :param dfw: pd.DataFrame, 包含权重数据 + :param kwargs: dict, 其他参数 + + - stoploss: float, 止损比例 + - show_detail: bool, 是否展示详细信息 + - digits: int, 价格小数位数, 默认2 + - fee_rate: float, 手续费率, 默认0.0002 + + :return: None + """ + dfw = dfw.copy() + stoploss = kwargs.pop('stoploss', 0.08) + dfw1 = czsc.stoploss_by_direction(dfw, stoploss=stoploss) + + # 找出逐笔止损点 + rows = [] + for symbol, dfg in dfw1.groupby('symbol'): + for order_id, dfg1 in dfg.groupby('order_id'): + if dfg1['is_stop'].any(): + row = { + 'symbol': symbol, + 'order_id': order_id, + '交易方向': '多头' if dfg1['weight'].iloc[0] > 0 else '空头', + '开仓时间': dfg1['dt'].iloc[0], + '平仓时间': dfg1['dt'].iloc[-1], + '平仓收益': dfg1['hold_returns'].iloc[-1], + '止损时间': dfg1[dfg1['is_stop']]['dt'].iloc[0], + '止损收益': dfg1[dfg1['is_stop']]['hold_returns'].iloc[0], + } + rows.append(row) + dfr = pd.DataFrame(rows) + with st.expander("逐笔止损点", expanded=False): + st.dataframe(dfr, use_container_width=True) + + if kwargs.pop("show_detail", False): + cols = ['dt', 'symbol', 'raw_weight', 'weight', 'price', 'hold_returns', 'min_hold_returns', 'returns', 'order_id', 'is_stop'] + dfs = dfw1[dfw1['is_stop']][cols].copy() + with st.expander("止损点详情", expanded=False): + st.dataframe(dfs, use_container_width=True) + + czsc.show_weight_backtest(dfw1[['dt', 'symbol', 'weight', 'price']].copy(), **kwargs) diff --git a/czsc/utils/stats.py b/czsc/utils/stats.py index 86434e48b..fad156a64 100644 --- a/czsc/utils/stats.py +++ b/czsc/utils/stats.py @@ -296,12 +296,14 @@ def evaluate_pairs(pairs: pd.DataFrame, trade_dir: str = "多空") -> dict: "持仓K线数": 0, } - if trade_dir in ["多头", "空头"]: - pairs = pairs[pairs["交易方向"] == trade_dir] - if len(pairs) == 0: return p + if trade_dir in ["多头", "空头"]: + pairs = pairs[pairs["交易方向"] == trade_dir] + if len(pairs) == 0: + return p + pairs = pairs.to_dict(orient='records') p['交易次数'] = len(pairs) p["盈亏平衡点"] = round(cal_break_even_point([x['盈亏比例'] for x in pairs]), 4) @@ -327,3 +329,37 @@ def evaluate_pairs(pairs: pd.DataFrame, trade_dir: str = "多空") -> dict: p["单笔盈亏比"] = round(p["单笔盈利"] / abs(p["单笔亏损"]), 4) return p + + +def holds_performance(df, **kwargs): + """组合持仓权重表现 + + :param df: pd.DataFrame, columns=['dt', 'symbol', 'weight', 'n1b'] + 数据说明,dt: 交易时间,symbol: 标的代码,weight: 权重,n1b: 名义收益率 + 必须是每个时间点都有数据,如果某个时间点没有数据,可以增加一行数据,权重为0 + :param kwargs: + + - fee: float, 单边费率,BP + - digits: int, 保留小数位数 + + :return: pd.DataFrame, columns=['date', 'change', 'edge_pre_fee', 'cost', 'edge_post_fee'] + """ + fee = kwargs.get('fee', 15) + digits = kwargs.get('digits', 2) # 保留小数位数 + + df = df.copy() + df['weight'] = df['weight'].round(digits) + df = df.sort_values(['dt', 'symbol']).reset_index(drop=True) + + dft = pd.pivot_table(df, index='dt', columns='symbol', values='weight', aggfunc='sum').fillna(0) + df_turns = dft.diff().abs().sum(axis=1).reset_index() + df_turns.columns = ['date', 'change'] + sdt = df['dt'].min() + df_turns.loc[(df_turns['date'] == sdt), 'change'] = df[df['dt'] == sdt]['weight'].sum() + + df_edge = df.groupby('dt').apply(lambda x: (x['weight'] * x['n1b']).sum()).reset_index() + df_edge.columns = ['date', 'edge_pre_fee'] + dfr = pd.merge(df_turns, df_edge, on='date', how='left') + dfr['cost'] = dfr['change'] * fee / 10000 # 换手成本 + dfr['edge_post_fee'] = dfr['edge_pre_fee'] - dfr['cost'] # 净收益 + return dfr diff --git a/examples/signals_dev/pressure_support_V240222.py b/examples/signals_dev/pressure_support_V240222.py new file mode 100644 index 000000000..28e5dcba7 --- /dev/null +++ b/examples/signals_dev/pressure_support_V240222.py @@ -0,0 +1,77 @@ +import numpy as np +from collections import OrderedDict +from czsc.analyze import CZSC +from czsc.utils import create_single_signal, get_sub_elements +from loguru import logger as log + + +def pressure_support_V240222(c: CZSC, **kwargs) -> OrderedDict: + """支撑压力线辅助V240222 + + 参数模板:"{freq}_D{di}高低点验证_支撑压力V240222" + + **信号逻辑:** + + 给定窗口内,当前价格与前高前低的关系,判断当前价格的压力和支撑。以高点验证压力位为例: + + 1. 当前高点与前高的差值在 x 个标准差以内 + 2. 当前高点与前高分别在窗口的两端 + 3. 中间的最低价与高点的差值在 y 个标准差以外 + + **信号列表:** + + - Signal('60分钟_D1高低点验证_支撑压力V240222_压力位_任意_任意_0') + - Signal('60分钟_D1高低点验证_支撑压力V240222_支撑位_任意_任意_0') + + :param c: CZSC对象 + :param kwargs: 无 + :return: 信号识别结果 + """ + di = int(kwargs.get('di', 1)) + w = int(kwargs.get('w', 20)) + assert w > 10, "参数 w 必须大于10" + + freq = c.freq.value + k1, k2, k3 = f"{freq}_D{di}高低点验证_支撑压力V240222".split('_') + v1 = '其他' + if len(c.bars_raw) < w + 10: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + bars = get_sub_elements(c.bars_raw, di=di, n=w) + max_high = max([x.high for x in bars]) + min_low = min([x.low for x in bars]) + + n = int(len(bars) * 0.2) + left_bars = bars[:n] + right_bars = bars[-n:] + gap = np.std([abs(x.high - x.low) for x in bars]) + + if max_high - min_low < gap * 0.3 * w: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + left_high = max([x.high for x in left_bars]) + right_high = max([x.high for x in right_bars]) + if max_high == max(left_high, right_high) and max_high - min(left_high, right_high) < gap: + v1 = '压力位' + + left_low = min([x.low for x in left_bars]) + right_low = min([x.low for x in right_bars]) + if min_low == min(left_low, right_low) and max(left_low, right_low) - min_low < gap: + v1 = '支撑位' + + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + +def check(): + from czsc.connectors import research + from czsc.traders.base import check_signals_acc + + symbols = research.get_symbols('A股主要指数') + bars = research.get_raw_bars(symbols[0], '15分钟', '20181101', '20210101', fq='前复权') + + signals_config = [{'name': pressure_support_V240222, 'freq': "60分钟"}] + check_signals_acc(bars, signals_config=signals_config, height='780px', delta_days=5) # type: ignore + + +if __name__ == '__main__': + check() diff --git a/examples/signals_dev/signal_match.py b/examples/signals_dev/signal_match.py index 9132efa6d..6270c5377 100644 --- a/examples/signals_dev/signal_match.py +++ b/examples/signals_dev/signal_match.py @@ -44,7 +44,7 @@ conf = sp.parse(signals_seq) parsed_name = {x['name'] for x in conf} print(f"total signal functions: {len(sp.sig_name_map)}; parsed: {len(parsed_name)}") - # total signal functions: 197; parsed: 197 + # total signal functions: 201; parsed: 201 # 测试信号配置生成信号 from czsc import generate_czsc_signals, get_signals_freqs, get_signals_config diff --git a/examples/test_offline/test_rwc.py b/examples/test_offline/test_rwc.py deleted file mode 100644 index 68d64e39c..000000000 --- a/examples/test_offline/test_rwc.py +++ /dev/null @@ -1,41 +0,0 @@ -import sys -sys.path.insert(0, ".") -sys.path.insert(0, "..") - -import czsc -import pandas as pd - -assert czsc.RedisWeightsClient.version == "V231005" - -redis_url = 'redis://20.205.5.**:9103/1' -rwc = czsc.RedisWeightsClient('test', redis_url, key_prefix='WeightsA') - -# 如果需要清理 redis 中的数据,执行 -# rwc.clear_all() - -# 首次写入,建议设置一些策略元数据 -rwc.set_metadata(description='测试策略:仅用于读写redis测试', base_freq='1分钟', author='ZB', outsample_sdt='20220101') -print(rwc.metadata) - -rwc.set_metadata(description='测试策略:仅用于读写redis测试', base_freq='1分钟', author='ZB', - outsample_sdt='20220101', overwrite=True) -print(rwc.metadata) - -# 写入策略持仓权重,样例数据下载:https://s0cqcxuy3p.feishu.cn/wiki/Pf1fw1woQi4iJikbKJmcYToznxb -weights = pd.read_feather(r"C:\Users\zengb\Downloads\weight_example.feather") - -# 写入单条数据 -rwc.publish(**weights.iloc[0].to_dict()) - -# 批量写入整个dataframe;样例超300万行,写入耗时约5分钟 -rwc.publish_dataframe(weights, overwrite=False, batch_size=1000000) - -# 获取redis中该策略有持仓权重的品种列表 -symbols = rwc.get_symbols() -print(symbols) - -# 获取指定品种在某个时间段的持仓权重数据 -dfw = rwc.get_hist_weights('ZZSF9001', '20210101', '20230101') - -# 获取所有品种最近一个时间的持仓权重 -dfr = rwc.get_last_weights(symbols=symbols) diff --git a/examples/test_offline/test_rwc_v231112.py b/examples/test_offline/test_rwc_v231112.py index bcb275916..9223c0a31 100644 --- a/examples/test_offline/test_rwc_v231112.py +++ b/examples/test_offline/test_rwc_v231112.py @@ -7,20 +7,24 @@ sys.path.insert(0, "..") import czsc +import redis import pandas as pd -assert czsc.RedisWeightsClient.version == "V231112" +# assert czsc.RedisWeightsClient.version == "V231112" print(os.getenv("RWC_REDIS_URL")) +connection_pool = redis.BlockingConnectionPool.from_url(os.getenv("RWC_REDIS_URL"), decode_responses=True) + def test_writer(): - rwc = czsc.RedisWeightsClient('STK004_100', key_prefix='WeightsA', send_heartbeat=True) + rwc = czsc.RedisWeightsClient('STK004_100', connection_pool=connection_pool, key_prefix='WeightsA', send_heartbeat=True) # 首次写入,建议设置一些策略元数据 rwc.set_metadata(description='测试策略:仅用于读写redis测试', base_freq='日线', author='测试', outsample_sdt='20220101') print(rwc.metadata) - dfw = pd.read_csv(r"C:\Users\zengb\Desktop\weights_example.csv") + # 写入策略持仓权重,样例数据下载:https://s0cqcxuy3p.feishu.cn/wiki/Pf1fw1woQi4iJikbKJmcYToznxb + dfw = pd.read_feather(r"C:\Users\zengb\Downloads\weight_example.feather") # 写入单条数据 rwc.publish(**dfw.iloc[0].to_dict()) @@ -34,8 +38,26 @@ def test_writer(): def test_reader(): # 读取redis中的数据:send_heartbeat 推荐设置为 False,否则导致心跳数据异常 - rwc = czsc.RedisWeightsClient('STK002_100', key_prefix='WeightsA', send_heartbeat=False) + rwc = czsc.RedisWeightsClient('STK004_100', connection_pool=connection_pool, key_prefix='WeightsA', send_heartbeat=False) + symbols = rwc.get_symbols() + print(rwc.heartbeat_time, symbols) + + # 读取单个品种的持仓历史 + df = rwc.get_hist_weights('000001.SZ', '20170101', '20230101') + + # 读取所有品种最近一个时间的持仓权重,忽略权重为0的品种 + df1 = rwc.get_last_weights(ignore_zero=True) + + # 读取策略全部持仓权重历史 + dfa1 = rwc.get_all_weights() + dfa2 = rwc.get_all_weights(ignore_zero=False) + + +def test_reader_by_url(): + # 读取redis中的数据:send_heartbeat 推荐设置为 False,否则导致心跳数据异常 + rwc = czsc.RedisWeightsClient('STK004_100', key_prefix='WeightsA', send_heartbeat=False) symbols = rwc.get_symbols() + print(rwc.heartbeat_time, symbols) # 读取单个品种的持仓历史 df = rwc.get_hist_weights('000001.SZ', '20170101', '20230101') @@ -46,3 +68,24 @@ def test_reader(): # 读取策略全部持仓权重历史 dfa1 = rwc.get_all_weights() dfa2 = rwc.get_all_weights(ignore_zero=False) + + +def test_metas(): + from czsc.traders.rwc import get_strategy_mates + mates = get_strategy_mates(connection_pool=connection_pool, key_pattern="WeightsA:META:*") + print(mates) + + mates = get_strategy_mates(key_pattern="WeightsA:META:*") + print(mates) + + +def test_heartbeat(): + from czsc.traders.rwc import get_heartbeat_time + + # 获取指定策略的心跳时间 + hb = get_heartbeat_time(strategy_name="MKT001", connection_pool=connection_pool, key_prefix="WeightsA") + print(hb) + + # 获取所有策略的心跳时间 + hb = get_heartbeat_time(connection_pool=connection_pool, key_prefix="WeightsA") + print(hb) diff --git a/setup.py b/setup.py index 0785ed18e..f7cc9dea9 100644 --- a/setup.py +++ b/setup.py @@ -34,6 +34,7 @@ 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', ], entry_points={ 'console_scripts': [