-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcd_utils.py
125 lines (104 loc) · 3.74 KB
/
cd_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#!/usr/bin/env python
"""
Concept drift detection on a batched data stream via resampling.

``DataStream`` serves paired ``(X, y)`` samples in fixed-size batches.
``concept_drift_scheme`` consumes such a stream batch by batch. For each new
batch it compares the score of a model trained on the data in arrival order
(``detect_concept_drift`` / ``empirical_risk``) with the scores of random
re-splits of the same data, and ``TEST`` turns that comparison into a drift
decision.
"""
__author__ = "Ismail El Hachimi & Mathieu Vandecasteele"
__title__ = "Concept Drift Detection Through Resampling"
# Dependencies
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split
import numpy as np
from time import time, sleep
# Seed NumPy's global RNG once at import time. detect_concept_drift calls
# train_test_split(..., shuffle=True) without a random_state; per scikit-learn's
# docs it then draws from this global RNG. Presumably the seed is here to make
# the shuffled splits reproducible between runs.
np.random.seed(42)
# Data Stream Class
class DataStream:
    """Serve paired samples ``(X, y)`` as consecutive fixed-size batches.

    ``next()`` returns ``(X_batch, y_batch)`` as newly allocated float64
    arrays holding ``size`` consecutive samples (labels are stored as float).
    If the source runs out part-way through a batch, the rows already read
    are returned as a shorter final batch instead of being dropped.
    ``has_next()`` peeks one sample ahead without losing it.

    Args:
        X: array with a ``shape`` attribute; its first axis indexes samples.
        y: array of labels with a ``shape`` attribute, iterated in lockstep
            with ``X`` (iteration stops at the shorter of the two).
        size: number of samples per batch; must be >= 1.

    Raises:
        ValueError: if ``size`` is smaller than 1.
    """

    def __init__(self, X, y, size):
        if size < 1:
            # A zero-sized batch would never consume the source.
            raise ValueError("size must be a positive integer")
        self.iterator = iter(zip(X, y))
        # At most one look-ahead sample, filled by has_next() and always
        # consumed first by __next__ so no sample is lost or reordered.
        self.buffer = []
        self.size = size
        self.xshape = X.shape
        self.yshape = y.shape  # kept for callers; not used inside the class

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next ``(X_batch, y_batch)``; raise StopIteration when empty."""
        # Per-sample feature shape, so 1-D and N-D inputs work as well as 2-D.
        batch_x = np.zeros((self.size,) + tuple(self.xshape[1:]))
        batch_y = np.zeros(self.size)
        n_read = 0
        while n_read < self.size:
            if self.buffer:
                sample = self.buffer.pop()
            else:
                try:
                    sample = next(self.iterator)
                except StopIteration:
                    break
            batch_x[n_read] = sample[0]
            batch_y[n_read] = sample[1]
            n_read += 1
        if n_read == 0:
            raise StopIteration
        if n_read < self.size:
            # Source exhausted mid-batch: hand back the rows actually read.
            return batch_x[:n_read], batch_y[:n_read]
        return batch_x, batch_y

    def has_next(self):
        """Return True if at least one more sample is available."""
        if self.buffer:
            return True
        try:
            self.buffer = [next(self.iterator)]
        except StopIteration:
            return False
        else:
            return True
# Concept Drift Functions
def get_data_stream(data_stream, X=None, y=None):
    """Append a freshly drawn batch to the data accumulated so far.

    Args:
        data_stream: one ``(X_batch, y_batch)`` pair (not the stream object).
        X: previously accumulated samples, or None on the first call.
        y: previously accumulated labels, or None on the first call.

    Returns:
        ``data_stream`` itself when both ``X`` and ``y`` are None; otherwise
        ``(X, y)`` with the batch rows appended along axis 0.
    """
    if X is not None or y is not None:
        stacked_X = np.concatenate((X, data_stream[0]), axis=0)
        stacked_y = np.concatenate((y, data_stream[1]), axis=0)
        return stacked_X, stacked_y
    return data_stream
def concept_drift_scheme(window_size, permut, cd_size, significance_rate, data_stream):
    """Run the resampling drift test on every batch of ``data_stream``.

    How each new batch is processed:
      1. The first batch seeds the accumulated data; each later batch is
         appended to it.
      2. The rows from the start of the current concept (row ``t_``) onward
         are split in arrival order. The last ``window_size`` rows form the
         test set and the rest the training set.
      3. ``detect_concept_drift`` compares that ordered split with ``permut``
         shuffled re-splits of the same rows.
      4. When drift is flagged, the current concept restarts at the first
         row of the batch that triggered it.

    Args:
        window_size: number of most recent rows held out as the test set.
        permut: number of shuffled re-splits used by the permutation test.
        cd_size: tolerance forwarded to ``TEST``.
        significance_rate: significance level forwarded to ``TEST``.
        data_stream: object supporting ``next()`` (yielding ``(X, y)``
            batches whose ``X`` is 2-D) and ``has_next()``, e.g. a
            ``DataStream``.

    Returns:
        ``(D, times, Rord_list, Rsmean_list)``:
          - ``D`` maps the stream number of each detected drift to the row
            index where that batch starts in the accumulated data.
          - ``times`` holds the wall-clock seconds spent per batch.
          - ``Rord_list`` and ``Rsmean_list`` hold, per batch, the ordered
            score and the mean shuffled score returned by
            ``detect_concept_drift``.
    """
    t_ = 0  # first row of the current concept in the accumulated arrays
    D = dict()
    X, y = get_data_stream(next(data_stream), None, None)
    times = []
    Rord_list = []
    Rsmean_list = []
    stream_no = 0
    while data_stream.has_next():
        stream_no += 1
        print("############# STREAM N° {} #############".format(stream_no))
        time_s = time()
        # The incoming batch starts right after the rows accumulated so far.
        # Deriving k from len(X) keeps it exact even when the batch size
        # differs from window_size or the last batch is short.
        k = len(X)
        X, y = get_data_stream(next(data_stream), X, y)
        X_cur, y_cur = X[t_:, :], y[t_:]
        Sx_ord, Sx_ord_t, Sy_ord, Sy_ord_t = train_test_split(X_cur, y_cur, test_size=window_size,
                                                              shuffle=False)
        drift_status, Rord, Rsmean = detect_concept_drift((X_cur, y_cur), (Sx_ord, Sy_ord), (Sx_ord_t, Sy_ord_t),
                                                          permut, cd_size, significance_rate, window_size)
        Rord_list.append(Rord)
        Rsmean_list.append(Rsmean)
        if drift_status:
            print("\nA CONCEPT DRIFT HAS BEEN DETECTED at k = {}\n".format(k))
            t_ = k
            D[stream_no] = k
        time_s = time() - time_s
        times.append(time_s)
        print("Took : "+str(time_s))
    return D, times, Rord_list, Rsmean_list
def detect_concept_drift(data, S_ord, S_ord_t, permut, cd_size, significance_rate, window_size):
    """Permutation test for concept drift on one window of data.

    Args:
        data: ``(X, y)`` holding every row of the current concept.
        S_ord: ``(X, y)`` training part of the in-order split (as passed by
            ``concept_drift_scheme``, the earlier rows of ``data``).
        S_ord_t: ``(X, y)`` test part of the in-order split (the last rows).
        permut: number of random train/test re-splits of ``data``.
        cd_size: tolerance forwarded to ``TEST``.
        significance_rate: significance level forwarded to ``TEST``.
        window_size: test-set size for every shuffled re-split.

    Returns:
        ``(drift_detected, Rord, mean_Rs)``: the ``TEST`` decision, the
        ``empirical_risk`` of the in-order split, and the mean over the
        shuffled re-splits (NaN, with a NumPy warning, if ``permut`` is 0).
    """
    Rord = empirical_risk(S_ord, S_ord_t)
    Rs = []
    for _ in range(permut):
        # Only each split's score is needed, so the split arrays are not
        # kept. Storing every split cost O(permut * len(data)) memory.
        X, X_t, y, y_t = train_test_split(data[0], data[1], test_size=window_size, shuffle=True)
        Rs.append(empirical_risk((X, y), (X_t, y_t)))
    return TEST(Rord, Rs, cd_size, significance_rate), Rord, np.mean(Rs)
def empirical_risk(S, S_t):
    """Return the misclassification rate of an RBF SVM trained on ``S``.

    Args:
        S: ``(X, y)`` training samples and labels.
        S_t: ``(X, y)`` held-out samples and labels.

    Returns:
        Fraction of ``S_t`` misclassified by the model fitted on ``S``,
        i.e. ``1 - accuracy``.
    """
    hyperparams = {'kernel': 'rbf',
                   'C': 5,
                   'gamma': 0.05}
    model = SVC(**hyperparams)
    model.fit(S[0], S[1])
    # SVC.score returns mean accuracy. TEST flags drift when the in-order
    # value exceeds most shuffled values by more than cd_size. That rule is
    # only meaningful for a risk (loss), so convert accuracy to error rate.
    return 1.0 - model.score(S_t[0], S_t[1])
def TEST(Rord, Rs, cd_size, significance_rate):
    """Decide whether drift occurred from a resampling p-value.

    The p-value is ``(1 + m) / (len(Rs) + 1)``, where ``m`` counts the
    shuffled values ``r`` in ``Rs`` with ``Rord - r <= cd_size``.

    Args:
        Rord: value obtained on the in-order split.
        Rs: sequence of values obtained on shuffled splits.
        cd_size: tolerance on the difference ``Rord - r``.
        significance_rate: threshold the p-value is compared with.

    Returns:
        True when the p-value is at most ``significance_rate``, else False.
    """
    # The "+1" in numerator and denominator counts the in-order split
    # itself, so the p-value is never 0.
    n_within = sum(1 for r in Rs if Rord - r <= cd_size)
    p_value = (1 + n_within) / (len(Rs) + 1)
    return bool(p_value <= significance_rate)