Skip to content

Commit

Permalink
Extract secret from gcs
Browse files Browse the repository at this point in the history
update

move

update
  • Loading branch information
amishas157 committed Dec 16, 2024
1 parent 06884ed commit 1576b35
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 12 deletions.
5 changes: 4 additions & 1 deletion dags/external_data_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
get_airflow_metadata,
)
from stellar_etl_airflow.default import get_default_dag_args, init_sentry
from stellar_etl_airflow.utils import access_secret

init_sentry()

Expand Down Expand Up @@ -65,7 +66,9 @@
"{{ subtract_data_interval(dag, data_interval_end).isoformat() }}",
],
use_gcs=True,
env_vars={"RETOOL_API_KEY": "{{ var.value.retool_api_key }}"},
env_vars={
"RETOOL_API_KEY": access_secret("retool-api-key-test", "default"),
},
)


Expand Down
12 changes: 1 addition & 11 deletions dags/stellar_etl_airflow/build_elementary_slack_alert_task.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
import base64
import logging
from datetime import timedelta

from airflow.configuration import conf
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client, config
from kubernetes.client import models as k8s
from stellar_etl_airflow.default import alert_after_max_retries


def access_secret(secret_name, namespace):
config.load_kube_config()
v1 = client.CoreV1Api()
secret_data = v1.read_namespaced_secret(secret_name, namespace)
secret = secret_data.data
secret = base64.b64decode(secret["token"]).decode("utf-8")
return secret
from stellar_etl_airflow.utils import access_secret


def elementary_task(dag, task_name, command, cmd_args=[], resource_cfg="default"):
Expand Down
11 changes: 11 additions & 0 deletions dags/stellar_etl_airflow/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import base64
import logging
import re
import time

from airflow.configuration import conf
from airflow.models import Variable
from airflow.utils.state import TaskInstanceState
from kubernetes import client, config

base_log_folder = conf.get("logging", "base_log_folder")

Expand Down Expand Up @@ -100,3 +102,12 @@ def skip_retry_dbt_errors(context) -> None:
return
else:
return


def access_secret(secret_name, namespace):
config.load_kube_config()
v1 = client.CoreV1Api()
secret_data = v1.read_namespaced_secret(secret_name, namespace)
secret = secret_data.data
secret = base64.b64decode(secret["token"]).decode("utf-8")
return secret

0 comments on commit 1576b35

Please # to comment.