improving logic to resume job from previous execution (#121)
* improving logic to resume job from previous execution
John-Scira authored Oct 31, 2024
1 parent 402f23f commit 195519a
Showing 1 changed file with 29 additions and 24 deletions.
53 changes: 29 additions & 24 deletions python/azure_tdr_to_gcp_file_transfer.py
@@ -176,30 +176,35 @@ def write_to_transfer_manifest(file_dict: dict) -> None:
 # snapshot_id=args.target_id)

 download_client = DownloadAzBlob(export_info=export_info, tdr_client=tdr_client)
-for file in file_list[:10]:
+for file in file_list:
     access_url = file["fileDetail"]["accessUrl"]
     download_path = f"/tmp/{Path(access_url).name}"
-    file_download_completed, job_logs = download_client.run(
-        blob_path=access_url, output_path=download_path)
-    file_name = Path(access_url).name
-    md5 = next(checksum for checksum in file["checksums"] if checksum["type"] == "md5")
     gcp_upload_path = construct_upload_path(file, args)
-    copy_info = {
-        "source_path": access_url,
-        "destination_path": gcp_upload_path,
-        "md5": md5["checksum"]
-    }
-    if file_download_completed:
-        copy_info["download_completed_successfully"] = 'True'
-        logging.info(f"Uploading {file_name} to {gcp_upload_path}")
-        upload_blob = gcp_bucket.blob(gcp_upload_path)
-        upload_blob.upload_from_filename(download_path)
-        upload_completed = 'True' if upload_blob.exists() else 'False'
-        copy_info["upload_completed_successfully"] = upload_completed
-        write_to_transfer_manifest(copy_info)
-        # cleanup file before next iteration
-        Path(download_path).unlink()
-    else:
-        copy_info["download_completed_successfully"] = 'False'
-        write_to_transfer_manifest(copy_info)
-        logging.error(f"Failed to download {file_name}")
+    destination_blob = gcp_bucket.blob(gcp_upload_path)
+    if not destination_blob.exists():
+        file_download_completed, job_logs = download_client.run(
+            blob_path=access_url, output_path=download_path)
+        file_name = Path(access_url).name
+        md5 = next(checksum for checksum in file["checksums"] if checksum["type"] == "md5")
+
+        copy_info = {
+            "source_path": access_url,
+            "destination_path": gcp_upload_path,
+            "md5": md5["checksum"]
+        }
+
+        if file_download_completed:
+            copy_info["download_completed_successfully"] = 'True'
+            logging.info(f"Uploading {file_name} to {gcp_upload_path}")
+            destination_blob.upload_from_filename(download_path)
+            upload_completed = 'True' if destination_blob.exists() else 'False'
+            copy_info["upload_completed_successfully"] = upload_completed
+            write_to_transfer_manifest(copy_info)
+            # cleanup file before next iteration
+            Path(download_path).unlink()
+        else:
+            copy_info["download_completed_successfully"] = 'False'
+            write_to_transfer_manifest(copy_info)
+            logging.error(f"Failed to download {file_name}")
+    else:
+        logging.info("File already uploaded to target bucket, moving to next file")
