-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathget_feature.py
211 lines (127 loc) · 4.58 KB
/
get_feature.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
from __future__ import print_function, division
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import net_sphere, cnn_cnn, multi_channel
import os
import pandas as pd
from skimage import io, transform
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import torchvision
import math
from tensorboardX import SummaryWriter
from input_pipeline import DatasetFolder
import matplotlib.pyplot as plt
# Ignore warnings
import warnings
warnings.filterwarnings("ignore")
Model = 'multi_channel' #choose from {'cnn_cnn','multi_channel'}
# load_epoch_num =13800
#参数设定
if Model == 'cnn_cnn':
Test_dir ='/media/nirheaven/nirheaven_ext4/M/aligned_imgs_test/'
Load_Model = '/media/zhineng/Data/M/code/code/Models/cnn_cnn_'
save_txt = ''
# net parameters
_batch_size = 20
_seq_num = 32
_classnum=1400
learning_rate = 0.00001
_concat_channel = False
if Model == 'multi_channel':
Test_dir ='/media/nirheaven/nirheaven_ext4/M/aligned_imgs_test/'
Load_Model = '/media/nirheaven/nirheaven_ext4/M/code/code/MC_Models/mc_'
save_txt = '/media/nirheaven/nirheaven_ext4/M/code/code/feature/mc_epoch'
#net parameters
_batch_size = 29 #53
_seq_num = 5
_classnum=1400
learning_rate = 0.00001
_concat_channel = True
#get file number
filenum = 0
for speaker in os.listdir(Test_dir):
for video in os.listdir(Test_dir+'/'+speaker):
filenum+=1
epoch_filenum = math.floor(filenum/_batch_size)*_batch_size
print('File Number is %d, Epoch File Number is %d'%(filenum,epoch_filenum))
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Assume that we are on a CUDA machine, then this should print a CUDA device:
print(device)
# load net structure
if Model == 'cnn_cnn':
net = getattr(cnn_cnn,'cnn_cnn_')(batch_size=_batch_size,seq_num=_seq_num,classnum=_classnum,feature=True)
elif Model == 'multi_channel':
net = getattr(multi_channel,'mc_')(batch_size=_batch_size,seq_num=_seq_num,classnum=_classnum,feature=True)
else:
print('load model ERROR: Model does not exist')
transform = transforms.Compose(
[transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
testset = DatasetFolder(Test_dir,transform=transform,extensions='.jpg',seq_num=_seq_num,concat_channel=_concat_channel)
testloader = torch.utils.data.DataLoader(testset, batch_size=_batch_size,
shuffle=False, num_workers=1)
for load_epoch_num in range(8000,10001):
if load_epoch_num%800 == 0:
net.load_state_dict(torch.load(Load_Model+str(load_epoch_num)+'.pth'))
print('successfully load epoch_%d parameters'%(load_epoch_num))
net.to(device)
#get average pooling features
feature_list = []
for epoch in range(3):
# with open(save_txt+str(load_epoch_num)+'_'+str(epoch)+'.txt','w+') as f:
id_num = 0
for index,data in enumerate(testloader,0):
if index>(math.floor(filenum/_batch_size)-1): continue
inputs, labels = data
inputs = inputs.to(device)
with torch.no_grad():
outputs = net(inputs)
outputs = outputs.to('cpu')
for i in range(_batch_size):
if epoch == 0:
feature_list.append([labels.numpy()[i],outputs.numpy()[i]])
else:
feature_list[id_num][1] = feature_list[id_num][1] + outputs.numpy()[i]
id_num += 1
feature_list = [(x,y/3) for (x,y) in feature_list]
# compute distance
dist_info=[]
label_info =[]
for i,item in enumerate(feature_list):
label_i,feature_i = item
for j in range(i+1,len(feature_list)):
label_j,feature_j = feature_list[j]
# 归一化
# feature_i = feature_i/np.linalg.norm(feature_i)
# feature_j = feature_j/np.linalg.norm(feature_j)
# dist = np.linalg.norm(feature_i - feature_j) #欧式距离
dist = np.dot(feature_i,feature_j) #内积
label_info.append((label_i,label_j))
dist_info.append(dist)
#遍历阈值,画ROC图
TAR = []
FAR = []
for threshold in list(np.linspace(min(dist_info),max(dist_info),50)):
tp = tn = fp = fn = 0
for i in range(len(dist_info)):
if dist_info[i] > threshold and label_info[i][0]==label_info[i][1]:
tp += 1
elif dist_info[i] > threshold and not label_info[i][0]==label_info[i][1]:
fp += 1
elif dist_info[i] < threshold and label_info[i][0]==label_info[i][1]:
fn += 1
else:
tn += 1
FAR.append(fp/(fp+tn))
TAR.append(tp/(tp+fn))
plt.semilogx(FAR,TAR,label=str(load_epoch_num))
plt.xlabel('false positive rate')
plt.ylabel('true positive rate')
# plt.plot(TAR,FAR)
plt.legend()
plt.show()