Commit 3d82b4e (2 parents: 04c690f + b01096b)

Merge pull request #207 from broadinstitute/sn_POD-2450_create_script_to_upload_files

Adding script to upload TSV and files to Terra

File tree

6 files changed: +268 −0 lines


.dockstore.yml (+7)

@@ -139,3 +139,10 @@ workflows:
     readMePath: /wdl/DeleteTdrRows/README.md
     testParameterFiles:
       - /wdl/DeleteTdrRows/DeleteTdrRows.wdl
+
+  - name: UploadMetricsAndGcpFilesToTerra
+    subclass: WDL
+    primaryDescriptorPath: /wdl/UploadMetricsAndGcpFilesToTerra/UploadMetricsAndGcpFilesToTerra.wdl
+    readMePath: /wdl/UploadMetricsAndGcpFilesToTerra/README.md
+    testParameterFiles:
+      - /wdl/UploadMetricsAndGcpFilesToTerra/UploadMetricsAndGcpFilesToTerra.wdl
python/upload_metrics_and_files_to_terra.py (+172)

@@ -0,0 +1,172 @@

import logging
import os
from argparse import ArgumentParser, Namespace

from utils.requests_utils.request_util import RunRequest
from utils.token_util import Token
from utils import GCP, ARG_DEFAULTS, comma_separated_list
from utils.gcp_utils import GCPCloudFunctions
from utils.terra_utils.terra_util import TerraWorkspace
from utils.csv_util import Csv

logging.basicConfig(
    format="%(levelname)s: %(asctime)s : %(message)s", level=logging.INFO
)


def get_args() -> Namespace:
    parser = ArgumentParser()
    parser.add_argument("--workspace_name", "-w", type=str, required=True, help="Terra workspace name")
    parser.add_argument("--billing_project", "-b", type=str, required=True, help="Billing project name")
    parser.add_argument("--metrics_tsv", "-m", type=str, required=True, help="Path to the metrics TSV file")
    parser.add_argument("--skip_upload_column", "-s", type=comma_separated_list,
                        help="Column name(s) to skip uploading. Use comma-separated values for multiple columns. Optional")
    parser.add_argument("--flatten_path", "-f", action="store_true",
                        help="Flatten all file paths and put all files in one directory. Optional")
    parser.add_argument("--subdir", "-d", type=str, help="Subdirectory to upload files to. Optional")
    parser.add_argument("--id_column", "-i", type=str, required=True, help="Column name for the id column")
    return parser.parse_args()


class ConvertContents:
    def __init__(
            self,
            contents: list[dict],
            id_column: str,
            bucket_name: str,
            flatten_path: bool,
            subdir: str,
            skip_upload_column: list[str],
    ):
        self.contents = contents
        self.id_column = id_column
        self.flatten_path = flatten_path
        self.skip_upload_column = skip_upload_column
        self.new_bucket_path = f"gs://{bucket_name}" if not subdir else f"gs://{bucket_name}/{subdir}"
        self.files_to_copy: list[dict] = []
        self.headers: set = set()

    def _get_file_copy_dict(self, file_path: str) -> dict:
        if self.flatten_path:
            file_name = os.path.basename(file_path)
            new_path = f"{self.new_bucket_path}/{file_name}"
        else:
            path_without_bucket = '/'.join(file_path.split("/")[3:])
            new_path = f"{self.new_bucket_path}/{path_without_bucket}"
        return {"source_file": file_path, "full_destination_path": new_path}

    @staticmethod
    def _check_paths_unique(file_destinations: list[str]) -> bool:
        seen = set()
        duplicates = set()
        for file_path in file_destinations:
            if file_path in seen:
                duplicates.add(file_path)
            seen.add(file_path)
        if duplicates:
            logging.error(f"Duplicate destination file paths found; files will overwrite each other: {duplicates}")
            return False
        return True

    def _update_file_paths(self, cell_value: str) -> str:
        if cell_value.startswith("gs://"):
            copy_dict = self._get_file_copy_dict(cell_value)
            self.files_to_copy.append(copy_dict)
            return copy_dict["full_destination_path"]
        return cell_value

    def _validate_results(self) -> None:
        dest_file_paths = [copy_dict["full_destination_path"] for copy_dict in self.files_to_copy]
        if not self._check_paths_unique(dest_file_paths):
            raise ValueError("Duplicate destination file paths found; files will overwrite each other.")
        if f"entity:{self.id_column}" not in self.headers:
            raise ValueError(f"ID column {self.id_column} not found in TSV file.")

    def run(self) -> tuple[list[dict], list[dict], set]:
        new_tsv_contents = []
        for row in self.contents:
            new_row = {}
            for header, value in row.items():
                # Prefix the id column with entity: as Terra expects
                if header == self.id_column:
                    header = f"entity:{header}"
                # Build up the set of all headers
                self.headers.add(header)
                if self.skip_upload_column and header in self.skip_upload_column:
                    # Leave skip-upload columns as-is
                    new_row[header] = value
                # If the value is a list, check each entry in the list
                elif isinstance(value, list):
                    new_row[header] = [self._update_file_paths(entry) for entry in value]
                else:
                    new_row[header] = self._update_file_paths(value)
            new_tsv_contents.append(new_row)
        self._validate_results()
        return new_tsv_contents, self.files_to_copy, self.headers


class UploadContentsToTerra:
    NEW_TSV = "updated_metrics.tsv"

    def __init__(self, terra_workspace: TerraWorkspace, contents: list[dict], id_column: str, headers: set):
        self.terra_workspace = terra_workspace
        self.contents = contents
        self.id_column = f"entity:{id_column}"
        self.headers = headers

    def run(self) -> None:
        # Put the entity:id column first, followed by the remaining headers
        header_list = [self.id_column] + [header for header in self.headers if header != self.id_column]
        Csv(file_path=self.NEW_TSV).create_tsv_from_list_of_dicts(
            list_of_dicts=self.contents,
            header_list=header_list
        )
        logging.info(f"Uploading {self.NEW_TSV} to Terra")
        self.terra_workspace.upload_metadata_to_workspace_table(self.NEW_TSV)


if __name__ == '__main__':
    args = get_args()
    billing_project, workspace_name = args.billing_project, args.workspace_name
    metrics_tsv, skip_upload_column = args.metrics_tsv, args.skip_upload_column
    flatten_path, subdir = args.flatten_path, args.subdir

    token = Token(cloud=GCP)
    request_util = RunRequest(token=token)
    # Create a Terra workspace object that uses request_util for its API calls
    terra_workspace = TerraWorkspace(
        billing_project=billing_project,
        workspace_name=workspace_name,
        request_util=request_util
    )

    workspace_bucket = terra_workspace.get_workspace_bucket()
    # Read in the TSV file
    metrics_tsv_contents = Csv(file_path=metrics_tsv).create_list_of_dicts_from_tsv()

    converted_contents, files_to_copy, headers = ConvertContents(
        contents=metrics_tsv_contents,
        id_column=args.id_column,
        bucket_name=workspace_bucket,
        flatten_path=flatten_path,
        subdir=subdir,
        skip_upload_column=skip_upload_column
    ).run()

    logging.info(f"Copying {len(files_to_copy)} files to {workspace_bucket}")
    # Copy files to their new locations in the workspace bucket
    GCPCloudFunctions().multithread_copy_of_files_with_validation(
        files_to_copy=files_to_copy,
        workers=ARG_DEFAULTS['multithread_workers'],
        max_retries=5
    )

    UploadContentsToTerra(
        terra_workspace=terra_workspace,
        contents=converted_contents,
        id_column=args.id_column,
        headers=headers
    ).run()
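
To make the destination-path logic in _get_file_copy_dict concrete, here is a minimal standalone sketch; map_destination is an illustrative helper (not part of the script) and all bucket names and paths are hypothetical:

import os

def map_destination(file_path: str, new_bucket_path: str, flatten_path: bool) -> str:
    # Mirrors _get_file_copy_dict: either flatten to the basename, or keep
    # the object path after stripping "gs://<source-bucket>/"
    if flatten_path:
        return f"{new_bucket_path}/{os.path.basename(file_path)}"
    return f"{new_bucket_path}/" + "/".join(file_path.split("/")[3:])

src = "gs://source-bucket/run1/sample1/metrics.txt"
print(map_destination(src, "gs://workspace-bucket/subdir", flatten_path=True))
# -> gs://workspace-bucket/subdir/metrics.txt
print(map_destination(src, "gs://workspace-bucket/subdir", flatten_path=False))
# -> gs://workspace-bucket/subdir/run1/sample1/metrics.txt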

python/utils/gcp_utils.py (+1)

@@ -382,6 +382,7 @@ def multithread_copy_of_files_with_validation(

         Args:
             files_to_copy (list[dict]): List of dictionaries containing source and destination file paths.
+                Each dict should have the keys "source_file" and "full_destination_path".
             workers (int): Number of worker threads.
             max_retries (int): Maximum number of retries.
         """
wdl/UploadMetricsAndGcpFilesToTerra/README.md (+21)

@@ -0,0 +1,21 @@

# WDL Input Overview

This workflow copies metadata and files from a TSV into a Terra workspace. All GCP files linked from the TSV are copied to the workspace bucket, and TSV entries that contained a file path are updated to point to the new file locations.

## Prerequisites
* Make sure that your proxy service account has access to the source files in the TSV.

## Inputs Table:
| Input Name              | Description | Type | Required | Default |
|-------------------------|-------------|------|----------|---------|
| **billing_project**     | The Terra billing project. | String | Yes | N/A |
| **workspace_name**      | The GCP workspace to ingest the metadata and files into. | String | Yes | N/A |
| **metrics_tsv**         | Source TSV. | File | Yes | N/A |
| **flatten_path**        | Use if you want all paths to end up in the same directory. If not used, each source file's current path is kept (with the bucket updated). | Boolean | Yes | N/A |
| **id_column**           | Column to be used as the primary key in the Terra table. Must be present in the input TSV. | String | Yes | N/A |
| **skip_upload_columns** | Comma-separated list (no spaces) of columns you do not want files copied from. Only useful if there are columns WITH file paths that you do NOT want copied. | String | No | N/A |
| **subdir**              | Subdirectory to put files into in the new bucket. If flatten_path is used, all files will be placed directly in this directory. If flatten_path is not used, the path structure stays intact, but all paths will start with `gs://{bucket}/{subdir}/`. | String | No | N/A |
| **docker**              | Specifies a custom Docker image to use. Optional. | String | No | us-central1-docker.pkg.dev/operations-portal-427515/ops-toolbox/ops_terra_utils_slim:latest |

## Outputs Table:
This workflow does not generate any outputs directly. However, logs will be provided to track the progress of the file ingestion and metadata transfer. These logs include ingestion status, any errors encountered, and retries if necessary. Review the stderr file for detailed information.
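
As a worked example (all sample names and paths hypothetical): given this input TSV row, run with id_column=sample_id and no flatten_path or subdir:

sample_id	cram_path	mean_coverage
sample1	gs://source-bucket/crams/sample1.cram	30.2

the CRAM is copied to gs://{workspace-bucket}/crams/sample1.cram, and the row uploaded to the Terra table becomes:

entity:sample_id	cram_path	mean_coverage
sample1	gs://{workspace-bucket}/crams/sample1.cram	30.2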
wdl/UploadMetricsAndGcpFilesToTerra/UploadMetricsAndGcpFilesToTerra.wdl (+57)

@@ -0,0 +1,57 @@

version 1.0

workflow UploadMetricsAndGcpFilesToTerra {
    input {
        String billing_project
        String workspace_name
        File metrics_tsv
        String? skip_upload_columns
        Boolean flatten_path
        String? subdir
        String id_column
        String? docker
    }

    String docker_image = select_first([docker, "us-central1-docker.pkg.dev/operations-portal-427515/ops-toolbox/ops_terra_utils_slim:latest"])

    call UploadMetricsAndFilesToTerra {
        input:
            billing_project = billing_project,
            workspace_name = workspace_name,
            metrics_tsv = metrics_tsv,
            skip_upload_columns = skip_upload_columns,
            flatten_path = flatten_path,
            subdir = subdir,
            id_column = id_column,
            docker_image = docker_image
    }
}

task UploadMetricsAndFilesToTerra {
    input {
        String billing_project
        String workspace_name
        File metrics_tsv
        String? skip_upload_columns
        Boolean flatten_path
        String? subdir
        String id_column
        String docker_image
    }

    command <<<
        python /etc/terra_utils/python/upload_metrics_and_files_to_terra.py \
            --billing_project ~{billing_project} \
            --workspace_name "~{workspace_name}" \
            --metrics_tsv ~{metrics_tsv} \
            --id_column ~{id_column} \
            ~{if flatten_path then "--flatten_path" else ""} \
            ~{"--skip_upload_column " + skip_upload_columns} \
            ~{"--subdir " + subdir}
    >>>

    runtime {
        docker: docker_image
    }
}
@@ -0,0 +1,10 @@

{
  "UploadMetricsAndGcpFilesToTerra.billing_project": "String",
  "UploadMetricsAndGcpFilesToTerra.workspace_name": "String",
  "UploadMetricsAndGcpFilesToTerra.metrics_tsv": "String",
  "UploadMetricsAndGcpFilesToTerra.skip_upload_columns": "String",
  "UploadMetricsAndGcpFilesToTerra.flatten_path": "Boolean",
  "UploadMetricsAndGcpFilesToTerra.subdir": "String",
  "UploadMetricsAndGcpFilesToTerra.id_column": "String",
  "UploadMetricsAndGcpFilesToTerra.docker": "String"
}
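
For reference, a filled-in inputs file following this template might look like the following (all values hypothetical; optional keys may be omitted):

{
  "UploadMetricsAndGcpFilesToTerra.billing_project": "my-billing-project",
  "UploadMetricsAndGcpFilesToTerra.workspace_name": "my-workspace",
  "UploadMetricsAndGcpFilesToTerra.metrics_tsv": "gs://source-bucket/metrics.tsv",
  "UploadMetricsAndGcpFilesToTerra.flatten_path": false,
  "UploadMetricsAndGcpFilesToTerra.id_column": "sample_id",
  "UploadMetricsAndGcpFilesToTerra.subdir": "metrics_upload"
}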
