"""
Created 05-26-18 by Matthew C. McCallum
"""
# Local imports
# None.
# Third party imports
# None.
# Python standard library imports
import os
import math
import shutil
import multiprocessing
import random
import pickle
import logging
import copy
logger = logging.getLogger(__name__)
class FileCache(object):
    """
    This class is intended to take a large collection of file URLs and copy them, chunk by chunk, to a pair of
    local directories.

    This is useful for situations where a system has a large but slow storage location coupled with a smaller but
    faster local storage location, e.g., an SSD. In this way, a small chunk can be operated on at high speed, while
    a background process copies over the next chunk for processing from the larger storage location.
    """

    CACHE_METADATA_FNAME = ".cache.pkl"

    def __init__(self, from_urls, to_dir, size=None):
        """
        Constructor.

        Args:
            from_urls: list(str) - URLs of the files at the larger storage location that will be cached locally.

            to_dir: str - URL of the cache location to which groups of files totalling less than `size` will be
            copied over to. As the cache requires two groups at this location, it must have storage space of at
            least 2*size.

            size: int - Size in bytes of each cache group. Two of these groups are kept at 'to_dir' simultaneously.
            If None, then the whole file repository is copied across.
        """
        if not size:
            size = 1024*1024*1024*1024*1024  # 1 PiB should be bigger than this class ever has to deal with.

        # Initialise member variables
        self._pool = None
        self._cache_dir = to_dir
        self._currently_caching = False
        self._cache_a = None
        self._cache_b = None
        self._current_group = None
        self._current_cache = None
        self._all_files = None
        self._all_sizes = None
        self._cache_groups = None

        # Check if we already have the cache information saved
        metadata_filename = os.path.join(to_dir, self.CACHE_METADATA_FNAME)
        if os.path.exists(metadata_filename):
            # TODO [matthew.mccallum 05.26.18] There should be some checking for cache integrity here, e.g.:
            #       - Files currently in the cache match one of the cache groups
            #       - Cache groups match all filenames
            #       - File sizes are correct
            #       - Cache directories are correct
            with open(metadata_filename, 'rb') as metadata_file:
                metadata = pickle.load(metadata_file)
            self._all_files = metadata['_all_files']
            self._all_sizes = metadata['_all_sizes']
            self._cache_groups = metadata['_cache_groups']
            self._current_group = metadata['_current_group']
            self._current_cache = metadata['_current_cache']
            self._cache_a = metadata['_cache_a']
            self._cache_b = metadata['_cache_b']
            # NOTE: In this scenario, i.e., when state is restored from disk, it is up to the user of this class to
            # prepare the next cache if one is not already prepared.
        else:
            # Get all filenames and their sizes. Take a copy of the list so that the caller's list is not
            # shuffled in place.
            self._all_files = list(from_urls)
            random.shuffle(self._all_files)
            self._all_sizes = [0]*len(self._all_files)
            total_size = 0
            for ind, filename in enumerate(self._all_files):
                self._all_sizes[ind] = os.path.getsize(filename)
                total_size += self._all_sizes[ind]

            # Separate into cache groups. Always form at least one group, even for a small file set.
            num_groups = max(1, int(math.ceil(total_size/size)))
            size_per_group = total_size/num_groups
            filename_index = 0
            self._cache_groups = []
            for group_index in range(num_groups):
                this_size = 0
                self._cache_groups += [[]]
                group = self._cache_groups[-1]
                # Always place at least one file in each group, so that a single file larger than size_per_group
                # cannot stall the loop and leave files unassigned.
                while (filename_index < len(self._all_files)) and \
                        ((len(group) == 0) or (this_size + self._all_sizes[filename_index] < size_per_group)):
                    group += [self._all_files[filename_index]]
                    this_size += self._all_sizes[filename_index]
                    filename_index += 1
            # Any files left unassigned due to rounding go into the final group.
            while filename_index < len(self._all_files):
                self._cache_groups[-1] += [self._all_files[filename_index]]
                filename_index += 1

            # Initialise member variables
            self._cache_a = os.path.join(self._cache_dir, 'a')
            self._cache_b = os.path.join(self._cache_dir, 'b')
            self._current_group = len(self._cache_groups)  # Set to out of range index to indicate no current group
            self._current_cache = self._cache_a

            # Prepare the next cache.
            self.PrepareNextCache()

    def SaveState(self):
        """
        Saves the current state of the cache to file, in case it needs to be used next time - to prevent having
        to prefill the cache a second time.
        """
        metadata = {
            '_all_files': self._all_files,
            '_all_sizes': self._all_sizes,
            '_cache_groups': self._cache_groups,
            '_current_group': self._current_group,
            '_current_cache': self._current_cache,
            '_cache_a': self._cache_a,
            '_cache_b': self._cache_b
        }
        with open(os.path.join(self._cache_dir, self.CACHE_METADATA_FNAME), 'wb') as metadata_file:
            pickle.dump(metadata, metadata_file, pickle.HIGHEST_PROTOCOL)

    @property
    def _next_group(self):
        """
        Gets the next group to be cached.

        Return:
            int - Index of the next group to be cached, or that is currently caching.
        """
        # An out of range current_group implies no groups are cached, so start at 0.
        if self._current_group == len(self._cache_groups):
            return 0
        else:
            return (self._current_group + 1) % len(self._cache_groups)

    @property
    def current_files(self):
        """
        Returns all files accessible in the current cache.

        Return:
            list(str) - A list of files currently in the cache that are safe for reading.
        """
        # An out of range current_group implies no groups are cached.
        if self._current_group == len(self._cache_groups):
            return []
        else:
            return [os.path.join(self._current_cache, os.path.basename(fname)) for fname in self._cache_groups[self._current_group]]

    @staticmethod
    def _copy_set(file_list, directory):
        """
        A method that copies a list of files to a given directory. If that directory already exists, it will be
        cleared and replaced.

        This is intended to be called asynchronously as a background process.

        Args:
            file_list: list(str) - A list of files, including complete paths, to be copied to the provided
            directory.

            directory: str - A URL location to copy the specified files to.

        Return:
            bool - Returns True upon successful completion.
        """
        # Clear directory first
        if os.path.exists(directory):
            shutil.rmtree(directory)
        os.mkdir(directory)
        # Copy all files
        for filename in file_list:
            shutil.copy(filename, directory)
        # Return success
        return True

    def _copy_callback(self, arg):
        """
        A callback to be called on cache copying completion or error. It will either raise the error locally, or
        update the caching state.

        Args:
            arg: bool or Exception - The return value from an asynchronous process. Either a success / fail value
            or an exception.
        """
        if isinstance(arg, Exception):
            self._currently_caching = False
            raise arg
        elif arg:
            self._currently_caching = False
        else:
            # arg = False should indicate unsuccessful caching, but this does not currently occur.
            raise RuntimeError("File caching failed.")

    def PrepareNextCache(self):
        """
        Prepares the next cache which may be later switched to when it is ready.

        This preparation is performed asynchronously in a background process.
        """
        logger.info("Preparing cache group at index: " + str(self._next_group))
        self._pool = multiprocessing.Pool(1, maxtasksperchild=1)
        self._currently_caching = True
        # Copy all files to the inactive cache directory in another process
        if self._current_cache != self._cache_a:
            new_cache = self._cache_a
        else:
            new_cache = self._cache_b
        self._pool.apply_async(self._copy_set,
                               (self._cache_groups[self._next_group], new_cache),
                               callback=self._copy_callback,
                               error_callback=self._copy_callback)
        self._pool.close()

    def IsCaching(self):
        """
        Returns True if there is currently a background process caching a set of files locally.
        """
        return self._currently_caching

    def Update(self):
        """
        This will switch over to the next cache, if it is ready, and start preparing the cache after that.

        If the next cache is not ready, no action will be taken.
        """
        if not self.IsCaching():
            logger.info("Moving to next cache at index: " + str(self._next_group))
            if self._current_cache != self._cache_a:
                self._current_cache = self._cache_a
            else:
                self._current_cache = self._cache_b
            self._current_group = self._next_group
            self.PrepareNextCache()
            # We have just switched to a fresh, fully prepared cache. This is a good time to save state.
            self.SaveState()
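

# The following is a minimal usage sketch, not part of the original module: the source pattern
# '/slow_storage/features/*.npy', the cache directory '/fast_ssd/cache', the 512 MB group size, the epoch count and
# the one-second polling interval are all illustrative assumptions. It shows the intended cycle: wait for the
# background copy to finish, switch to the fresh group with Update(), then read current_files from fast local
# storage while the next group copies behind the scenes. The __main__ guard also keeps multiprocessing happy on
# spawn-based platforms.
if __name__ == '__main__':
    import glob
    import time

    # The cache directory must already exist, as both the metadata file and the 'a'/'b' subdirectories are
    # created directly inside it.
    os.makedirs('/fast_ssd/cache', exist_ok=True)

    urls = glob.glob('/slow_storage/features/*.npy')
    cache = FileCache(urls, '/fast_ssd/cache', size=512*1024*1024)

    for epoch in range(10):
        # Block until the background process has finished copying the next group.
        while cache.IsCaching():
            time.sleep(1)
        # Switch to the freshly prepared group; this also kicks off preparation of the following group and saves
        # the cache state to disk.
        cache.Update()
        # Read from the fast local cache while the next group copies in the background.
        for path in cache.current_files:
            with open(path, 'rb') as f:
                data = f.read()  # ...process the cached file here...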