From 3d81104800f0e77b2557f6c1ea832c000ebbaf86 Mon Sep 17 00:00:00 2001 From: Thiago Castro Ferreira Date: Mon, 17 Jun 2024 11:00:09 -0300 Subject: [PATCH 1/4] Saving pipelines as asset --- aixplain/factories/pipeline_factory.py | 12 ++++-------- aixplain/modules/pipeline.py | 22 +++++++++++++--------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/aixplain/factories/pipeline_factory.py b/aixplain/factories/pipeline_factory.py index 4ebdc439..02e098d9 100644 --- a/aixplain/factories/pipeline_factory.py +++ b/aixplain/factories/pipeline_factory.py @@ -225,16 +225,13 @@ def list( return {"results": pipelines, "page_total": page_total, "page_number": page_number, "total": total} @classmethod - def create( - cls, name: Text, pipeline: Union[Text, Dict], status: Text = "draft", api_key: Optional[Text] = None - ) -> Pipeline: - """Pipeline Creation + def create(cls, name: Text, pipeline: Union[Text, Dict], api_key: Optional[Text] = None) -> Pipeline: + """Draft Pipeline Creation Args: name (Text): Pipeline Name pipeline (Union[Text, Dict]): Pipeline as a Python dictionary or in a JSON file - status (Text, optional): Status of the pipeline. Currently only draft pipelines can be saved. Defaults to "draft". - api_key (Optional[Text], optional): _description_. Defaults to None. + api_key (Optional[Text], optional): Team API Key to create the Pipeline. Defaults to None. Raises: Exception: Currently just the creation of draft pipelines are supported @@ -243,12 +240,11 @@ def create( Pipeline: instance of the new pipeline """ try: - assert status == "draft", "Pipeline Creation Error: Currently just the creation of draft pipelines are supported." if isinstance(pipeline, str) is True: _, ext = os.path.splitext(pipeline) assert ( os.path.exists(pipeline) and ext == ".json" - ), "Pipeline Creation Error: Make sure the pipeline to be save is in a JSON file." + ), "Pipeline Creation Error: Make sure the pipeline to be saved is in a JSON file." with open(pipeline) as f: pipeline = json.load(f) diff --git a/aixplain/modules/pipeline.py b/aixplain/modules/pipeline.py index 3de49756..bfa20c3a 100644 --- a/aixplain/modules/pipeline.py +++ b/aixplain/modules/pipeline.py @@ -101,12 +101,12 @@ def __polling( time.sleep(wait_time) if wait_time < 60: wait_time *= 1.1 - except Exception as e: + except Exception: logging.error(f"Polling for Pipeline: polling for {name} : Continue") if response_body and response_body["status"] == "SUCCESS": try: logging.debug(f"Polling for Pipeline: Final status of polling for {name} : SUCCESS - {response_body}") - except Exception as e: + except Exception: logging.error(f"Polling for Pipeline: Final status of polling for {name} : ERROR - {response_body}") else: logging.error( @@ -130,7 +130,7 @@ def poll(self, poll_url: Text, name: Text = "pipeline_process") -> Dict: try: resp = r.json() logging.info(f"Single Poll for Pipeline: Status of polling for {name} : {resp}") - except Exception as e: + except Exception: resp = {"status": "FAILED"} return resp @@ -206,7 +206,7 @@ def __prepare_payload(self, data: Union[Text, Dict], data_asset: Optional[Union[ if isinstance(payload, int) is True or isinstance(payload, float) is True: payload = str(payload) payload = {"data": payload} - except Exception as e: + except Exception: payload = {"data": data} else: payload = {} @@ -251,7 +251,7 @@ def __prepare_payload(self, data: Union[Text, Dict], data_asset: Optional[Union[ if target_row.id == data[node_label]: data_found = True break - if data_found == True: + if data_found is True: break except Exception: data_asset_found = False @@ -303,17 +303,18 @@ def run_async( poll_url = resp["url"] response = {"status": "IN_PROGRESS", "url": poll_url} - except Exception as e: + except Exception: response = {"status": "FAILED"} if resp is not None: response["error"] = resp return response - def update(self, pipeline: Union[Text, Dict]): + def update(self, pipeline: Union[Text, Dict], save_as_asset: bool = False): """Update Pipeline Args: pipeline (Union[Text, Dict]): Pipeline as a Python dictionary or in a JSON file + save_as_asset (bool, optional): Save as asset (True) or draft (False). Defaults to False. Raises: Exception: Make sure the pipeline to be save is in a JSON file. @@ -323,12 +324,15 @@ def update(self, pipeline: Union[Text, Dict]): _, ext = os.path.splitext(pipeline) assert ( os.path.exists(pipeline) and ext == ".json" - ), "Pipeline Update Error: Make sure the pipeline to be save is in a JSON file." + ), "Pipeline Update Error: Make sure the pipeline to be saved is in a JSON file." with open(pipeline) as f: pipeline = json.load(f) # prepare payload - payload = {"name": self.name, "status": "draft", "architecture": pipeline} + status = "draft" + if save_as_asset is True: + status = "onboarded" + payload = {"name": self.name, "status": status, "architecture": pipeline} url = urljoin(config.BACKEND_URL, f"sdk/pipelines/{self.id}") headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"} logging.info(f"Start service for PUT Update Pipeline - {url} - {headers} - {json.dumps(payload)}") From 4e184f582528fb3e262ba7540f66900e50fd0049 Mon Sep 17 00:00:00 2001 From: Thiago Castro Ferreira Date: Mon, 17 Jun 2024 11:36:35 -0300 Subject: [PATCH 2/4] Pipeline delete service --- aixplain/modules/pipeline.py | 14 ++++++++++++++ tests/functional/pipelines/create_test.py | 7 +++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/aixplain/modules/pipeline.py b/aixplain/modules/pipeline.py index bfa20c3a..12c933c4 100644 --- a/aixplain/modules/pipeline.py +++ b/aixplain/modules/pipeline.py @@ -341,3 +341,17 @@ def update(self, pipeline: Union[Text, Dict], save_as_asset: bool = False): logging.info(f"Pipeline {response['id']} Updated.") except Exception as e: raise Exception(e) + + def delete(self) -> None: + """Delete Dataset service""" + try: + url = urljoin(config.BACKEND_URL, f"sdk/pipelines/{self.id}") + headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"} + logging.info(f"Start service for DELETE Pipeline - {url} - {headers}") + r = _request_with_retry("delete", url, headers=headers) + if r.status_code != 200: + raise Exception() + except Exception: + message = "Pipeline Deletion Error: Make sure the pipeline exists and you are the owner." + logging.error(message) + raise Exception(f"{message}") diff --git a/tests/functional/pipelines/create_test.py b/tests/functional/pipelines/create_test.py index f2c1a9c9..6431bd41 100644 --- a/tests/functional/pipelines/create_test.py +++ b/tests/functional/pipelines/create_test.py @@ -30,6 +30,7 @@ def test_create_pipeline_from_json(): assert isinstance(pipeline, Pipeline) assert pipeline.id != "" + pipeline.delete() def test_create_pipeline_from_string(): @@ -42,6 +43,7 @@ def test_create_pipeline_from_string(): assert isinstance(pipeline, Pipeline) assert pipeline.id != "" + pipeline.delete() def test_update_pipeline(): @@ -52,13 +54,14 @@ def test_update_pipeline(): pipeline_name = str(uuid4()) pipeline = PipelineFactory.create(name=pipeline_name, pipeline=pipeline_dict) - pipeline.update(pipeline=pipeline_json) + pipeline.update(pipeline=pipeline_json, save_as_asset=True) assert isinstance(pipeline, Pipeline) assert pipeline.id != "" + pipeline.delete() def test_create_pipeline_wrong_path(): pipeline_name = str(uuid4()) with pytest.raises(Exception): - pipeline = PipelineFactory.create(name=pipeline_name, pipeline="/") + PipelineFactory.create(name=pipeline_name, pipeline="/") From 8b2b51ebf2cfd60c8c58f04c974ea91bad390037 Mon Sep 17 00:00:00 2001 From: Thiago Castro Ferreira Date: Mon, 17 Jun 2024 12:34:05 -0300 Subject: [PATCH 3/4] Function type AI to lower case --- aixplain/factories/pipeline_factory.py | 3 +++ aixplain/modules/pipeline.py | 3 +++ tests/unit/pipeline_test.py | 3 +-- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/aixplain/factories/pipeline_factory.py b/aixplain/factories/pipeline_factory.py index 02e098d9..fac42283 100644 --- a/aixplain/factories/pipeline_factory.py +++ b/aixplain/factories/pipeline_factory.py @@ -248,6 +248,9 @@ def create(cls, name: Text, pipeline: Union[Text, Dict], api_key: Optional[Text] with open(pipeline) as f: pipeline = json.load(f) + for i, node in enumerate(pipeline["nodes"]): + if "functionType" in node and node["functionType"] == "AI": + pipeline["nodes"][i]["functionType"] = pipeline["nodes"][i]["functionType"].lower() # prepare payload payload = {"name": name, "status": "draft", "architecture": pipeline} url = urljoin(cls.backend_url, "sdk/pipelines") diff --git a/aixplain/modules/pipeline.py b/aixplain/modules/pipeline.py index 12c933c4..b079e2a3 100644 --- a/aixplain/modules/pipeline.py +++ b/aixplain/modules/pipeline.py @@ -328,6 +328,9 @@ def update(self, pipeline: Union[Text, Dict], save_as_asset: bool = False): with open(pipeline) as f: pipeline = json.load(f) + for i, node in enumerate(pipeline["nodes"]): + if "functionType" in node and node["functionType"] == "AI": + pipeline["nodes"][i]["functionType"] = pipeline["nodes"][i]["functionType"].lower() # prepare payload status = "draft" if save_as_asset is True: diff --git a/tests/unit/pipeline_test.py b/tests/unit/pipeline_test.py index 68a399aa..e983a298 100644 --- a/tests/unit/pipeline_test.py +++ b/tests/unit/pipeline_test.py @@ -24,7 +24,6 @@ from aixplain.factories import PipelineFactory from aixplain.modules import Pipeline from urllib.parse import urljoin -import pytest def test_create_pipeline(): @@ -34,6 +33,6 @@ def test_create_pipeline(): ref_response = {"id": "12345"} mock.post(url, headers=headers, json=ref_response) ref_pipeline = Pipeline(id="12345", name="Pipeline Test", api_key=config.TEAM_API_KEY) - hyp_pipeline = PipelineFactory.create(pipeline={}, name="Pipeline Test") + hyp_pipeline = PipelineFactory.create(pipeline={"nodes": []}, name="Pipeline Test") assert hyp_pipeline.id == ref_pipeline.id assert hyp_pipeline.name == ref_pipeline.name From 6f1830524815bc8c7c769b14a53803676dc268a7 Mon Sep 17 00:00:00 2001 From: Thiago Castro Ferreira <85182544+thiago-aixplain@users.noreply.github.com> Date: Mon, 17 Jun 2024 18:52:35 -0300 Subject: [PATCH 4/4] API Key as a parameter --- aixplain/modules/pipeline.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/aixplain/modules/pipeline.py b/aixplain/modules/pipeline.py index b079e2a3..ed131018 100644 --- a/aixplain/modules/pipeline.py +++ b/aixplain/modules/pipeline.py @@ -309,12 +309,13 @@ def run_async( response["error"] = resp return response - def update(self, pipeline: Union[Text, Dict], save_as_asset: bool = False): + def update(self, pipeline: Union[Text, Dict], save_as_asset: bool = False, api_key: Optional[Text] = None): """Update Pipeline Args: pipeline (Union[Text, Dict]): Pipeline as a Python dictionary or in a JSON file save_as_asset (bool, optional): Save as asset (True) or draft (False). Defaults to False. + api_key (Optional[Text], optional): Team API Key to create the Pipeline. Defaults to None. Raises: Exception: Make sure the pipeline to be save is in a JSON file. @@ -337,7 +338,8 @@ def update(self, pipeline: Union[Text, Dict], save_as_asset: bool = False): status = "onboarded" payload = {"name": self.name, "status": status, "architecture": pipeline} url = urljoin(config.BACKEND_URL, f"sdk/pipelines/{self.id}") - headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"} + api_key = api_key if api_key is not None else config.TEAM_API_KEY + headers = {"Authorization": f"Token {api_key}", "Content-Type": "application/json"} logging.info(f"Start service for PUT Update Pipeline - {url} - {headers} - {json.dumps(payload)}") r = _request_with_retry("put", url, headers=headers, json=payload) response = r.json()