Skip to content

Commit

Permalink
V0.9.43 更新一批代码 (#187)
Browse files Browse the repository at this point in the history
* 0.9.43 start coding

* 0.9.43 新增心跳时间获取

* 0.9.43 rwc 测试代码完善

* 0.9.43 update

* 0.9.43 新增 holds_performance

* 0.9.43 update

* 0.9.43 新增按持仓方向进行止损分析

* 0.9.43 update

* 0.9.43 新增压力支撑信号

* 0.9.43 fix bug

* update

* update
  • Loading branch information
zengbin93 authored Mar 2, 2024
1 parent 27013d8 commit 1593279
Show file tree
Hide file tree
Showing 18 changed files with 492 additions and 77 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pythonpackage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ name: Python package

on:
push:
branches: [ master, V0.9.42 ]
branches: [ master, V0.9.43 ]
pull_request:
branches: [ master ]

Expand Down
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,24 @@
* 已经开始用czsc库进行量化研究的朋友,欢迎[加入飞书群](https://applink.feishu.cn/client/chat/chatter/add_by_link?link_token=0bak668e-7617-452c-b935-94d2c209e6cf),快点击加入吧!
* [B站视频教程合集(持续更新...)](https://space.bilibili.com/243682308/channel/series)
* [CZSC小圈子](https://s0cqcxuy3p.feishu.cn/wiki/wikcnwXSk9mWnki1b6URPhLA2Hc)
* [CZSC代码库QA](https://zbczsc.streamlit.app/)


## 知识星球

* [CZSC小圈子(缠论、量化、专享案例)](https://s0cqcxuy3p.feishu.cn/wiki/wikcnwXSk9mWnki1b6URPhLA2Hc)

* 链接:https://wx.zsxq.com/dweb2/index/group/88851448582512
* 加入:https://t.zsxq.com/0aMSAqcgO
* 费用:100元

> **知识星球【CZSC小圈子】的定位是什么?**
> - 为仔细研读过禅师原文并且愿意使用 CZSC 库进行量化投研的朋友提供一个深入交流的平台。
> - 寻找一群有能力、有兴趣、有主见的朋友共同进行量化策略研究讨论交流。
> - 对于刚接触缠论和量化交易的新朋友,给出一些力所能及的帮助(可以在圈子中提问,必回复)。
> - 2024年,小圈子将提供一些专享内容,主要是使用 czsc 构建量化策略的优质案例。

## 项目贡献

* [择时策略研究框架](https://s0cqcxuy3p.feishu.cn/wiki/wikcnhizrtIOQakwVcZLMKJNaib)
Expand Down
14 changes: 11 additions & 3 deletions czsc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@
SignalsParser,
get_signals_config,
get_signals_freqs,

WeightBacktest,
stoploss_by_direction,
get_ensemble_weight,
long_short_equity,

RedisWeightsClient,
get_strategy_mates,
get_heartbeat_time,

OpensOptimize,
ExitsOptimize,
)
Expand Down Expand Up @@ -70,6 +76,7 @@
SignalPerformance,
daily_performance,
weekly_performance,
holds_performance,
net_value_stats,
subtract_fee,

Expand Down Expand Up @@ -109,6 +116,7 @@
show_weight_backtest,
show_ts_rolling_corr,
show_ts_self_corr,
show_stoploss_by_direction,
)

from czsc.utils.bi_info import (
Expand All @@ -131,10 +139,10 @@
is_event_feature,
)

__version__ = "0.9.42"
__version__ = "0.9.43"
__author__ = "zengbin93"
__email__ = "[email protected]"
__date__ = "20240121"
__date__ = "20240222"


def welcome():
Expand All @@ -154,4 +162,4 @@ def welcome():


if get_dir_size(home_path) > pow(1024, 3):
print(f"{home_path} 目录缓存超过1GB,请适当清理。调用 czsc.empty_cache_path 可以直接清空缓存")
print(f"{home_path} 目录缓存超过1GB,请适当清理。调用 czsc.empty_cache_path() 可以直接清空缓存")
4 changes: 3 additions & 1 deletion czsc/connectors/cooperation.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ def get_symbols(name, **kwargs):

def get_min_future_klines(code, sdt, edt, freq='1m'):
"""分段获取期货1分钟K线后合并"""
dates = pd.date_range(start=sdt, end=edt, freq='1M')
# dates = pd.date_range(start=sdt, end=edt, freq='1M')
dates = pd.date_range(start=sdt, end=edt, freq='30D')

dates = [d.strftime('%Y%m%d') for d in dates] + [sdt, edt]
dates = sorted(list(set(dates)))

Expand Down
1 change: 1 addition & 0 deletions czsc/signals/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,5 @@
zdy_macd_V230527,
zdy_dif_V230527,
zdy_dif_V230528,
pressure_support_V240222,
)
61 changes: 59 additions & 2 deletions czsc/signals/zdy.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ def zdy_take_profit_V230406(cat: CzscTrader, **kwargs) -> OrderedDict:
**信号逻辑:**
多头止盈逻辑如下,反之为空头止盈逻辑:
1. 任何一笔在持仓状态下的上升笔结束,若升破前一下跌笔高点,继续持仓,如没有升破前一下跌笔高点,止盈走人。需要用两次停顿分型判断。
**信号列表:**
Expand Down Expand Up @@ -489,7 +489,7 @@ def zdy_zs_V230423(c: CZSC, **kwargs):
:param kwargs: 其他参数
- di: 倒数第 di 根 K 线
:return: 信号字典
"""
di = int(kwargs.get('di', 1))
Expand Down Expand Up @@ -1179,3 +1179,60 @@ def _find_peaks_valleys(data):
v1 = '空头远离'

return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)


def pressure_support_V240222(c: CZSC, **kwargs) -> OrderedDict:
"""支撑压力线辅助V240222
参数模板:"{freq}_D{di}高低点验证_支撑压力V240222"
**信号逻辑:**
给定窗口内,当前价格与前高前低的关系,判断当前价格的压力和支撑。以高点验证压力位为例:
1. 当前高点与前高的差值在 x 个标准差以内
2. 当前高点与前高分别在窗口的两端
3. 中间的最低价与高点的差值在 y 个标准差以外
**信号列表:**
- Signal('60分钟_D1高低点验证_支撑压力V240222_压力位_任意_任意_0')
- Signal('60分钟_D1高低点验证_支撑压力V240222_支撑位_任意_任意_0')
:param c: CZSC对象
:param kwargs: 无
:return: 信号识别结果
"""
di = int(kwargs.get('di', 1))
w = int(kwargs.get('w', 20))
assert w > 10, "参数 w 必须大于10"

freq = c.freq.value
k1, k2, k3 = f"{freq}_D{di}高低点验证_支撑压力V240222".split('_')
v1 = '其他'
if len(c.bars_raw) < w + 10:
return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)

bars = get_sub_elements(c.bars_raw, di=di, n=w)
max_high = max([x.high for x in bars])
min_low = min([x.low for x in bars])

n = int(len(bars) * 0.2)
left_bars = bars[:n]
right_bars = bars[-n:]
gap = np.std([abs(x.high - x.low) for x in bars])

if max_high - min_low < gap * 0.3 * w:
return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)

left_high = max([x.high for x in left_bars])
right_high = max([x.high for x in right_bars])
if max_high == max(left_high, right_high) and max_high - min(left_high, right_high) < gap:
v1 = '压力位'

left_low = min([x.low for x in left_bars])
right_low = min([x.low for x in right_bars])
if min_low == min(left_low, right_low) and max(left_low, right_low) - min_low < gap:
v1 = '支撑位'

return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1)
4 changes: 2 additions & 2 deletions czsc/traders/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@
)
from czsc.traders.dummy import DummyBacktest
from czsc.traders.sig_parse import SignalsParser, get_signals_config, get_signals_freqs
from czsc.traders.weight_backtest import WeightBacktest, get_ensemble_weight, long_short_equity
from czsc.traders.rwc import RedisWeightsClient
from czsc.traders.weight_backtest import WeightBacktest, get_ensemble_weight, long_short_equity, stoploss_by_direction
from czsc.traders.rwc import RedisWeightsClient, get_strategy_mates, get_heartbeat_time
from czsc.traders.optimize import OpensOptimize, ExitsOptimize
114 changes: 107 additions & 7 deletions czsc/traders/rwc.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
class RedisWeightsClient:
"""策略持仓权重收发客户端"""

version = "V231112"
version = "V240225"

def __init__(self, strategy_name, redis_url=None, send_heartbeat=True, **kwargs):
def __init__(self, strategy_name, redis_url=None, connection_pool=None, send_heartbeat=True, **kwargs):
"""
:param strategy_name: str, 策略名
:param redis_url: str, redis连接字符串, 默认为None, 即从环境变量 RWC_REDIS_URL 中读取
Expand All @@ -39,7 +39,11 @@ def __init__(self, strategy_name, redis_url=None, send_heartbeat=True, **kwargs)
<https://www.iana.org/assignments/uri-schemes/prov/rediss>
- ``unix://``: creates a Unix Domain Socket connection.
:param send_heartbeat: boolean, 是否发送心跳
:param connection_pool: redis.BlockingConnectionPool, redis连接池,默认为None
如果传入了 redis_url,则会自动创建一个连接池,否则需要传入一个连接池;如果传入了连接池,则会忽略 redis_url。
:param send_heartbeat: boolean, 是否发送心跳,默认为True
如果为True,会在后台启动一个线程,每15秒向redis发送一次心跳,用于检测策略是否存活。
推荐在写入数据时设置为True,读取数据时设置为False,避免无用的心跳。
Expand All @@ -50,16 +54,25 @@ def __init__(self, strategy_name, redis_url=None, send_heartbeat=True, **kwargs)
- heartbeat_prefix: str, 心跳key的前缀,默认为 heartbeat
"""
self.strategy_name = strategy_name
self.redis_url = redis_url if redis_url else os.getenv("RWC_REDIS_URL")
self.key_prefix = kwargs.get("key_prefix", "Weights")

thread_safe_pool = redis.BlockingConnectionPool.from_url(self.redis_url, decode_responses=True)
if connection_pool:
thread_safe_pool = connection_pool
self.redis_url = connection_pool.connection_kwargs.get("url")
logger.info(f"{strategy_name} {self.key_prefix}: 使用传入的 redis 连接池")
else:
self.redis_url = redis_url if redis_url else os.getenv("RWC_REDIS_URL")
thread_safe_pool = redis.BlockingConnectionPool.from_url(self.redis_url, decode_responses=True)
logger.info(f"{strategy_name} {self.key_prefix}: 使用 REDIS_URL 创建 redis 连接池")

assert isinstance(thread_safe_pool, redis.BlockingConnectionPool), "redis连接池创建失败"

self.r = redis.Redis(connection_pool=thread_safe_pool)
self.lua_publish = RedisWeightsClient.register_lua_publish(self.r)
self.heartbeat_prefix = kwargs.get("heartbeat_prefix", "heartbeat")

if send_heartbeat:
self.heartbeat_client = redis.from_url(self.redis_url, decode_responses=True)
self.heartbeat_prefix = kwargs.get("heartbeat_prefix", "heartbeat")
self.heartbeat_client = redis.Redis(connection_pool=thread_safe_pool)
self.heartbeat_thread = threading.Thread(target=self.__heartbeat, daemon=True)
self.heartbeat_thread.start()

Expand Down Expand Up @@ -87,6 +100,12 @@ def metadata(self):
key = f'{self.key_prefix}:META:{self.strategy_name}'
return self.r.hgetall(key)

@property
def heartbeat_time(self):
"""获取策略的最近一次心跳时间"""
key = f'{self.key_prefix}:{self.heartbeat_prefix}:{self.strategy_name}'
return pd.to_datetime(self.r.get(key))

def get_last_times(self, symbols=None):
"""获取所有品种上策略最近一次发布信号的时间
Expand Down Expand Up @@ -380,3 +399,84 @@ def get_all_weights(self, sdt=None, edt=None, **kwargs) -> pd.DataFrame:
df1 = df1[df1['dt'] <= pd.to_datetime(edt)].reset_index(drop=True)
df1 = df1.sort_values(['dt', 'symbol']).reset_index(drop=True)
return df1


def get_strategy_mates(redis_url=None, connection_pool=None, key_pattern="Weights:META:*", **kwargs):
"""获取Redis中的策略元数据
:param redis_url: str, redis连接字符串, 默认为None, 即从环境变量 RWC_REDIS_URL 中读取
:param connection_pool: redis.ConnectionPool, redis连接池
:param key_pattern: str, redis中key的pattern,默认为 Weights:META:*
:param kwargs: dict, 其他参数
:return: pd.DataFrame
"""
heartbeat_prefix = kwargs.get("heartbeat_prefix", "heartbeat")

if connection_pool:
r = redis.Redis(connection_pool=connection_pool)
else:
redis_url = redis_url if redis_url else os.getenv("RWC_REDIS_URL")
r = redis.Redis.from_url(redis_url, decode_responses=True)

rows = []
for key in r.keys(key_pattern):
meta = r.hgetall(key)
if not meta:
logger.warning(f"{key} 没有策略元数据")
continue

meta['heartbeat_time'] = r.get(f"{meta['key_prefix']}:{heartbeat_prefix}:{meta['name']}")
rows.append(meta)

if len(rows) == 0:
logger.warning(f"{key_pattern} 下没有策略元数据")
return pd.DataFrame()

df = pd.DataFrame(rows)
df['update_time'] = pd.to_datetime(df['update_time'])
df['heartbeat_time'] = pd.to_datetime(df['heartbeat_time'])
df = df.sort_values('name').reset_index(drop=True)

r.close()
return df


def get_heartbeat_time(strategy_name=None, redis_url=None, connection_pool=None, key_prefix="Weights", **kwargs):
"""获取策略的最近一次心跳时间
:param strategy_name: str, 策略名,默认为None, 即获取所有策略的心跳时间
:param redis_url: str, redis连接字符串, 默认为None, 即从环境变量 RWC_REDIS_URL 中读取
:param connection_pool: redis.ConnectionPool, redis连接池
:param key_prefix: str, redis中key的前缀,默认为 Weights
:param kwargs: dict, 其他参数
- heartbeat_prefix: str, 心跳key的前缀,默认为 heartbeat
:return: str, 最近一次心跳时间
"""
if connection_pool:
r = redis.Redis(connection_pool=connection_pool)
else:
redis_url = redis_url if redis_url else os.getenv("RWC_REDIS_URL")
r = redis.Redis.from_url(redis_url, decode_responses=True)

if not strategy_name:
dfm = get_strategy_mates(redis_url=redis_url, connection_pool=connection_pool, key_pattern=f"{key_prefix}:META:*")
if len(dfm) == 0:
logger.warning(f"{key_prefix} 下没有策略元数据")
return None
strategy_names = dfm['name'].unique().tolist()
else:
strategy_names = [strategy_name]

heartbeat_prefix = kwargs.get("heartbeat_prefix", "heartbeat")
res = {}
for sn in strategy_names:
hdt = r.get(f'{key_prefix}:{heartbeat_prefix}:{sn}')
if hdt:
res[sn] = pd.to_datetime(hdt)
else:
res[sn] = None
logger.warning(f"{sn} 没有心跳时间")
r.close()
return res
Loading

0 comments on commit 1593279

Please sign in to comment.