From 026e54fad60d4ec746dec0c71bcf0961087097cd Mon Sep 17 00:00:00 2001 From: Pankaj Singh <98807258+pankajastro@users.noreply.github.com> Date: Wed, 29 May 2024 18:09:06 +0530 Subject: [PATCH] Fetch intermediate log async GKEStartPod (#39348) * Fetch intermediate log in async GKEStartPod This PR introduces a parameter that enables the retrieval of intermediate logs for the GKEStartPod asynchronous operator. Add param last_log_time and logging_interval in GKEStartPodTrigger serialize Add optional param last_log_time in method invoke_defer_method Example DAG: start_pod = GKEStartPodOperator( task_id="start_pod", project_id=PROJECT_ID, location=LOCATION, cluster_name=GKE_CLUSTER_NAME, do_xcom_push=True, namespace=GKE_NAMESPACE, image="ubuntu:jammy", cmds=["sh", "-c", "timeout 300 bash -c 'while true; do echo \"meow\"; sleep 30; done'"], name="test-sleep", in_cluster=False, on_finish_action="delete_pod", deferrable=True, get_logs=True, logging_interval=5, gcp_conn_id=GCP_CONN_ID ) --- .../cloud/operators/kubernetes_engine.py | 15 +++--- .../cloud/triggers/kubernetes_engine.py | 2 + .../cloud/operators/test_kubernetes_engine.py | 54 +++++++++++++++++++ .../cloud/triggers/test_kubernetes_engine.py | 2 + 4 files changed, 67 insertions(+), 6 deletions(-) diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py index 0a28809dd1597..f9a75d8b28b1d 100644 --- a/airflow/providers/google/cloud/operators/kubernetes_engine.py +++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py @@ -73,6 +73,7 @@ if TYPE_CHECKING: from kubernetes.client.models import V1Job, V1Pod + from pendulum import DateTime from airflow.utils.context import Context @@ -773,16 +774,16 @@ def fetch_cluster_info(self) -> tuple[str, str | None]: self._ssl_ca_cert = cluster.master_auth.cluster_ca_certificate return self._cluster_url, self._ssl_ca_cert - def invoke_defer_method(self): + def invoke_defer_method(self, last_log_time: DateTime | None = None): """Redefine triggers which are being used in child classes.""" trigger_start_time = utcnow() self.defer( trigger=GKEStartPodTrigger( - pod_name=self.pod.metadata.name, - pod_namespace=self.pod.metadata.namespace, + pod_name=self.pod.metadata.name, # type: ignore[union-attr] + pod_namespace=self.pod.metadata.namespace, # type: ignore[union-attr] trigger_start_time=trigger_start_time, - cluster_url=self._cluster_url, - ssl_ca_cert=self._ssl_ca_cert, + cluster_url=self._cluster_url, # type: ignore[arg-type] + ssl_ca_cert=self._ssl_ca_cert, # type: ignore[arg-type] get_logs=self.get_logs, startup_timeout=self.startup_timeout_seconds, cluster_context=self.cluster_context, @@ -792,6 +793,8 @@ def invoke_defer_method(self): on_finish_action=self.on_finish_action, gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain, + logging_interval=self.logging_interval, + last_log_time=last_log_time, ), method_name="execute_complete", kwargs={"cluster_url": self._cluster_url, "ssl_ca_cert": self._ssl_ca_cert}, @@ -802,7 +805,7 @@ def execute_complete(self, context: Context, event: dict, **kwargs): self._cluster_url = kwargs["cluster_url"] self._ssl_ca_cert = kwargs["ssl_ca_cert"] - return super().execute_complete(context, event, **kwargs) + return super().trigger_reentry(context, event) class GKEStartJobOperator(KubernetesJobOperator): diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py b/airflow/providers/google/cloud/triggers/kubernetes_engine.py index 8557bea082410..f05bb0dc6c731 100644 --- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py +++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py @@ -142,6 +142,8 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "on_finish_action": self.on_finish_action.value, "gcp_conn_id": self.gcp_conn_id, "impersonation_chain": self.impersonation_chain, + "logging_interval": self.logging_interval, + "last_log_time": self.last_log_time, }, ) diff --git a/tests/providers/google/cloud/operators/test_kubernetes_engine.py b/tests/providers/google/cloud/operators/test_kubernetes_engine.py index c71cc99c7e64b..2e27db59b3de6 100644 --- a/tests/providers/google/cloud/operators/test_kubernetes_engine.py +++ b/tests/providers/google/cloud/operators/test_kubernetes_engine.py @@ -673,6 +673,7 @@ def setup_method(self): namespace=NAMESPACE, image=IMAGE, deferrable=True, + on_finish_action="delete_pod", ) self.gke_op.pod = mock.MagicMock( name=TASK_NAME, @@ -703,6 +704,59 @@ def test_async_create_pod_should_execute_successfully( fetch_cluster_info_mock.assert_called_once() assert isinstance(exc.value.trigger, GKEStartPodTrigger) + @pytest.mark.parametrize("status", ["error", "failed", "timeout"]) + @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod") + @mock.patch(KUB_OP_PATH.format("_clean")) + @mock.patch("airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.hook") + @mock.patch(KUB_OP_PATH.format("_write_logs")) + def test_execute_complete_failure(self, mock_write_logs, mock_gke_hook, mock_clean, mock_get_pod, status): + self.gke_op._cluster_url = CLUSTER_URL + self.gke_op._ssl_ca_cert = SSL_CA_CERT + with pytest.raises(AirflowException): + self.gke_op.execute_complete( + context=mock.MagicMock(), + event={"name": "test", "status": status, "namespace": "default", "message": ""}, + cluster_url=self.gke_op._cluster_url, + ssl_ca_cert=self.gke_op._ssl_ca_cert, + ) + mock_write_logs.assert_called_once() + + @mock.patch("airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.hook") + @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod") + @mock.patch(KUB_OP_PATH.format("_clean")) + @mock.patch(KUB_OP_PATH.format("_write_logs")) + def test_execute_complete_success(self, mock_write_logs, mock_clean, mock_get_pod, mock_gke_hook): + self.gke_op._cluster_url = CLUSTER_URL + self.gke_op._ssl_ca_cert = SSL_CA_CERT + self.gke_op.execute_complete( + context=mock.MagicMock(), + event={"name": "test", "status": "success", "namespace": "default"}, + cluster_url=self.gke_op._cluster_url, + ssl_ca_cert=self.gke_op._ssl_ca_cert, + ) + mock_write_logs.assert_called_once() + + @mock.patch(KUB_OP_PATH.format("pod_manager")) + @mock.patch( + "airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.invoke_defer_method" + ) + @mock.patch("airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook.get_pod") + @mock.patch(KUB_OP_PATH.format("_clean")) + @mock.patch("airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator.hook") + def test_execute_complete_running( + self, mock_gke_hook, mock_clean, mock_get_pod, mock_invoke_defer_method, mock_pod_manager + ): + self.gke_op._cluster_url = CLUSTER_URL + self.gke_op._ssl_ca_cert = SSL_CA_CERT + self.gke_op.execute_complete( + context=mock.MagicMock(), + event={"name": "test", "status": "running", "namespace": "default"}, + cluster_url=self.gke_op._cluster_url, + ssl_ca_cert=self.gke_op._ssl_ca_cert, + ) + mock_pod_manager.fetch_container_logs.assert_called_once() + mock_invoke_defer_method.assert_called_once() + class TestGKEStartJobOperator: def setup_method(self): diff --git a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py index 8a18dfcb902af..8a43f3627c03d 100644 --- a/tests/providers/google/cloud/triggers/test_kubernetes_engine.py +++ b/tests/providers/google/cloud/triggers/test_kubernetes_engine.py @@ -125,6 +125,8 @@ def test_serialize_should_execute_successfully(self, trigger): "should_delete_pod": SHOULD_DELETE_POD, "gcp_conn_id": GCP_CONN_ID, "impersonation_chain": IMPERSONATION_CHAIN, + "last_log_time": None, + "logging_interval": None, } @pytest.mark.asyncio