forked from TakHemlata/SSL_Anti-spoofing
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata_utils_SSL.py
173 lines (120 loc) · 6.27 KB
/
data_utils_SSL.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import os
import numpy as np
import torch
import torch.nn as nn
from torch import Tensor
import librosa
from torch.utils.data import Dataset
from RawBoost import ISD_additive_noise,LnL_convolutive_noise,SSI_additive_noise,normWav
from random import randrange
import random
___author__ = "Hemlata Tak"
__email__ = "tak@eurecom.fr"
def genSpoof_list( dir_meta,is_train=False,is_eval=False):
d_meta = {}
file_list=[]
with open(dir_meta, 'r') as f:
l_meta = f.readlines()
if (is_train):
for line in l_meta:
_,key,_,_,label = line.strip().split()
file_list.append(key)
d_meta[key] = 1 if label == 'bonafide' else 0
return d_meta,file_list
elif(is_eval):
for line in l_meta:
key= line.strip()
file_list.append(key)
return file_list
else:
for line in l_meta:
_,key,_,_,label = line.strip().split()
file_list.append(key)
d_meta[key] = 1 if label == 'bonafide' else 0
return d_meta,file_list
def pad(x, max_len=64600):
x_len = x.shape[0]
if x_len >= max_len:
return x[:max_len]
# need to pad
num_repeats = int(max_len / x_len)+1
padded_x = np.tile(x, (1, num_repeats))[:, :max_len][0]
return padded_x
class Dataset_ASVspoof2019_train(Dataset):
def __init__(self,args,list_IDs, labels, base_dir,algo):
'''self.list_IDs : list of strings (each string: utt key),
self.labels : dictionary (key: utt key, value: label integer)'''
self.list_IDs = list_IDs
self.labels = labels
self.base_dir = base_dir
self.algo=algo
self.args=args
self.cut=64600 # take ~4 sec audio (64600 samples)
def __len__(self):
return len(self.list_IDs)
def __getitem__(self, index):
utt_id = self.list_IDs[index]
X,fs = librosa.load(self.base_dir+'flac/'+utt_id+'.flac', sr=16000)
Y=process_Rawboost_feature(X,fs,self.args,self.algo)
X_pad= pad(Y,self.cut)
x_inp= Tensor(X_pad)
target = self.labels[utt_id]
return x_inp, target
class Dataset_ASVspoof2021_eval(Dataset):
def __init__(self, list_IDs, base_dir):
'''self.list_IDs : list of strings (each string: utt key),
'''
self.list_IDs = list_IDs
self.base_dir = base_dir
self.cut=64600 # take ~4 sec audio (64600 samples)
def __len__(self):
return len(self.list_IDs)
def __getitem__(self, index):
utt_id = self.list_IDs[index]
X, fs = librosa.load(self.base_dir+'flac/'+utt_id+'.flac', sr=16000)
X_pad = pad(X,self.cut)
x_inp = Tensor(X_pad)
return x_inp,utt_id
#--------------RawBoost data augmentation algorithms---------------------------##
def process_Rawboost_feature(feature, sr,args,algo):
# Data process by Convolutive noise (1st algo)
if algo==1:
feature =LnL_convolutive_noise(feature,args.N_f,args.nBands,args.minF,args.maxF,args.minBW,args.maxBW,args.minCoeff,args.maxCoeff,args.minG,args.maxG,args.minBiasLinNonLin,args.maxBiasLinNonLin,sr)
# Data process by Impulsive noise (2nd algo)
elif algo==2:
feature=ISD_additive_noise(feature, args.P, args.g_sd)
# Data process by coloured additive noise (3rd algo)
elif algo==3:
feature=SSI_additive_noise(feature,args.SNRmin,args.SNRmax,args.nBands,args.minF,args.maxF,args.minBW,args.maxBW,args.minCoeff,args.maxCoeff,args.minG,args.maxG,sr)
# Data process by all 3 algo. together in series (1+2+3)
elif algo==4:
feature =LnL_convolutive_noise(feature,args.N_f,args.nBands,args.minF,args.maxF,args.minBW,args.maxBW,
args.minCoeff,args.maxCoeff,args.minG,args.maxG,args.minBiasLinNonLin,args.maxBiasLinNonLin,sr)
feature=ISD_additive_noise(feature, args.P, args.g_sd)
feature=SSI_additive_noise(feature,args.SNRmin,args.SNRmax,args.nBands,args.minF,
args.maxF,args.minBW,args.maxBW,args.minCoeff,args.maxCoeff,args.minG,args.maxG,sr)
# Data process by 1st two algo. together in series (1+2)
elif algo==5:
feature =LnL_convolutive_noise(feature,args.N_f,args.nBands,args.minF,args.maxF,args.minBW,args.maxBW,
args.minCoeff,args.maxCoeff,args.minG,args.maxG,args.minBiasLinNonLin,args.maxBiasLinNonLin,sr)
feature=ISD_additive_noise(feature, args.P, args.g_sd)
# Data process by 1st and 3rd algo. together in series (1+3)
elif algo==6:
feature =LnL_convolutive_noise(feature,args.N_f,args.nBands,args.minF,args.maxF,args.minBW,args.maxBW,
args.minCoeff,args.maxCoeff,args.minG,args.maxG,args.minBiasLinNonLin,args.maxBiasLinNonLin,sr)
feature=SSI_additive_noise(feature,args.SNRmin,args.SNRmax,args.nBands,args.minF,args.maxF,args.minBW,args.maxBW,args.minCoeff,args.maxCoeff,args.minG,args.maxG,sr)
# Data process by 2nd and 3rd algo. together in series (2+3)
elif algo==7:
feature=ISD_additive_noise(feature, args.P, args.g_sd)
feature=SSI_additive_noise(feature,args.SNRmin,args.SNRmax,args.nBands,args.minF,args.maxF,args.minBW,args.maxBW,args.minCoeff,args.maxCoeff,args.minG,args.maxG,sr)
# Data process by 1st two algo. together in Parallel (1||2)
elif algo==8:
feature1 =LnL_convolutive_noise(feature,args.N_f,args.nBands,args.minF,args.maxF,args.minBW,args.maxBW,
args.minCoeff,args.maxCoeff,args.minG,args.maxG,args.minBiasLinNonLin,args.maxBiasLinNonLin,sr)
feature2=ISD_additive_noise(feature, args.P, args.g_sd)
feature_para=feature1+feature2
feature=normWav(feature_para,0) #normalized resultant waveform
# original data without Rawboost processing
else:
feature=feature
return feature