# chapter 11.py
import pandas as pd
import numpy as np
import itertools
import time
import re
from abc import ABC, abstractmethod
from scipy.stats import norm
import matplotlib.pyplot as plt
def cal_conf_matrix(labels, preds):
    """
    Compute the confusion matrix.
    Arguments:
        labels: sample labels (ground truth)
        preds: predicted probabilities
    """
    n_sample = len(labels)
    result = pd.DataFrame(index=range(0, n_sample), columns=('probability', 'label'))
    result['label'] = np.array(labels)
    result['probability'] = np.array(preds)
    cm = np.zeros((2, 2), dtype=int)
    cm[0, 0] = len(result[(result['label'] == 1) & (result['probability'] >= 0.5)])  # TP, using 0.5 as the threshold
    cm[0, 1] = len(result[(result['label'] == 1) & (result['probability'] < 0.5)])   # FN
    cm[1, 0] = len(result[(result['label'] == 0) & (result['probability'] >= 0.5)])  # FP
    cm[1, 1] = len(result[(result['label'] == 0) & (result['probability'] < 0.5)])   # TN
    return cm
def cal_PRF1(labels, preds):
    """
    Compute precision P, recall R, and the F1 score.
    """
    cm = cal_conf_matrix(labels, preds)
    P = cm[0, 0] / (cm[0, 0] + cm[1, 0])
    R = cm[0, 0] / (cm[0, 0] + cm[0, 1])
    F1 = 2 * P * R / (P + R)
    return P, R, F1
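# A minimal usage sketch (the toy labels and scores below are illustrative assumptions,
# not part of the original file): exercise cal_conf_matrix and cal_PRF1.
def _demo_prf1():
    toy_labels = [1, 1, 0, 0, 1, 0]
    toy_preds = [0.9, 0.4, 0.3, 0.6, 0.8, 0.2]
    print(cal_conf_matrix(toy_labels, toy_preds))  # 2x2 matrix [[TP, FN], [FP, TN]]
    print(cal_PRF1(toy_labels, toy_preds))         # (P, R, F1)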
def cal_PRcurve(labels, preds):
    """
    Compute the points on the PR curve.
    """
    n_sample = len(labels)
    result = pd.DataFrame(index=range(0, n_sample), columns=('probability', 'label'))
    result['label'] = np.array(labels)
    result['probability'] = np.array(preds)
    result.sort_values('probability', inplace=True, ascending=False)
    PandR = pd.DataFrame(index=range(len(labels)), columns=('P', 'R'))
    for j in range(len(result)):
        # Use each predicted probability in turn as the threshold and count positives among the top j+1 samples
        result_j = result.head(n=j + 1)
        P = len(result_j[result_j['label'] == 1]) / float(len(result_j))  # true positives so far / samples currently predicted positive
        R = len(result_j[result_j['label'] == 1]) / float(len(result[result['label'] == 1]))  # true positives so far / all actual positives
        PandR.iloc[j] = [P, R]
    return PandR
def cal_ROCcurve(labels, preds):
    """
    Compute the points on the ROC curve.
    """
    n_sample = len(labels)
    result = pd.DataFrame(index=range(0, n_sample), columns=('probability', 'label'))
    result['label'] = np.array(labels)
    result['probability'] = np.array(preds)
    # Compute TPR and FPR
    result.sort_values('probability', inplace=True, ascending=False)
    TPRandFPR = pd.DataFrame(index=range(len(result)), columns=('TPR', 'FPR'))
    for j in range(len(result)):
        # Use each predicted probability in turn as the threshold and count positives and negatives among the top j+1 samples
        result_j = result.head(n=j + 1)
        TPR = len(result_j[result_j['label'] == 1]) / float(len(result[result['label'] == 1]))  # true positives so far / all actual positives
        FPR = len(result_j[result_j['label'] == 0]) / float(len(result[result['label'] == 0]))  # false positives so far / all actual negatives
        TPRandFPR.iloc[j] = [TPR, FPR]
    return TPRandFPR
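# A minimal plotting sketch (assumed usage, not part of the original file): draw the PR
# and ROC curves produced by the two helpers above using the matplotlib import at the top.
def _demo_curves(labels, preds):
    PandR = cal_PRcurve(labels, preds)
    TPRandFPR = cal_ROCcurve(labels, preds)
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4))
    ax1.plot(PandR['R'].astype(float), PandR['P'].astype(float))              # PR curve: recall vs. precision
    ax1.set_xlabel('R')
    ax1.set_ylabel('P')
    ax2.plot(TPRandFPR['FPR'].astype(float), TPRandFPR['TPR'].astype(float))  # ROC curve: FPR vs. TPR
    ax2.set_xlabel('FPR')
    ax2.set_ylabel('TPR')
    plt.show()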
def timeit(func):
    """
    Decorator that reports a function's execution time.
    """
    def wrapper(*args, **kwargs):
        time_start = time.time()
        result = func(*args, **kwargs)
        time_end = time.time()
        exec_time = time_end - time_start
        print("{function} exec time: {time}s".format(function=func.__name__, time=exec_time))
        return result
    return wrapper
@timeit
def area_auc(labels, preds):
    """
    Compute AUC geometrically, as the area under the ROC curve.
    """
    TPRandFPR = cal_ROCcurve(labels, preds)
    # Accumulate the areas of the small rectangles under the ROC curve
    auc = 0.
    prev_x = 0
    for x, y in zip(TPRandFPR.FPR, TPRandFPR.TPR):
        if x != prev_x:
            auc += (x - prev_x) * y
            prev_x = x
    return auc
@timeit
def naive_auc(labels, preds):
    """
    Compute AUC via its probabilistic interpretation (fraction of correctly ordered pairs).
    """
    n_pos = sum(labels)
    n_neg = len(labels) - n_pos
    total_pair = n_pos * n_neg  # total number of positive-negative pairs
    labels_preds = zip(labels, preds)
    labels_preds = sorted(labels_preds, key=lambda x: x[1])  # sort by predicted probability in ascending order
    count_neg = 0  # number of negative samples seen so far
    satisfied_pair = 0  # number of pairs ranked correctly
    for i in range(len(labels_preds)):
        if labels_preds[i][0] == 1:
            satisfied_pair += count_neg  # every negative seen so far ranks below this positive
        else:
            count_neg += 1
    return satisfied_pair / float(total_pair)
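# A minimal comparison sketch (assumed usage, not part of the original file): on random
# scores the geometric estimate and the pair-counting estimate of AUC should roughly agree.
def _demo_auc(n=200):
    rng = np.random.RandomState(0)
    labels = rng.randint(0, 2, size=n)   # random binary labels
    preds = rng.rand(n)                  # random scores in [0, 1)
    print(area_auc(labels, preds))       # rectangle-sum estimate from the ROC points
    print(naive_auc(labels, preds))      # fraction of correctly ordered positive-negative pairs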
#####----Bayesian Hyperparameter Optimization----####
class KernelBase(ABC):
def __init__(self):
super().__init__()
self.params = {}
self.hyperparams = {}
@abstractmethod
def _kernel(self, X, Y):
raise NotImplementedError
def __call__(self, X, Y=None):
return self._kernel(X, Y)
def __str__(self):
P, H = self.params, self.hyperparams
p_str = ", ".join(["{}={}".format(k, v) for k, v in P.items()])
return "{}({})".format(H["op"], p_str)
def summary(self):
return {
"op": self.hyperparams["op"],
"params": self.params,
"hyperparams": self.hyperparams,
}
class RBFKernel(KernelBase):
def __init__(self, sigma=None):
"""
        RBF kernel.
"""
super().__init__()
self.hyperparams = {"op": "RBFKernel"}
        self.params = {"sigma": sigma}  # if sigma is not given, it defaults to np.sqrt(n_features / 2), where n_features is the number of features
def _kernel(self, X, Y=None):
"""
        Compute the RBF kernel between every pair of rows of X and Y. If Y is None, Y = X.
        Arguments:
            X: input array of shape (n_samples, n_features)
            Y: input array of shape (m_samples, n_features)
"""
X = X.reshape(-1, 1) if X.ndim == 1 else X
Y = X if Y is None else Y
Y = Y.reshape(-1, 1) if Y.ndim == 1 else Y
assert X.ndim == 2 and Y.ndim == 2, "X and Y must have 2 dimensions"
sigma = np.sqrt(X.shape[1] / 2) if self.params["sigma"] is None else self.params["sigma"]
X, Y = X / sigma, Y / sigma
D = -2 * X @ Y.T + np.sum(Y**2, axis=1) + np.sum(X**2, axis=1)[:, np.newaxis]
D[D < 0] = 0
return np.exp(-0.5 * D)
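# A minimal sanity-check sketch (assumed usage, not part of the original file): with the
# kernel above, K[i, j] = exp(-||x_i - x_j||^2 / (2 * sigma^2)), so the diagonal is all
# ones and off-diagonal entries shrink with distance.
def _demo_rbf():
    kern = RBFKernel(sigma=1.0)
    X = np.array([[0.0], [1.0], [3.0]])
    K = kern(X)              # 3x3 Gram matrix
    print(np.diag(K))        # [1. 1. 1.]
    print(K[0, 1], K[0, 2])  # exp(-0.5) and exp(-4.5): smaller for the farther point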
class KernelInitializer(object):
def __init__(self, param=None):
self.param = param
def __call__(self):
r = r"([a-zA-Z0-9]*)=([^,)]*)"
kr_str = self.param.lower()
kwargs = dict([(i, eval(j)) for (i, j) in re.findall(r, self.param)])
if "rbf" in kr_str:
kernel = RBFKernel(**kwargs)
else:
raise NotImplementedError("{}".format(kr_str))
return kernel
class GPRegression:
"""
    Gaussian process regression.
"""
def __init__(self, kernel="RBFKernel", sigma=1e-10):
self.kernel = KernelInitializer(kernel)()
self.params = {"GP_mean": None, "GP_cov": None, "X": None}
self.hyperparams = {"kernel": str(self.kernel), "sigma": sigma}
def fit(self, X, y):
"""
        Use the existing sample set to obtain the GP prior.
        Arguments:
            X: input array of shape (n_samples, n_features)
            y: target values for X, of shape (n_samples,)
"""
mu = np.zeros(X.shape[0])
Cov = self.kernel(X, X)
self.params["X"] = X
self.params["y"] = y
self.params["GP_cov"] = Cov
self.params["GP_mean"] = mu
def predict(self, X_star, conf_interval=0.95):
"""
        Predict at the new sample points X_star.
        Arguments:
            X_star: input array of shape (n_samples, n_features)
            conf_interval: confidence level, a float in (0, 1), default=0.95
"""
X = self.params["X"]
y = self.params["y"]
K = self.params["GP_cov"]
sigma = self.hyperparams["sigma"]
K_star = self.kernel(X_star, X)
K_star_star = self.kernel(X_star, X_star)
sig = np.eye(K.shape[0]) * sigma
K_y_inv = np.linalg.pinv(K + sig)
mean = K_star @ K_y_inv @ y
cov = K_star_star - K_star @ K_y_inv @ K_star.T
percentile = norm.ppf(conf_interval)
conf = percentile * np.sqrt(np.diag(cov))
return mean, conf, cov
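# A minimal usage sketch (assumed usage, not part of the original file): fit the GP on a
# few noisy sine observations and plot the predictive mean with its confidence band.
def _demo_gpr():
    rng = np.random.RandomState(0)
    X = np.linspace(0, 2 * np.pi, 8).reshape(-1, 1)
    y = np.sin(X).ravel() + 0.05 * rng.randn(8)
    gpr = GPRegression(kernel="RBFKernel(sigma=1.0)", sigma=1e-6)
    gpr.fit(X, y)
    X_star = np.linspace(0, 2 * np.pi, 50).reshape(-1, 1)
    mean, conf, cov = gpr.predict(X_star)
    plt.plot(X_star.ravel(), mean)                                         # posterior mean
    plt.fill_between(X_star.ravel(), mean - conf, mean + conf, alpha=0.3)  # confidence band from predict()
    plt.scatter(X.ravel(), y)                                              # training points
    plt.show()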
class BayesianOptimization:
def __init__(self):
self.model = GPRegression()
def acquisition_function(self, Xsamples):
mu, _, cov = self.model.predict(Xsamples)
mu = mu if mu.ndim==1 else (mu.T)[0]
ysample = np.random.multivariate_normal(mu, cov)
return ysample
    def opt_acquisition(self, X, n_samples=20):
        # Candidate search strategy; common choices are random search, grid search, and local search.
        # Plain random search is used here; the candidate range can also be set here.
        Xsamples = np.random.randint(low=1, high=50, size=n_samples * X.shape[1])
        Xsamples = Xsamples.reshape(n_samples, X.shape[1])
        # Evaluate the acquisition function and take the candidate with the largest value
        scores = self.acquisition_function(Xsamples)
        ix = np.argmax(scores)
        return Xsamples[ix, 0]
    def fit(self, f, X, y):
        # Fit the GPR model
        self.model.fit(X, y)
        # Optimization loop
        for i in range(15):
            x_star = self.opt_acquisition(X)  # next sampling point
            y_star = f(x_star)
            mean, conf, cov = self.model.predict(np.array([[x_star]]))
            # Add the new observation to the data set
            X = np.vstack((X, [[x_star]]))
            y = np.vstack((y, [[y_star]]))
            # Refit the GPR model
            self.model.fit(X, y)
        ix = np.argmax(y)
        print('Best Result: x=%.3f, y=%.3f' % (X[ix, 0], y[ix, 0]))
        return X[ix], y[ix]
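# A minimal end-to-end sketch (assumed usage, not part of the original file): maximize a
# noisy toy 1-D objective; opt_acquisition draws integer candidates in [1, 50).
def _demo_bayes_opt():
    def objective(x):
        return -((x - 20) ** 2) / 100.0 + np.random.randn() * 0.1  # noisy peak near x = 20
    rng = np.random.RandomState(0)
    X = rng.randint(1, 50, size=(5, 1)).astype(float)  # initial design points
    y = np.array([[objective(v[0])] for v in X])       # initial observations as a column vector
    bo = BayesianOptimization()
    best_x, best_y = bo.fit(objective, X, y)
    return best_x, best_y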