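"""
Convert zipped file geodatabases (GDB) of 2013-2014 vessel AIS traffic into unified CSVs.

For each *.zip archive in data/, the script extracts the .gdb folder, exports every
layer to csv/ with GeoPandas, then joins the Broadcast, Voyage, and Vessel layers into
a single unified/<identifier>_UNIFIED.csv using the newer column names
(LAT, LON, Draft, VesselName, ...).
"""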
from concurrent.futures import ProcessPoolExecutor, as_completed
from contextlib import contextmanager

import gc
import logging
import os
import shutil
import time
import zipfile

import fiona
import geopandas as gpd
import pandas as pd
from tqdm import tqdm

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def process_file_with_retry(zip_file, max_retries=3, delay=5):
"""
Process a single file with retry logic.
:param zip_file: Path to the zip file to process
:param max_retries: Maximum number of retry attempts
:param delay: Delay between retries in seconds
"""
for attempt in range(max_retries):
try:
convert_gdb_to_csv(zip_file)
return # Success, exit the function
except Exception as e:
logging.error(f"Attempt {attempt + 1} failed for {zip_file}: {str(e)}")
if attempt < max_retries - 1:
logging.info(f"Retrying in {delay} seconds...")
time.sleep(delay)
else:
logging.error(f"All attempts failed for {zip_file}")
@contextmanager
def managed_gdf(gdf):
"""
Context manager to ensure proper cleanup of GeoPandas dataframes.
"""
try:
yield gdf
finally:
del gdf
gc.collect()
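# Dropping the reference and forcing gc.collect() keeps peak memory lower when a single
# worker processes many large GDB layers back to back.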
def convert_gdb_to_csv(zip_file, output_folder="csv/"):
"""
Converts a file in GDB format to CSV format.
:param zip_file: The path to the GDB zip file.
:param output_folder: The folder where the CSV files will be saved. Default is "csv/".
:return: None
"""
gdb_path = unzip_into_directory(zip_file)
if gdb_path is None:
logging.warning(f"No valid GDB folder found in {zip_file}")
return
os.makedirs(output_folder, exist_ok=True)
    layers = fiona.listlayers(gdb_path)
    # e.g. "gdb/Zone1_2013_01.gdb" -> "Zone1_2013_01" (zone name illustrative)
    file_identifier = os.path.splitext(os.path.basename(os.path.normpath(gdb_path)))[0]
for layer in layers:
try:
with managed_gdf(gpd.read_file(gdb_path, layer=layer)) as gdf:
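                # .x/.y below assume point geometries (position reports); non-point layers would fail here.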
if 'geometry' in gdf.columns:
gdf['X'] = gdf.geometry.x
gdf['Y'] = gdf.geometry.y
csv_path = os.path.join(output_folder, f"{file_identifier}_{layer}.csv")
# Write the DataFrame to CSV in chunks
chunk_size = 100000 # Adjust based on available memory
for i in range(0, len(gdf), chunk_size):
chunk = gdf.iloc[i:i+chunk_size]
mode = 'w' if i == 0 else 'a'
chunk.to_csv(csv_path, index=False, mode=mode, header=(i == 0))
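                # Chunked writes bound the size of each to_csv call; the full layer is still held in memory by gdf.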
except Exception as e:
print(f"Failed to process layer {file_identifier}_{layer}: {e}")
join_files(file_identifier)
shutil.rmtree(gdb_path)
def join_files(file_suffix, output_folder="unified/", csv_path="csv/"):
"""
Joins multiple CSV files into a single file.
:param file_suffix: The suffix of the files to be joined.
:param output_folder: The folder where the joined file will be saved. Default is "unified/".
:param csv_path: The path where the CSV files are located. Default is "csv/".
:return: None
"""
os.makedirs(output_folder, exist_ok=True)
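    # The layers inside each GDB carry the archive identifier as a prefix, and convert_gdb_to_csv prefixes
    # each CSV with the identifier again, hence the doubled "{file_suffix}_{file_suffix}_" pattern below
    # (e.g. Zone1_2013_01_Zone1_2013_01_Broadcast.csv; the zone name is illustrative).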
    frames = {}
    for name in ['Broadcast', 'Voyage', 'Vessel']:
        csv_file = f'{csv_path}{file_suffix}_{file_suffix}_{name}.csv'
        try:
            frames[name] = pd.read_csv(csv_file, dtype={'MMSI': str, 'VoyageID': str}, low_memory=False, engine="c")
        except Exception as e:
            logging.error(f"Failed to read CSV file {csv_file}: {e}")
    try:
        broadcast_vessel_joined = pd.merge(frames['Broadcast'], frames['Vessel'], on='MMSI', how='left')
        final_joined_df = pd.merge(broadcast_vessel_joined, frames['Voyage'], on=['VoyageID', 'MMSI'], how='left')
selected_final_df = final_joined_df[[
'MMSI', 'BaseDateTime', 'Y', 'X', 'SOG', 'COG', 'Heading',
'Name', 'IMO', 'CallSign', 'VesselType', 'Status',
'Length', 'Width', 'Draught', 'Cargo'
]]
renamed_final_df = selected_final_df.rename(
columns={
'Y': 'LAT',
'X': 'LON',
'Draught': 'Draft',
'Name': 'VesselName'
}
)
        renamed_final_df.to_csv(os.path.join(output_folder, f'{file_suffix}_UNIFIED.csv'), index=False)
except Exception as e:
print(f"Failed to join data layers of {file_suffix}: {e}")
def unzip_into_directory(zip_path, output_folder="gdb/"):
"""
Unzips the given ZIP file into the specified output folder. If the ZIP contains a folder that
itself contains a .gdb folder, moves the .gdb folder up to the output folder. Returns the full
path to the unzipped .gdb folder.
:param zip_path: The path to the ZIP file.
:param output_folder: The target folder for the extracted contents. Defaults to "gdb/".
    :return: The full path to the unzipped folder ending with .gdb, or None if no .gdb folder is found.
"""
gdb_folder_full_path = None # Initialize variable to store the full path of the .gdb folder
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(output_folder)
extracted_paths = zip_ref.namelist()
# Identify the initial extraction path, assuming the ZIP contains a single top-level directory
if extracted_paths:
initial_folder_name = extracted_paths[0].split('/')[0]
full_folder_path = os.path.join(output_folder, initial_folder_name)
# Check if the extracted content itself is the .gdb folder
if full_folder_path.endswith('.gdb'):
gdb_folder_full_path = full_folder_path
else:
# Look for a .gdb folder inside the extracted content
for root, dirs, files in os.walk(full_folder_path):
for dir_name in dirs:
if dir_name.endswith('.gdb'):
gdb_folder_path = os.path.join(root, dir_name)
target_path = os.path.join(output_folder, dir_name)
if not os.path.exists(target_path):
shutil.move(gdb_folder_path, output_folder)
gdb_folder_full_path = target_path
else:
# Handle case where the target .gdb folder already exists in the output folder
gdb_folder_full_path = target_path
# Log the root folder before deleting to ensure it's the correct one
print(f"Cleaning up: {root}")
# Cleanup: Remove any intermediate directories left behind after moving the .gdb folder
if root != output_folder and os.path.exists(root):
shutil.rmtree(root)
return gdb_folder_full_path
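# Archive layouts handled above (folder names illustrative):
#   some.zip -> gdb/Something.gdb                                 (GDB at the top level of the archive)
#   some.zip -> gdb/Wrapper/Something.gdb -> gdb/Something.gdb    (nested GDB, moved up and wrapper removed)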
def process_directory(curr_directory="data/", max_workers=2):
"""
Process the given directory to convert GDB files to CSV files using multi-processing.
:param curr_directory: The path of the current directory (default="data/")
:param max_workers: Maximum number of worker processes to use
:return: None
"""
with ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = []
zip_files = [os.path.join(curr_directory, d) for d in os.listdir(curr_directory) if d.endswith('.zip')]
for zip_file in zip_files:
futures.append(executor.submit(process_file_with_retry, zip_file))
for future in tqdm(as_completed(futures), total=len(futures)):
try:
future.result()
except Exception as e:
print(f"Error occurred during processing: {e}")
if __name__ == "__main__":
process_directory(max_workers=2)
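# Folder layout implied by the defaults above:
#   data/     input .zip archives of GDB data
#   gdb/      temporary extracted .gdb folders (deleted after conversion)
#   csv/      per-layer CSV exports
#   unified/  one <identifier>_UNIFIED.csv per input archive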