diff --git a/smartmoneyconcepts/smc.py b/smartmoneyconcepts/smc.py index 9d6e1fb..e716e3a 100644 --- a/smartmoneyconcepts/smc.py +++ b/smartmoneyconcepts/smc.py @@ -591,6 +591,175 @@ def ob( axis=1, ) + @classmethod + def ob_vec_deepseek(cls, ohlc: pd.DataFrame, swing_highs_lows: pd.DataFrame, close_mitigation: bool = False) -> pd.DataFrame: + swing_highs_lows = swing_highs_lows.copy() + ohlc_len = len(ohlc) + _open = ohlc["open"].values + _high = ohlc["high"].values + _low = ohlc["low"].values + _close = ohlc["close"].values + _volume = ohlc["volume"].values + _swing_high_low = swing_highs_lows["HighLow"].values + + crossed = np.full(ohlc_len, False, dtype=bool) + ob = np.zeros(ohlc_len, dtype=np.int32) + top = np.full(ohlc_len, np.nan, dtype=np.float32) + bottom = np.full(ohlc_len, np.nan, dtype=np.float32) + obVolume = np.full(ohlc_len, np.nan, dtype=np.float32) + lowVolume = np.full(ohlc_len, np.nan, dtype=np.float32) + highVolume = np.full(ohlc_len, np.nan, dtype=np.float32) + percentage = np.full(ohlc_len, np.nan, dtype=np.float32) + mitigated_index = np.full(ohlc_len, np.nan, dtype=np.float32) + breaker = np.full(ohlc_len, False, dtype=bool) + + # Precompute swing indices + swing_high_indices = np.where(_swing_high_low == 1)[0] + swing_low_indices = np.where(_swing_high_low == -1)[0] + + # Precompute last swing high/low indices for each position + last_swing_high_indices = np.full(ohlc_len, -1, dtype=int) + if swing_high_indices.size > 0: + insert_pos = np.searchsorted(swing_high_indices, np.arange(ohlc_len), side='right') - 1 + valid = insert_pos >= 0 + last_swing_high_indices[valid] = swing_high_indices[insert_pos[valid]] + + last_swing_low_indices = np.full(ohlc_len, -1, dtype=int) + if swing_low_indices.size > 0: + insert_pos = np.searchsorted(swing_low_indices, np.arange(ohlc_len), side='right') - 1 + valid = insert_pos >= 0 + last_swing_low_indices[valid] = swing_low_indices[insert_pos[valid]] + + # Precompute volume sums + volume_sum = np.zeros(ohlc_len) + volume_sum[2:] = _volume[2:] + _volume[1:-1] + _volume[:-2] + high_vol_bull = np.zeros(ohlc_len) + high_vol_bull[1:] = _volume[1:] + _volume[:-1] + low_vol_bull = np.zeros(ohlc_len) + low_vol_bull[2:] = _volume[:-2] + high_vol_bear = np.zeros(ohlc_len) + high_vol_bear[2:] = _volume[:-2] + low_vol_bear = np.zeros(ohlc_len) + low_vol_bear[1:] = _volume[1:] + _volume[:-1] + + for i in range(ohlc_len): + # Process existing bullish OBs + current_bull_obs = np.where(ob == 1)[0] + if current_bull_obs.size > 0: + mask_breaker_true = breaker[current_bull_obs] + invalidation_mask = mask_breaker_true & (_high[i] > top[current_bull_obs]) + invalidation_indices = current_bull_obs[invalidation_mask] + ob[invalidation_indices] = 0 + top[invalidation_indices] = np.nan + bottom[invalidation_indices] = np.nan + obVolume[invalidation_indices] = np.nan + lowVolume[invalidation_indices] = np.nan + highVolume[invalidation_indices] = np.nan + mitigated_index[invalidation_indices] = np.nan + percentage[invalidation_indices] = np.nan + + mask_breaker_false = ~mask_breaker_true + current_bull_obs_bf = current_bull_obs[mask_breaker_false] + if current_bull_obs_bf.size > 0: + if close_mitigation: + mitigation_cond = np.minimum(_open[i], _close[i]) < bottom[current_bull_obs_bf] + else: + mitigation_cond = _low[i] < bottom[current_bull_obs_bf] + mitigate_indices = current_bull_obs_bf[mitigation_cond] + breaker[mitigate_indices] = True + mitigated_index[mitigate_indices] = i + + # Process new bullish OBs + last_top_idx = last_swing_high_indices[i] + if last_top_idx != -1: + if _close[i] > _high[last_top_idx] and not crossed[last_top_idx]: + crossed[last_top_idx] = True + window_start = last_top_idx + 1 + window_end = i - 1 + if window_start <= window_end: + window_lows = _low[window_start:window_end+1] + min_idx = window_start + np.argmin(window_lows) + obBtm = _low[min_idx] + obTop = _high[min_idx] + obIndex = min_idx + + ob[obIndex] = 1 + top[obIndex] = obTop + bottom[obIndex] = obBtm + obVolume[obIndex] = volume_sum[i] + lowVolume[obIndex] = low_vol_bull[i] + highVolume[obIndex] = high_vol_bull[i] + min_vol = min(high_vol_bull[i], low_vol_bull[i]) + max_vol = max(high_vol_bull[i], low_vol_bull[i]) + percentage[obIndex] = (min_vol / max_vol * 100) if max_vol != 0 else 100 + + # Process existing bearish OBs + current_bear_obs = np.where(ob == -1)[0] + if current_bear_obs.size > 0: + mask_breaker_true = breaker[current_bear_obs] + invalidation_mask = mask_breaker_true & (_low[i] < bottom[current_bear_obs]) + invalidation_indices = current_bear_obs[invalidation_mask] + ob[invalidation_indices] = 0 + top[invalidation_indices] = np.nan + bottom[invalidation_indices] = np.nan + obVolume[invalidation_indices] = np.nan + lowVolume[invalidation_indices] = np.nan + highVolume[invalidation_indices] = np.nan + mitigated_index[invalidation_indices] = np.nan + percentage[invalidation_indices] = np.nan + + mask_breaker_false = ~mask_breaker_true + current_bear_obs_bf = current_bear_obs[mask_breaker_false] + if current_bear_obs_bf.size > 0: + if close_mitigation: + mitigation_cond = np.maximum(_open[i], _close[i]) > top[current_bear_obs_bf] + else: + mitigation_cond = _high[i] > top[current_bear_obs_bf] + mitigate_indices = current_bear_obs_bf[mitigation_cond] + breaker[mitigate_indices] = True + mitigated_index[mitigate_indices] = i + + # Process new bearish OBs + last_btm_idx = last_swing_low_indices[i] + if last_btm_idx != -1: + if _close[i] < _low[last_btm_idx] and not crossed[last_btm_idx]: + crossed[last_btm_idx] = True + window_start = last_btm_idx + 1 + window_end = i - 1 + if window_start <= window_end: + window_highs = _high[window_start:window_end+1] + max_idx = window_start + np.argmax(window_highs) + obTop = _high[max_idx] + obBtm = _low[max_idx] + obIndex = max_idx + + ob[obIndex] = -1 + top[obIndex] = obTop + bottom[obIndex] = obBtm + obVolume[obIndex] = volume_sum[i] + lowVolume[obIndex] = low_vol_bear[i] + highVolume[obIndex] = high_vol_bear[i] + min_vol = min(high_vol_bear[i], low_vol_bear[i]) + max_vol = max(high_vol_bear[i], low_vol_bear[i]) + percentage[obIndex] = (min_vol / max_vol * 100) if max_vol != 0 else 100 + + # Convert zeros to NaN for OB and related arrays + ob = np.where(ob != 0, ob, np.nan) + top = np.where(np.isnan(ob), np.nan, top) + bottom = np.where(np.isnan(ob), np.nan, bottom) + obVolume = np.where(np.isnan(ob), np.nan, obVolume) + mitigated_index = np.where(np.isnan(ob), np.nan, mitigated_index) + percentage = np.where(np.isnan(ob), np.nan, percentage) + + return pd.DataFrame({ + "OB": ob, + "Top": top, + "Bottom": bottom, + "OBVolume": obVolume, + "MitigatedIndex": mitigated_index, + "Percentage": percentage + }) + @classmethod def liquidity( cls, ohlc: DataFrame, swing_highs_lows: DataFrame, range_percent: float = 0.01 @@ -689,6 +858,150 @@ def liquidity( return pd.concat([liquidity, level, liquidity_end, liquidity_swept], axis=1) + @classmethod + def liquidity_vec_chatgpt(cls, ohlc: pd.DataFrame, swing_highs_lows: pd.DataFrame, range_percent: float = 0.01) -> pd.DataFrame: + """ + Liquidity + ----------- + Liquidity is when there are multiple highs within a small range of each other, + or multiple lows within a small range of each other. + + Parameters + ---------- + ohlc : DataFrame + Must contain at least the columns "high" and "low". + swing_highs_lows : DataFrame + The dataframe output from the swing_highs_lows function; must contain + columns "HighLow" (with 1 for swing highs and -1 for swing lows) and "Level". + range_percent : float, default 0.01 + The percentage of the range (highest high – lowest low) to determine the pip range. + + Returns + ------- + DataFrame + A DataFrame with four columns: + - Liquidity: 1 for bullish liquidity, -1 for bearish liquidity (NaN if none) + - Level: the average level of the grouped swing points (NaN if none) + - End: the index (in the OHLC data) of the last swing point in the group (NaN if none) + - Swept: the index of the candle that “swept” the liquidity level (NaN if none) + """ + + # Work on a copy so that we don’t modify the original swing_highs_lows DataFrame. + shl = swing_highs_lows.copy() + + # pip_range is the given percentage of the full price range. + pip_range = (ohlc["high"].max() - ohlc["low"].min()) * range_percent + + n = len(ohlc) + indices = np.arange(n) + + # Copy the swing indicator and level into numpy arrays. + # (We will “mark” swing points as used by setting their flag to 0.) + HL = shl["HighLow"].values.copy() + levels = shl["Level"].values.copy() + ohlc_high = ohlc["high"].values + ohlc_low = ohlc["low"].values + + # Prepare result arrays (using NaN where no liquidity is found). + # We use float arrays so that NaN can be stored. + liquidity = np.full(n, np.nan, dtype=np.float64) + liquidity_level = np.full(n, np.nan, dtype=np.float64) + liquidity_end = np.full(n, np.nan, dtype=np.float64) + liquidity_swept = np.full(n, np.nan, dtype=np.float64) + + # ----- Process bullish (high) swings ----- + # Find candidate indices for swing highs. + candidate_highs = np.where(HL == 1)[0] + for i in candidate_highs: + # Skip if this swing high has been “used” in an earlier liquidity group. + if HL[i] != 1: + continue + + high_level = levels[i] + range_low = high_level - pip_range + range_high = high_level + pip_range + + # --- Determine the “swept” candle index for this candidate --- + # Look ahead (from i+1 onward) for the first candle where the OHLC high + # is at or above the upper boundary. + cond = ohlc_high[i+1:] >= range_high + if np.any(cond): + # np.argmax returns the first True element in the sliced array. + swept_index = i + 1 + int(np.argmax(cond)) + else: + swept_index = 0 # same as the original code + + # --- Find subsequent candidate swing highs that fall in the liquidity range --- + if swept_index > 0: + valid = (indices > i) & (indices < swept_index) & (HL == 1) + else: + valid = (indices > i) & (HL == 1) + # Further require that the candidate’s level is within [range_low, range_high]. + valid = valid & (levels >= range_low) & (levels <= range_high) + valid_indices = indices[valid] + + # Form the group (including the candidate itself) + if valid_indices.size > 0: + group_levels = np.concatenate(([high_level], levels[valid_indices])) + else: + group_levels = np.array([high_level]) + + # Only if there is more than one swing in the group is there “liquidity.” + if group_levels.size > 1: + avg_level = group_levels.mean() + liquidity[i] = 1 + liquidity_level[i] = avg_level + # For liquidity_end, use the last swing index added (if any) – otherwise keep i. + liquidity_end[i] = valid_indices[-1] if valid_indices.size > 0 else i + liquidity_swept[i] = swept_index + # Mark all the swing highs that are now grouped so they are not re‐used. + HL[valid_indices] = 0 + + # ----- Process bearish (low) swings ----- + candidate_lows = np.where(HL == -1)[0] + for i in candidate_lows: + if HL[i] != -1: + continue + + low_level = levels[i] + range_low = low_level - pip_range + range_high = low_level + pip_range + + # For lows the “swept” candle is the first where the OHLC low is at or below the lower boundary. + cond = ohlc_low[i+1:] <= range_low + if np.any(cond): + swept_index = i + 1 + int(np.argmax(cond)) + else: + swept_index = 0 + + if swept_index > 0: + valid = (indices > i) & (indices < swept_index) & (HL == -1) + else: + valid = (indices > i) & (HL == -1) + valid = valid & (levels >= range_low) & (levels <= range_high) + valid_indices = indices[valid] + + if valid_indices.size > 0: + group_levels = np.concatenate(([low_level], levels[valid_indices])) + else: + group_levels = np.array([low_level]) + + if group_levels.size > 1: + avg_level = group_levels.mean() + liquidity[i] = -1 + liquidity_level[i] = avg_level + liquidity_end[i] = valid_indices[-1] if valid_indices.size > 0 else i + liquidity_swept[i] = swept_index + HL[valid_indices] = 0 + + # Convert results to Series (with names as in the original function) + liquidity_series = pd.Series(liquidity, name="Liquidity") + level_series = pd.Series(liquidity_level, name="Level") + end_series = pd.Series(liquidity_end, name="End") + swept_series = pd.Series(liquidity_swept, name="Swept") + + return pd.concat([liquidity_series, level_series, end_series, swept_series], axis=1) + @classmethod def previous_high_low(cls, ohlc: DataFrame, time_frame: str = "1D") -> Series: """