Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimized methods ob and liquidity using LLMs #71

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
313 changes: 313 additions & 0 deletions smartmoneyconcepts/smc.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,175 @@ def ob(
axis=1,
)

@classmethod
def ob_vec_deepseek(cls, ohlc: pd.DataFrame, swing_highs_lows: pd.DataFrame, close_mitigation: bool = False) -> pd.DataFrame:
swing_highs_lows = swing_highs_lows.copy()
ohlc_len = len(ohlc)
_open = ohlc["open"].values
_high = ohlc["high"].values
_low = ohlc["low"].values
_close = ohlc["close"].values
_volume = ohlc["volume"].values
_swing_high_low = swing_highs_lows["HighLow"].values

crossed = np.full(ohlc_len, False, dtype=bool)
ob = np.zeros(ohlc_len, dtype=np.int32)
top = np.full(ohlc_len, np.nan, dtype=np.float32)
bottom = np.full(ohlc_len, np.nan, dtype=np.float32)
obVolume = np.full(ohlc_len, np.nan, dtype=np.float32)
lowVolume = np.full(ohlc_len, np.nan, dtype=np.float32)
highVolume = np.full(ohlc_len, np.nan, dtype=np.float32)
percentage = np.full(ohlc_len, np.nan, dtype=np.float32)
mitigated_index = np.full(ohlc_len, np.nan, dtype=np.float32)
breaker = np.full(ohlc_len, False, dtype=bool)

# Precompute swing indices
swing_high_indices = np.where(_swing_high_low == 1)[0]
swing_low_indices = np.where(_swing_high_low == -1)[0]

# Precompute last swing high/low indices for each position
last_swing_high_indices = np.full(ohlc_len, -1, dtype=int)
if swing_high_indices.size > 0:
insert_pos = np.searchsorted(swing_high_indices, np.arange(ohlc_len), side='right') - 1
valid = insert_pos >= 0
last_swing_high_indices[valid] = swing_high_indices[insert_pos[valid]]

last_swing_low_indices = np.full(ohlc_len, -1, dtype=int)
if swing_low_indices.size > 0:
insert_pos = np.searchsorted(swing_low_indices, np.arange(ohlc_len), side='right') - 1
valid = insert_pos >= 0
last_swing_low_indices[valid] = swing_low_indices[insert_pos[valid]]

# Precompute volume sums
volume_sum = np.zeros(ohlc_len)
volume_sum[2:] = _volume[2:] + _volume[1:-1] + _volume[:-2]
high_vol_bull = np.zeros(ohlc_len)
high_vol_bull[1:] = _volume[1:] + _volume[:-1]
low_vol_bull = np.zeros(ohlc_len)
low_vol_bull[2:] = _volume[:-2]
high_vol_bear = np.zeros(ohlc_len)
high_vol_bear[2:] = _volume[:-2]
low_vol_bear = np.zeros(ohlc_len)
low_vol_bear[1:] = _volume[1:] + _volume[:-1]

for i in range(ohlc_len):
# Process existing bullish OBs
current_bull_obs = np.where(ob == 1)[0]
if current_bull_obs.size > 0:
mask_breaker_true = breaker[current_bull_obs]
invalidation_mask = mask_breaker_true & (_high[i] > top[current_bull_obs])
invalidation_indices = current_bull_obs[invalidation_mask]
ob[invalidation_indices] = 0
top[invalidation_indices] = np.nan
bottom[invalidation_indices] = np.nan
obVolume[invalidation_indices] = np.nan
lowVolume[invalidation_indices] = np.nan
highVolume[invalidation_indices] = np.nan
mitigated_index[invalidation_indices] = np.nan
percentage[invalidation_indices] = np.nan

mask_breaker_false = ~mask_breaker_true
current_bull_obs_bf = current_bull_obs[mask_breaker_false]
if current_bull_obs_bf.size > 0:
if close_mitigation:
mitigation_cond = np.minimum(_open[i], _close[i]) < bottom[current_bull_obs_bf]
else:
mitigation_cond = _low[i] < bottom[current_bull_obs_bf]
mitigate_indices = current_bull_obs_bf[mitigation_cond]
breaker[mitigate_indices] = True
mitigated_index[mitigate_indices] = i

# Process new bullish OBs
last_top_idx = last_swing_high_indices[i]
if last_top_idx != -1:
if _close[i] > _high[last_top_idx] and not crossed[last_top_idx]:
crossed[last_top_idx] = True
window_start = last_top_idx + 1
window_end = i - 1
if window_start <= window_end:
window_lows = _low[window_start:window_end+1]
min_idx = window_start + np.argmin(window_lows)
obBtm = _low[min_idx]
obTop = _high[min_idx]
obIndex = min_idx

ob[obIndex] = 1
top[obIndex] = obTop
bottom[obIndex] = obBtm
obVolume[obIndex] = volume_sum[i]
lowVolume[obIndex] = low_vol_bull[i]
highVolume[obIndex] = high_vol_bull[i]
min_vol = min(high_vol_bull[i], low_vol_bull[i])
max_vol = max(high_vol_bull[i], low_vol_bull[i])
percentage[obIndex] = (min_vol / max_vol * 100) if max_vol != 0 else 100

# Process existing bearish OBs
current_bear_obs = np.where(ob == -1)[0]
if current_bear_obs.size > 0:
mask_breaker_true = breaker[current_bear_obs]
invalidation_mask = mask_breaker_true & (_low[i] < bottom[current_bear_obs])
invalidation_indices = current_bear_obs[invalidation_mask]
ob[invalidation_indices] = 0
top[invalidation_indices] = np.nan
bottom[invalidation_indices] = np.nan
obVolume[invalidation_indices] = np.nan
lowVolume[invalidation_indices] = np.nan
highVolume[invalidation_indices] = np.nan
mitigated_index[invalidation_indices] = np.nan
percentage[invalidation_indices] = np.nan

mask_breaker_false = ~mask_breaker_true
current_bear_obs_bf = current_bear_obs[mask_breaker_false]
if current_bear_obs_bf.size > 0:
if close_mitigation:
mitigation_cond = np.maximum(_open[i], _close[i]) > top[current_bear_obs_bf]
else:
mitigation_cond = _high[i] > top[current_bear_obs_bf]
mitigate_indices = current_bear_obs_bf[mitigation_cond]
breaker[mitigate_indices] = True
mitigated_index[mitigate_indices] = i

# Process new bearish OBs
last_btm_idx = last_swing_low_indices[i]
if last_btm_idx != -1:
if _close[i] < _low[last_btm_idx] and not crossed[last_btm_idx]:
crossed[last_btm_idx] = True
window_start = last_btm_idx + 1
window_end = i - 1
if window_start <= window_end:
window_highs = _high[window_start:window_end+1]
max_idx = window_start + np.argmax(window_highs)
obTop = _high[max_idx]
obBtm = _low[max_idx]
obIndex = max_idx

ob[obIndex] = -1
top[obIndex] = obTop
bottom[obIndex] = obBtm
obVolume[obIndex] = volume_sum[i]
lowVolume[obIndex] = low_vol_bear[i]
highVolume[obIndex] = high_vol_bear[i]
min_vol = min(high_vol_bear[i], low_vol_bear[i])
max_vol = max(high_vol_bear[i], low_vol_bear[i])
percentage[obIndex] = (min_vol / max_vol * 100) if max_vol != 0 else 100

# Convert zeros to NaN for OB and related arrays
ob = np.where(ob != 0, ob, np.nan)
top = np.where(np.isnan(ob), np.nan, top)
bottom = np.where(np.isnan(ob), np.nan, bottom)
obVolume = np.where(np.isnan(ob), np.nan, obVolume)
mitigated_index = np.where(np.isnan(ob), np.nan, mitigated_index)
percentage = np.where(np.isnan(ob), np.nan, percentage)

return pd.DataFrame({
"OB": ob,
"Top": top,
"Bottom": bottom,
"OBVolume": obVolume,
"MitigatedIndex": mitigated_index,
"Percentage": percentage
})

@classmethod
def liquidity(
cls, ohlc: DataFrame, swing_highs_lows: DataFrame, range_percent: float = 0.01
Expand Down Expand Up @@ -689,6 +858,150 @@ def liquidity(

return pd.concat([liquidity, level, liquidity_end, liquidity_swept], axis=1)

@classmethod
def liquidity_vec_chatgpt(cls, ohlc: pd.DataFrame, swing_highs_lows: pd.DataFrame, range_percent: float = 0.01) -> pd.DataFrame:
"""
Liquidity
-----------
Liquidity is when there are multiple highs within a small range of each other,
or multiple lows within a small range of each other.

Parameters
----------
ohlc : DataFrame
Must contain at least the columns "high" and "low".
swing_highs_lows : DataFrame
The dataframe output from the swing_highs_lows function; must contain
columns "HighLow" (with 1 for swing highs and -1 for swing lows) and "Level".
range_percent : float, default 0.01
The percentage of the range (highest high – lowest low) to determine the pip range.

Returns
-------
DataFrame
A DataFrame with four columns:
- Liquidity: 1 for bullish liquidity, -1 for bearish liquidity (NaN if none)
- Level: the average level of the grouped swing points (NaN if none)
- End: the index (in the OHLC data) of the last swing point in the group (NaN if none)
- Swept: the index of the candle that “swept” the liquidity level (NaN if none)
"""

# Work on a copy so that we don’t modify the original swing_highs_lows DataFrame.
shl = swing_highs_lows.copy()

# pip_range is the given percentage of the full price range.
pip_range = (ohlc["high"].max() - ohlc["low"].min()) * range_percent

n = len(ohlc)
indices = np.arange(n)

# Copy the swing indicator and level into numpy arrays.
# (We will “mark” swing points as used by setting their flag to 0.)
HL = shl["HighLow"].values.copy()
levels = shl["Level"].values.copy()
ohlc_high = ohlc["high"].values
ohlc_low = ohlc["low"].values

# Prepare result arrays (using NaN where no liquidity is found).
# We use float arrays so that NaN can be stored.
liquidity = np.full(n, np.nan, dtype=np.float64)
liquidity_level = np.full(n, np.nan, dtype=np.float64)
liquidity_end = np.full(n, np.nan, dtype=np.float64)
liquidity_swept = np.full(n, np.nan, dtype=np.float64)

# ----- Process bullish (high) swings -----
# Find candidate indices for swing highs.
candidate_highs = np.where(HL == 1)[0]
for i in candidate_highs:
# Skip if this swing high has been “used” in an earlier liquidity group.
if HL[i] != 1:
continue

high_level = levels[i]
range_low = high_level - pip_range
range_high = high_level + pip_range

# --- Determine the “swept” candle index for this candidate ---
# Look ahead (from i+1 onward) for the first candle where the OHLC high
# is at or above the upper boundary.
cond = ohlc_high[i+1:] >= range_high
if np.any(cond):
# np.argmax returns the first True element in the sliced array.
swept_index = i + 1 + int(np.argmax(cond))
else:
swept_index = 0 # same as the original code

# --- Find subsequent candidate swing highs that fall in the liquidity range ---
if swept_index > 0:
valid = (indices > i) & (indices < swept_index) & (HL == 1)
else:
valid = (indices > i) & (HL == 1)
# Further require that the candidate’s level is within [range_low, range_high].
valid = valid & (levels >= range_low) & (levels <= range_high)
valid_indices = indices[valid]

# Form the group (including the candidate itself)
if valid_indices.size > 0:
group_levels = np.concatenate(([high_level], levels[valid_indices]))
else:
group_levels = np.array([high_level])

# Only if there is more than one swing in the group is there “liquidity.”
if group_levels.size > 1:
avg_level = group_levels.mean()
liquidity[i] = 1
liquidity_level[i] = avg_level
# For liquidity_end, use the last swing index added (if any) – otherwise keep i.
liquidity_end[i] = valid_indices[-1] if valid_indices.size > 0 else i
liquidity_swept[i] = swept_index
# Mark all the swing highs that are now grouped so they are not re‐used.
HL[valid_indices] = 0

# ----- Process bearish (low) swings -----
candidate_lows = np.where(HL == -1)[0]
for i in candidate_lows:
if HL[i] != -1:
continue

low_level = levels[i]
range_low = low_level - pip_range
range_high = low_level + pip_range

# For lows the “swept” candle is the first where the OHLC low is at or below the lower boundary.
cond = ohlc_low[i+1:] <= range_low
if np.any(cond):
swept_index = i + 1 + int(np.argmax(cond))
else:
swept_index = 0

if swept_index > 0:
valid = (indices > i) & (indices < swept_index) & (HL == -1)
else:
valid = (indices > i) & (HL == -1)
valid = valid & (levels >= range_low) & (levels <= range_high)
valid_indices = indices[valid]

if valid_indices.size > 0:
group_levels = np.concatenate(([low_level], levels[valid_indices]))
else:
group_levels = np.array([low_level])

if group_levels.size > 1:
avg_level = group_levels.mean()
liquidity[i] = -1
liquidity_level[i] = avg_level
liquidity_end[i] = valid_indices[-1] if valid_indices.size > 0 else i
liquidity_swept[i] = swept_index
HL[valid_indices] = 0

# Convert results to Series (with names as in the original function)
liquidity_series = pd.Series(liquidity, name="Liquidity")
level_series = pd.Series(liquidity_level, name="Level")
end_series = pd.Series(liquidity_end, name="End")
swept_series = pd.Series(liquidity_swept, name="Swept")

return pd.concat([liquidity_series, level_series, end_series, swept_series], axis=1)

@classmethod
def previous_high_low(cls, ohlc: DataFrame, time_frame: str = "1D") -> Series:
"""
Expand Down