-
Notifications
You must be signed in to change notification settings - Fork 0
/
data.py
231 lines (165 loc) · 13 KB
/
data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import glob
import pickle
import operator
import itertools
import collections
import numpy as np
import metadatabase as mdb
from astropy.time import Time
class Data(collections.UserDict):
def __init__(self, count_result_set, locate_result_set, parent_directory):
super().__init__(mdb._load(count_result_set, locate_result_set, parent_directory))
@classmethod
def via_metadatabase(cls, categories=['Antenna', 'Switch', 'Temperature'], instruments=['100MHz', '70MHz'], channels=['EW', 'NS'], intervals=[(1524400000.0,1524500000.0),], quality=[1, 0, 'NULL'], integrity=[1, 0, 'NULL'], completeness=[1, 0, 'NULL'], selection=None):
""" Loads all data files matching the input arguments. """
if selection == None:
# Generates the needed result sets from the input arguments.
count_result_set = mdb.count(categories, instruments, channels, intervals, quality, integrity, completeness)
locate_result_set = mdb.locate(categories, instruments, channels, intervals, quality, integrity, completeness)
else:
# Loads the needed result sets from the input pickle file.
count_result_set, locate_result_set = pickle.load(open(selection, 'rb'))
return cls(count_result_set, locate_result_set, mdb._directories['data'])
@classmethod
def from_directories(cls, directory_addresses=['~'], classification_catalogue={'data_70MHz': '70MHz', 'data_100MHz': '100MHz'}, file_catalogue={'pol0.scio': ('float',['EW'],'pol'), 'pol1.scio': ('float',['NS'],'pol'), 'pol0.scio.bz2': ('float',['EW'],'pol'), 'pol1.scio.bz2': ('float',['NS'],'pol'), 'time_sys_stop.raw': ('float',['EW','NS'],'time_sys_stop'), 'time_sys_stop.raw': ('float',['EW','NS'],'time_sys_stop'), 'open.scio': ('float',['Switch'],'open'), 'short.scio': ('float',['Switch'],'short'), 'noise.scio': ('float',['Switch'],'noise'), 'res50.scio': ('float',['Switch'],'res50'), 'res100.scio': ('float',['Switch'],'res100'), 'antenna.scio': ('float',['Switch'],'antenna'), 'open.scio.bz2': ('float',['Switch'],'open'), 'short.scio.bz2': ('float',['Switch'],'short'), 'noise.scio.bz2': ('float',['Switch'],'noise'), 'res50.scio.bz2': ('float',['Switch'],'res50'), 'res100.scio.bz2': ('float',['Switch'],'res100'), 'antenna.scio.bz2': ('float',['Switch'],'antenna')}):
""" Loads the cataloged files located under the input directory addresses, organizing them according to the input classification catalogue. """
# Initializes the result sets.
count_result_set = []
locate_result_set = []
# Extracts all file names.
file_names = list(file_catalogue.keys())
# Collects the classification, path, name, alias, and data type of each cataloged file located within the input directory addresses.
for directory_address, file_name in itertools.product(*[directory_addresses, file_names]):
for file_path in glob.iglob(directory_address + '/**/' + file_name, recursive=True):
classification_name = [classification for key, classification in classification_catalogue.items() if key in file_path.split('/')][0]
file_extension = file_name.split('.')[1]
data_type, subclassification_names, file_alias = file_catalogue[file_name]
for subclassification_name in subclassification_names:
locate_result_set.append((classification_name, subclassification_name, file_path, file_extension, file_alias, data_type))
# Sorts the collected metadata.
locate_result_set.sort()
# Collects the classification, name, alias, data type, and count of each cataloged file of the same type located within the input directory addresses.
for ((classification_name, subclassification_name, file_extension, file_alias, data_type), file_count) in collections.Counter([(classification_name, subclassification_name, file_extension, file_alias, data_type) for (classification_name, subclassification_name, _, file_extension, file_alias, data_type) in locate_result_set]).items():
count_result_set.append((classification_name, subclassification_name, file_extension, file_alias, data_type, file_count))
return cls(count_result_set, locate_result_set, '')
def lst(self, instruments=['100MHz', '70MHz'], channels=['EW', 'NS'], location=('37.819638d', '-46.88694d')):
""" Produces local sidereal time entries for each input instrument and channel. The default location is set to PRIZM's deployment site at Marion island."""
lst(self, instruments, channels, location)
return
def partition(self, instruments=['100MHz', '70MHz'], channels=['EW', 'NS'], buffer=(0, 0)):
""" Produces partitions which slice the data according to the instrument's switch states. """
partition(self, instruments, channels, buffer)
return
def get(self, data='pol', instrument='100MHz', channel='EW', partition='antenna'):
""" Extracts the data partition associated with the input instrument and channel. """
return get(self, data, instrument, channel, partition)
def interpolate(self, times, instrument='100MHz', channel='EW', partition='short', threshold=500):
""" Employs linear interpolation over a given data partition to extrapolate spectra for each input time. """
return interpolate(self, times, instrument, channel, partition, threshold)
def iso(ctimes):
""" Converts UNIX times (UTC) to the ISO 8601 compliant date-time format 'YYYY-MM-DD HH:MM:SS.sss'. """
return Time(ctimes, format='unix', scale='utc').iso
def _interpolant(interpolation_data):
""" Linear interpolant used to extrapolate spectra of a given kind over time. """
# Unpacks the input interpolation data.
x0, y0, x1, y1, x = interpolation_data
# Returns the linearly interpolated spectrum.
if x0 == x1:
return y0
else:
return y0*(x1 - x)/(x1 - x0) + y1*(x - x0)/(x1 - x0)
def lst(self, instruments=['100MHz', '70MHz'], channels=['EW', 'NS'], location=('37.819638d', '-46.88694d')):
""" Produces local sidereal time entries for each input instrument and channel. The default location is set to PRIZM's deployment site at Marion island."""
for channel, instrument in itertools.product(*[channels, instruments]):
# Converts UNIX time to local sidereal time.
self[instrument][channel]['lst_sys_start'] = Time(self[instrument][channel]['time_sys_start'], format='unix', scale='utc', location=location).sidereal_time('apparent').value
self[instrument][channel]['lst_sys_stop'] = Time(self[instrument][channel]['time_sys_stop'], format='unix', scale='utc', location=location).sidereal_time('apparent').value
return
def partition(self, instruments=['100MHz', '70MHz'], channels=['EW', 'NS'], buffer=(0, 0)):
""" Produces partitions which slice the data according to the instrument's switch states. """
for channel, instrument in itertools.product(*[channels, instruments]):
# Ensures the data dictionary has an entry for data partitions.
if 'Partitions' not in self[instrument][channel].keys():
self[instrument][channel]['Partitions'] = {}
for file_alias, switch_data in self[instrument]['Switch'].items():
# Removes switch states with invalid timestamps, as well as unpaired switch states.
switch_data = np.array([list(group)[0] for timestamp, group in itertools.groupby(switch_data, lambda entry: entry[1]) if timestamp != 0])
switch_data = np.array([list(group)[-1] if state == 1 else list(group)[0] for state, group in itertools.groupby(switch_data, lambda entry: entry[0])])
if switch_data[-1,0] == 1: switch_data = switch_data[:-1,:]
# Initializes an empty list of data portions.
portions = list()
# Produces the data portions.
for _, start, _, stop in np.reshape(switch_data, (-1,4)):
beginning = np.searchsorted(self[instrument][channel]['time_sys_start'], start) + buffer[0]
end = np.searchsorted(self[instrument][channel]['time_sys_stop'], stop, side='right') - buffer[1]
if beginning < end: portions.append(slice(beginning, end, None))
self[instrument][channel]['Partitions'][file_alias] = portions
return
def get(self, data='pol', instrument='100MHz', channel='EW', partition='antenna'):
""" Extracts the data partition associated with the input instrument and channel. """
if partition == None:
return self[instrument][channel][data]
else:
return np.r_[operator.itemgetter(*self[instrument][channel]['Partitions'][partition])(self[instrument][channel][data])]
def interpolate(self, times, instrument='100MHz', channel='EW', partition='short', threshold=500):
""" Employs linear interpolation over a given data partition to extrapolate spectra for each input time. """
# The data to be interpolated: y = _interpolant(x)
x = self[instrument][channel]['time_sys_start']
y = self[instrument][channel]['pol']
# Warning!
if threshold >= (x[-1] - x[0]):
raise Exception("The chosen threshold exceeds the time interval spanned by the data.")
# Allocates the interpolation result.
interpolation = np.empty((len(times), y.shape[1]))
# Collects the appropriate portions of the data located in the vicinity of each input time value.
portions = self[instrument][channel]['Partitions'][partition]
slices = [slice(index, index + 1, None) for index in np.searchsorted(x, times)]
portions = [(portions[index - 2], portions[index - 1], portions[index % len(portions)], portions[(index + 1) % len(portions)]) for index in np.searchsorted(portions, slices)]
# Selects pairs of data portions to be used in the interpolation.
for index, (portion, time) in enumerate(zip(portions, times)):
# Identifies which data portions in the vicinity of the current time satisfy the input threshold.
pattern = list(map(np.mean, operator.itemgetter(*portion)(x)))
pattern.insert(2, time)
pattern = np.abs(np.diff(pattern)) < threshold
# Picks the appropriate pair of data portions based on the pattern identified above.
if np.all(pattern[1:3] == [True,True]): selection = (portion[1],portion[2])
elif np.all(pattern[0:2] == [True,True]): selection = (portion[0],portion[1])
elif np.all(pattern[2:4] == [True,True]): selection = (portion[2],portion[3])
elif np.all(pattern[0:3] == [False,True,False]): selection = (portion[1],portion[1])
elif np.all(pattern[1:4] == [False,True,False]): selection = (portion[2],portion[2])
else:
# None of the data portions clear the input threshold.
interpolation[index,:].fill(np.nan)
continue
# Interpolates.
interpolation[index,:] = _interpolant((x[selection[0]].mean(), y[selection[0]].mean(axis=0), x[selection[1]].mean(), y[selection[1]].mean(axis=0), time))
return interpolation
def prep_gsmcal_data(instruments=['100MHz', '70MHz'], channels=['EW','NS'], years=['2021', '2018']):
gsmcal_data = {}
time_data ={}
selections = {'100MHz':{'EW': {'2018': './selections/2018_100MHz_EW_Prototyping.p',
'2021': './selections/2021_100MHz_EW_Partial.p'},
'NS': {'2018': './selections/2018_100MHz_NS.p',
'2021':'./selections/2021_100MHz_NS_Partial.p'}
},
'70MHz':{'EW': {'2018': './selections/2018_70MHz_EW_Partial.p',
'2021': './selections/2021_70MHz_EW_Partial.p'},
'NS': {'2018':'./selections/2018_70MHz_NS_Partial.p',
'2021': './selections/2021_70MHz_NS_Partial.p'}
}
}
for channel, instrument, year in itertools.product(*[channels, instruments, years]):
data = Data.via_metadatabase(selection = selections[instrument][channel][year])
data.partition(instruments=[instrument], channels=[channel], buffer=(1,1))
#time = data.get(data='time_sys_start', instrument=instrument, channel=channel, partition='antenna')
shorts = data.interpolate(times=data.get(data='time_sys_start', instrument=instrument, channel=channel, partition='antenna'), instrument=instrument, channel=channel, partition='short', threshold=5000)
#shorts = data.interpolate(times=time, instrument=instrument, channel=channel, partition='short', threshold=5000)
antenna = data.get(data='pol', instrument=instrument, channel=channel, partition='antenna')
gsmcal_data[instrument] = {channel:{year: antenna-shorts}}
data.lst(instruments=[instrument], channels=[channel])
time_data = data.get(data = 'lst_sys_start', instrument=instrument, channel=channel, partition='antenna')
return gsmcal_data, time_data
# Aliases included for backwards compatibility.
timestamp_from_ctime = iso
siderealtime_from_ctime = lst
add_switch_flags = partition