-
Notifications
You must be signed in to change notification settings - Fork 172
/
Copy pathmp_support_resist.py
137 lines (102 loc) · 4.39 KB
/
mp_support_resist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import mplfinance as mpf
import scipy
import math
import pandas_ta as ta
def find_levels(
    price: np.array, atr: float,  # Log closing price, and log atr
    first_w: float = 0.1,
    atr_mult: float = 3.0,
    prom_thresh: float = 0.1
):
    """Find support/resistance levels as peaks of a weighted price density.

    A Gaussian kernel density estimate of ``price`` (the "market profile")
    is evaluated on a 200-point grid spanning ``[min(price), max(price))``.
    Local maxima whose prominence is at least ``prom_thresh`` times the
    highest density value are reported as levels.

    Args:
        price: 1-D array of log closing prices, oldest first.
        atr: Log average true range for the window.
        first_w: Weight of the oldest sample. Weights ramp linearly from
            ``first_w`` towards 1.0 (the newest sample sits one step below
            1.0); negative weights are clipped to 0.
        atr_mult: Multiplier applied to ``atr``; the product is passed to
            ``gaussian_kde`` as its scalar ``bw_method``.
        prom_thresh: Minimum peak prominence as a fraction of the maximum
            density value.

    Returns:
        Tuple ``(levels, peaks, props, price_range, pdf, weights)``:
        ``levels`` is a list of prices (``exp`` of the grid value at each
        peak), ``peaks``/``props`` are the outputs of
        ``scipy.signal.find_peaks``, ``price_range`` is the evaluation grid
        (log space), ``pdf`` is the density on that grid and ``weights`` are
        the per-sample weights.

        When no density can be estimated (fewer than two samples, a flat or
        NaN-containing window, or a non-finite / non-positive bandwidth)
        ``levels`` is ``[]``, ``peaks``/``price_range``/``pdf`` are empty
        arrays and ``props`` is ``{}``.
    """
    # Import the submodules explicitly: a bare ``import scipy`` does not make
    # ``scipy.stats`` / ``scipy.signal`` available on every SciPy version.
    from scipy import signal as sp_signal
    from scipy import stats as sp_stats

    price = np.asarray(price, dtype=float)
    n_obs = len(price)

    # Setup weights
    last_w = 1.0
    w_step = (last_w - first_w) / max(n_obs, 1)  # max() avoids 0-division
    weights = first_w + np.arange(n_obs) * w_step
    weights[weights < 0] = 0.0

    bandwidth = atr * atr_mult
    no_levels = ([], np.array([], dtype=int), {}, np.array([]), np.array([]), weights)
    if n_obs < 2 or not np.isfinite(bandwidth) or bandwidth <= 0:
        return no_levels

    min_v = np.min(price)
    max_v = np.max(price)
    # A flat window has zero variance (singular KDE); NaN prices also land
    # here because every comparison with NaN is False.
    if not max_v > min_v:
        return no_levels

    # Get kernel of price.
    kernel = sp_stats.gaussian_kde(price, bw_method=bandwidth, weights=weights)

    # Construct market profile. linspace(endpoint=False) yields the same grid
    # values as arange(min_v, max_v, (max_v - min_v) / 200) but always exactly
    # 200 of them, whereas a float-step arange may emit a 201st point.
    price_range = np.linspace(min_v, max_v, 200, endpoint=False)
    pdf = kernel(price_range)  # Market profile

    # Find significant peaks in the market profile
    prom_min = np.max(pdf) * prom_thresh
    peaks, props = sp_signal.find_peaks(pdf, prominence=prom_min)

    # Grid values are log prices; convert the peak locations back to prices.
    levels = list(np.exp(price_range[peaks]))
    return levels, peaks, props, price_range, pdf, weights
def support_resistance_levels(
    data: pd.DataFrame, lookback: int,
    first_w: float = 0.01, atr_mult: float = 3.0, prom_thresh: float = 0.25
):
    """Compute rolling support/resistance levels for every bar of ``data``.

    For each bar ``i >= lookback`` the last ``lookback`` log closes (ending
    at bar ``i`` inclusive) are passed to ``find_levels`` together with the
    log-price ATR at bar ``i``.

    Args:
        data: Frame with ``'high'``, ``'low'`` and ``'close'`` columns.
        lookback: Window length in bars; also used as the ATR length.
        first_w: Forwarded to ``find_levels``.
        atr_mult: Forwarded to ``find_levels``.
        prom_thresh: Forwarded to ``find_levels``.

    Returns:
        A list with one entry per row of ``data``: ``None`` for the first
        ``lookback`` rows, otherwise the list of levels returned by
        ``find_levels`` for that bar.
    """
    log_close = np.log(data['close'])

    # Get log average true range,
    atr = ta.atr(np.log(data['high']), np.log(data['low']), log_close, lookback)

    # The log closes do not change between iterations, so convert them to a
    # plain array once and slice it instead of re-slicing the frame per bar.
    log_close_arr = log_close.to_numpy()

    n_rows = len(data)
    all_levels = [None] * n_rows
    for i in range(lookback, n_rows):
        window = log_close_arr[i - lookback + 1: i + 1]
        # Only the levels (first element of the result tuple) are kept.
        all_levels[i] = find_levels(window, atr.iloc[i], first_w, atr_mult, prom_thresh)[0]

    return all_levels
def sr_penetration_signal(data: pd.DataFrame, levels: list):
    """Build a +1/-1 signal from closes crossing support/resistance levels.

    Args:
        data: Frame with a ``'close'`` column.
        levels: Per-bar entries, each either ``None`` or an iterable of
            price levels valid for that bar.

    Returns:
        Float array with one value per row of ``data``. The running state
        becomes 1.0 when the close moves from at-or-below a level to above
        it, and -1.0 when it moves from at-or-above a level to below it;
        otherwise it keeps its previous value. Bar 0 and bars whose entry in
        ``levels`` is ``None`` are left at 0.0.
    """
    closes = data['close'].to_numpy()
    n_bars = len(data)
    out = np.zeros(n_bars)

    state = 0.0
    for bar in range(1, n_bars):
        bar_levels = levels[bar]
        if bar_levels is not None:
            prev_close = closes[bar - 1]
            close = closes[bar]
            for lvl in bar_levels:
                if prev_close <= lvl < close:  # Close cross above line
                    state = 1.0
                elif close < lvl <= prev_close:  # Close cross below line
                    state = -1.0
            out[bar] = state

    return out
def get_trades_from_signal(data: pd.DataFrame, signal: np.array):
    """Convert a +1/-1 position signal into tables of long and short trades.

    A trade is opened at the close of the bar where the signal first becomes
    1.0 (long) or -1.0 (short), and is closed at the close of the bar where
    the next entry fires. A trade that is still open at the end of the data
    is not reported. Signal values other than 1.0 / -1.0 neither open nor
    close a trade.

    Each closed trade is filed under the direction it was opened with. This
    matters when the signal passes through another value between two entries
    of the same direction (e.g. ``1, 0, 1``): the first long is closed at the
    second entry and must still be recorded as a long.

    Args:
        data: Frame with a ``'close'`` column; its index supplies the
            entry/exit times.
        signal: Array-like with one signal value per row of ``data``.

    Returns:
        ``(long_trades, short_trades)``: two frames indexed by
        ``entry_time`` with columns ``entry_price``, ``exit_time``,
        ``exit_price`` and ``percent`` (fractional return, sign-adjusted for
        shorts).
    """
    columns = ['entry_time', 'entry_price', 'exit_time', 'exit_price']
    close_arr = data['close'].to_numpy()
    idx = data.index

    closed = {1.0: [], -1.0: []}  # closed trades keyed by their own side
    open_trade = None
    open_side = 0.0
    last_sig = 0.0
    for i in range(len(data)):
        sig = signal[i]
        is_long_entry = sig == 1.0 and last_sig != 1.0
        is_short_entry = sig == -1.0 and last_sig != -1.0
        if is_long_entry or is_short_entry:
            if open_trade is not None:
                open_trade[2] = idx[i]
                open_trade[3] = close_arr[i]
                closed[open_side].append(open_trade)
            # exit_time / exit_price are placeholders until the trade closes
            open_trade = [idx[i], close_arr[i], -1, np.nan]
            open_side = 1.0 if is_long_entry else -1.0
        last_sig = sig

    long_trades = pd.DataFrame(closed[1.0], columns=columns)
    short_trades = pd.DataFrame(closed[-1.0], columns=columns)

    long_trades['percent'] = (long_trades['exit_price'] - long_trades['entry_price']) / long_trades['entry_price']
    short_trades['percent'] = -1 * (short_trades['exit_price'] - short_trades['entry_price']) / short_trades['entry_price']

    long_trades = long_trades.set_index('entry_time')
    short_trades = short_trades.set_index('entry_time')
    return long_trades, short_trades
if __name__ == '__main__':
    # Trend following strategy

    # Load the candles and index them by their 'date' column, converted to
    # second-resolution datetimes.
    data = (
        pd.read_csv('BTCUSDT86400.csv')
        .assign(date=lambda frame: frame['date'].astype('datetime64[s]'))
        .set_index('date')
    )

    plt.style.use('dark_background')

    # Rolling levels over a 365-bar window with uniform sample weights
    # (first_w=1.0), then the level-penetration signal derived from them.
    levels = support_resistance_levels(data, 365, first_w=1.0, atr_mult=3.0)
    data['sr_signal'] = sr_penetration_signal(data, levels)

    # Next-bar log return, so each row's signal is paired with the return
    # that follows it.
    log_close = np.log(data['close'])
    data['log_ret'] = log_close.diff().shift(-1)
    data['sr_return'] = data['sr_signal'] * data['log_ret']

    long_trades, short_trades = get_trades_from_signal(
        data, data['sr_signal'].to_numpy()
    )