POD-2374: Add optional Version to Staging Workspace (#198)
* initial commit

* update logic for populating workspace metrics

* add more defaults
sahakiann authored Jan 21, 2025
1 parent fbdaa42 commit 9cba2a5
Showing 3 changed files with 95 additions and 35 deletions.
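
For quick context before the diff: the new --workspace_version flag versions the staging workspace name. Below is a minimal sketch of that naming behaviour, assuming it mirrors the logic added in this commit; the helper function and the example dataset name are illustrative, not part of the code.

    from typing import Optional

    # Illustrative only: mirrors the version handling added in this commit.
    def staging_workspace_name(dataset_name: str, workspace_version: Optional[int]) -> str:
        # Versions of 1 or less are treated the same as passing no version at all.
        version = workspace_version if workspace_version and workspace_version > 1 else None
        return f"{dataset_name}_Staging_v{version}" if version else f"{dataset_name}_Staging"

    print(staging_workspace_name("MY_DATASET", None))  # MY_DATASET_Staging
    print(staging_workspace_name("MY_DATASET", 2))     # MY_DATASET_Staging_v2

Running the script with --workspace_version 2 therefore creates MY_DATASET_Staging_v2 while re-using the existing auth group and dataset, since continue_if_exists is forced on whenever a version is passed (see the diff below).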
87 changes: 71 additions & 16 deletions python/set_up_staging_workspace_and_dataset.py
@@ -10,7 +10,8 @@
from utils.requests_utils.request_util import RunRequest
from utils.token_util import Token
from utils.gcp_utils import GCPCloudFunctions
from utils import GCP, comma_separated_list
from utils import GCP, comma_separated_list, ARG_DEFAULTS


logging.basicConfig(
format="%(levelname)s: %(asctime)s : %(message)s", level=logging.INFO
@@ -88,6 +89,13 @@ def get_args() -> Namespace:
action="store_true",
help="If dataset already exists, delete it before creating a new one",
)
parser.add_argument(
"--workspace_version",
help="The version of the workspace. This should be used when more data needs to be "
"uploaded to a dataset, but it's not ready to be made public yet.",
required=False,
type=int,
)
return parser.parse_args()


@@ -100,7 +108,8 @@ def __init__(
continue_if_exists: bool,
controlled_access: bool,
resource_owners: list[str],
resource_members: Optional[list[str]]
resource_members: Optional[list[str]],
workspace_version: Optional[int],
):
self.terra_workspace = terra_workspace
self.terra_groups = terra_groups
@@ -109,10 +118,12 @@ def __init__(
self.controlled_access = controlled_access
self.resource_owners = resource_owners
self.resource_members = resource_members
self.workspace_version = workspace_version

def _set_up_access_group(self) -> None:
logging.info(f"Creating group {self.auth_group}")
self.terra_groups.create_group(group_name=self.auth_group, continue_if_exists=self.continue_if_exists)
continue_if_exists = True if self.workspace_version else self.continue_if_exists
self.terra_groups.create_group(group_name=self.auth_group, continue_if_exists=continue_if_exists)
for user in self.resource_owners:
self.terra_groups.add_user_to_group(email=user, group=self.auth_group, role=ADMIN)
if self.resource_members:
@@ -199,7 +210,8 @@ def __init__(
controlled_access: bool,
terra_billing_project: str,
delete_existing_dataset: bool,
phs_id: Optional[str] = None
workspace_version: Optional[int],
phs_id: Optional[str] = None,
):
self.tdr = tdr
self.dataset_name = dataset_name
@@ -212,6 +224,7 @@ def __init__(
self.auth_group = auth_group
self.delete_existing_dataset = delete_existing_dataset
self.controlled_access = controlled_access
self.workspace_version = workspace_version

def _create_dataset_properties(self) -> dict:
additional_properties = {
@@ -225,17 +238,53 @@ def _create_dataset_properties(self) -> dict:
return additional_properties

def _add_row_to_table(self, dataset_id: str) -> None:
dataset_metrics = tdr.get_dataset_table_metrics(dataset_id=dataset_id, target_table_name=self.REFERENCE_TABLE)
workspace_billing_combo = f"{self.terra_billing_project}/{self.workspace_name}"

ingest_records = []
if not dataset_metrics:
ingest_records.extend(
[
{
"key": "Staging Workspace",
"value": workspace_billing_combo,
},
{
"key": "Authorization Group",
"value": self.auth_group
}
]
)
else:
linked_workspaces = [w["value"] for w in dataset_metrics if dataset_metrics]
if workspace_billing_combo not in linked_workspaces:
if not self.workspace_version:
ingest_records.extend(
[
{
"key": "Staging Workspace",
"value": workspace_billing_combo
}
]
)
else:
ingest_records.extend(
[
{
"key": f"Staging Workspace Version {self.workspace_version}",
"value": workspace_billing_combo
}
]
)

StartAndMonitorIngest(
ingest_records=[
{"key": "Staging Workspace", "value": f'{self.terra_billing_project}/{self.workspace_name}'},
{"key": "Authorization Group", "value": self.auth_group}
],
ingest_records=ingest_records,
target_table_name=self.REFERENCE_TABLE,
dataset_id=dataset_id,
load_tag=f"{dataset_name}_initial_load",
bulk_mode=False,
update_strategy="replace",
waiting_time_to_poll=15,
update_strategy=ARG_DEFAULTS["update_strategy"],
waiting_time_to_poll=ARG_DEFAULTS["waiting_time_to_poll"],
tdr=self.tdr,
).run()
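
To make the branching in _add_row_to_table concrete, here is a hedged example of the records it would ingest; the billing project and dataset name are illustrative placeholders, while the keys mirror the code above.

    # Illustrative values; keys follow the method above.
    first_run_records = [  # empty reference table, no version passed
        {"key": "Staging Workspace", "value": "my-billing-project/MY_DATASET_Staging"},
        {"key": "Authorization Group", "value": "AUTH_MY_DATASET"},
    ]
    versioned_run_records = [  # table already populated, --workspace_version 2
        {"key": "Staging Workspace Version 2", "value": "my-billing-project/MY_DATASET_Staging_v2"},
    ]
    # If the workspace/billing combination is already linked, ingest_records stays
    # empty and no new reference row is added.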

@@ -308,7 +357,7 @@ def run(self) -> None:
self.terra_groups.remove_user_from_group(
group=self.auth_group,
email=self.current_user_email,
role="admin"
role=ADMIN
)


@@ -481,6 +530,7 @@ def run(self) -> list[WorkflowConfigs]:
wdls_to_import = args.wdls_to_import
notebooks_to_import = args.notebooks_to_import
delete_existing_dataset = args.delete_existing_dataset
workspace_version = args.workspace_version if args.workspace_version and args.workspace_version > 1 else None

# Validate wdls to import are valid and exclude any that are not
if wdls_to_import:
@@ -489,12 +539,15 @@
if wdl in GetWorkflowNames().get_workflow_names()
]

workspace_name = f"{dataset_name}_Staging"
workspace_name = (
f"{dataset_name}_Staging_v{workspace_version}" if workspace_version else f"{dataset_name}_Staging"
)
auth_group = f"AUTH_{dataset_name}"

# Set up Terra, TerraGroups, and TDR classes
token = Token(cloud=GCP)
request_util = RunRequest(token=token, max_retries=5, max_backoff_time=60)
request_util = RunRequest(
token=token, max_retries=ARG_DEFAULTS["max_retries"], max_backoff_time=ARG_DEFAULTS["max_backoff_time"])
tdr = TDR(request_util=request_util)
terra_groups = TerraGroups(request_util=request_util)
terra_workspace = TerraWorkspace(
@@ -511,7 +564,8 @@
continue_if_exists=continue_if_exists,
controlled_access=controlled_access,
resource_owners=resource_owners,
resource_members=resource_members
resource_members=resource_members,
workspace_version=workspace_version,
).run()
logging.info("Finished setting up Terra workspace")
workspace_bucket = f"gs://{terra_workspace.get_workspace_bucket()}"
@@ -521,13 +575,14 @@
dataset_name=dataset_name,
tdr_billing_profile=tdr_billing_profile,
phs_id=phs_id,
continue_if_exists=continue_if_exists,
continue_if_exists=True if workspace_version else continue_if_exists,
terra_billing_project=terra_billing_project,
workspace_name=workspace_name,
resource_owners=resource_owners,
auth_group=auth_group,
controlled_access=controlled_access,
delete_existing_dataset=delete_existing_dataset
delete_existing_dataset=delete_existing_dataset,
workspace_version=workspace_version,
)
if delete_existing_dataset:
sa_for_dataset_to_delete = dataset_setup.get_sa_for_dataset_to_delete()
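
The "add more defaults" bullet in the commit message refers to the ARG_DEFAULTS lookups that replace hard-coded values in this file. A hedged sketch of the entries this diff relies on, assuming they simply preserve the literals they replace; the real dict lives in the utils package and may hold more keys.

    # Assumed shape of utils.ARG_DEFAULTS, inferred from the literals removed in this diff.
    ARG_DEFAULTS = {
        "update_strategy": "replace",
        "waiting_time_to_poll": 15,
        "max_retries": 5,
        "max_backoff_time": 60,
    }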
