diff --git a/aixplain/factories/agent_factory/__init__.py b/aixplain/factories/agent_factory/__init__.py index ad4f29d1..0a543d77 100644 --- a/aixplain/factories/agent_factory/__init__.py +++ b/aixplain/factories/agent_factory/__init__.py @@ -33,6 +33,7 @@ from aixplain.factories.agent_factory.utils import build_agent from aixplain.utils.file_utils import _request_with_retry +from urllib.parse import urljoin class AgentFactory: @@ -45,14 +46,13 @@ def create( api_key: Text = config.TEAM_API_KEY, supplier: Union[Dict, Text, Supplier, int] = "aiXplain", version: Optional[Text] = None, - cost: Optional[Dict] = None, llm_id: Optional[Text] = None, ) -> Agent: """Create a new agent in the platform.""" try: agent = None - url = "http://54.86.247.242:8000/create" - headers = {"Authorization": "token " + api_key} + url = urljoin(config.BACKEND_URL, "sdk/agents") + headers = {"x-api-key": api_key} if isinstance(supplier, dict): supplier = supplier["code"] @@ -65,17 +65,18 @@ def create( tool_payload.append( { "function": tool.function.value, - "name": tool.name, + "type": "model", "description": tool.description, - "supplier": tool.supplier.value if tool.supplier else None, + "supplier": tool.supplier.value["code"] if tool.supplier else None, + "version": tool.version if tool.version else None, } ) elif isinstance(tool, PipelineTool): tool_payload.append( { - "id": tool.pipeline, - "name": tool.name, + "assetId": tool.pipeline, "description": tool.description, + "type": "pipeline", } ) else: @@ -83,34 +84,29 @@ def create( payload = { "name": name, - "api_key": api_key, - "tools": tool_payload, + "assets": tool_payload, "description": description, "supplier": supplier, "version": version, - "cost": cost, } if llm_id is not None: - payload["language_model_id"] = llm_id + payload["llmId"] = llm_id logging.info(f"Start service for POST Create Agent - {url} - {headers} - {json.dumps(payload)}") - r = _request_with_retry("post", url, headers=headers, data=json.dumps(payload)) + r = _request_with_retry("post", url, headers=headers, json=payload) if 200 <= r.status_code < 300: response = r.json() - - asset_id = response["id"] - agent = Agent( - id=asset_id, - name=name, - tools=tools, - description=description, - supplier=supplier, - version=version, - cost=cost, - api_key=api_key, - ) + agent = build_agent(payload=response, api_key=api_key) else: + error = r.json() error_msg = "Agent Onboarding Error: Please contant the administrators." + if "message" in error: + msg = error["message"] + if error["message"] == "err.name_already_exists": + msg = "Agent name already exists." + elif error["message"] == "err.asset_is_not_available": + msg = "Some the tools are not available." + error_msg = f"Agent Onboarding Error (HTTP {r.status_code}): {msg}" logging.exception(error_msg) raise Exception(error_msg) except Exception as e: @@ -120,35 +116,51 @@ def create( @classmethod def list(cls) -> Dict: """List all agents available in the platform.""" - url = "http://54.86.247.242:8000/list" - if config.AIXPLAIN_API_KEY != "": - headers = {"x-aixplain-key": f"{config.AIXPLAIN_API_KEY}", "Content-Type": "application/json"} - else: - headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"} + url = urljoin(config.BACKEND_URL, "sdk/agents") + headers = {"x-api-key": config.TEAM_API_KEY, "Content-Type": "application/json"} payload = {} - logging.info(f"Start service for POST List Agents - {url} - {headers} - {json.dumps(payload)}") - r = _request_with_retry("get", url, headers=headers) - resp = r.json() + logging.info(f"Start service for GET List Agents - {url} - {headers} - {json.dumps(payload)}") + try: + r = _request_with_retry("get", url, headers=headers) + resp = r.json() - agents, page_total, total = [], 0, 0 - results = resp - page_total = len(results) - total = len(results) - logging.info(f"Response for POST List Dataset - Page Total: {page_total} / Total: {total}") - for agent in results: - agents.append(build_agent(agent)) - return {"results": agents, "page_total": page_total, "page_number": 0, "total": total} + if 200 <= r.status_code < 300: + agents, page_total, total = [], 0, 0 + results = resp + page_total = len(results) + total = len(results) + logging.info(f"Response for GET List Agents - Page Total: {page_total} / Total: {total}") + for agent in results: + agents.append(build_agent(agent)) + return {"results": agents, "page_total": page_total, "page_number": 0, "total": total} + else: + error_msg = "Agent Listing Error: Please contant the administrators." + if "message" in resp: + msg = resp["message"] + error_msg = f"Agent Listing Error (HTTP {r.status_code}): {msg}" + logging.exception(error_msg) + raise Exception(error_msg) + except Exception as e: + raise Exception(e) @classmethod - def get(cls, agent_id: Text) -> Agent: + def get(cls, agent_id: Text, api_key: Optional[Text] = None) -> Agent: """Get agent by id.""" - url = f"http://54.86.247.242:8000/get?id={agent_id}" + url = urljoin(config.BACKEND_URL, f"sdk/agents/{agent_id}") if config.AIXPLAIN_API_KEY != "": headers = {"x-aixplain-key": f"{config.AIXPLAIN_API_KEY}", "Content-Type": "application/json"} else: - headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"} + api_key = api_key if api_key is not None else config.TEAM_API_KEY + headers = {"x-api-key": api_key, "Content-Type": "application/json"} logging.info(f"Start service for GET Agent - {url} - {headers}") r = _request_with_retry("get", url, headers=headers) resp = r.json() - return build_agent(resp) + if 200 <= r.status_code < 300: + return build_agent(resp) + else: + msg = "Please contant the administrators." + if "message" in resp: + msg = resp["message"] + error_msg = f"Agent Get Error (HTTP {r.status_code}): {msg}" + raise Exception(error_msg) diff --git a/aixplain/factories/agent_factory/utils.py b/aixplain/factories/agent_factory/utils.py index e7ecde0e..410d1692 100644 --- a/aixplain/factories/agent_factory/utils.py +++ b/aixplain/factories/agent_factory/utils.py @@ -1,17 +1,31 @@ __author__ = "thiagocastroferreira" +import aixplain.utils.config as config +from aixplain.enums import Function, Supplier +from aixplain.enums.asset_status import AssetStatus from aixplain.modules.agent import Agent, ModelTool, PipelineTool -from typing import Dict +from typing import Dict, Text +from urllib.parse import urljoin -def build_agent(payload: Dict) -> Agent: +def build_agent(payload: Dict, api_key: Text = config.TEAM_API_KEY) -> Agent: """Instantiate a new agent in the platform.""" - tools = payload["tools"] + tools = payload["assets"] for i, tool in enumerate(tools): - if "function" in tool: - tool = ModelTool(**tool) - elif "id" in tool: - tool = PipelineTool(name=tool["name"], description=tool["description"], pipeline=tool["id"]) + if tool["type"] == "model": + for supplier in Supplier: + if tool["supplier"].lower() in [supplier.value["code"].lower(), supplier.value["name"].lower()]: + tool["supplier"] = supplier + break + + tool = ModelTool( + description=tool["description"], + function=Function(tool["function"]), + supplier=tool["supplier"], + version=tool["version"], + ) + elif tool["type"] == "pipeline": + tool = PipelineTool(description=tool["description"], pipeline=tool["assetId"]) else: raise Exception("Agent Creation Error: Tool type not supported.") tools[i] = tool @@ -21,11 +35,12 @@ def build_agent(payload: Dict) -> Agent: name=payload["name"] if "name" in payload else "", tools=tools, description=payload["description"] if "description" in payload else "", - supplier=payload["supplier"] if "supplier" in payload else None, + supplier=payload["teamId"] if "teamId" in payload else None, version=payload["version"] if "version" in payload else None, cost=payload["cost"] if "cost" in payload else None, - llm_id=payload["language_model_id"] if "language_model_id" in payload else None, - api_key=payload["api_key"], + llm_id=payload["llmId"] if "llmId" in payload else "6646261c6eb563165658bbb1", + api_key=api_key, + status=AssetStatus(payload["status"]), ) - agent.url = "http://54.86.247.242:8000/async-execute" + agent.url = urljoin(config.BACKEND_URL, f"sdk/agents/{agent.id}/run") return agent diff --git a/aixplain/modules/agent/__init__.py b/aixplain/modules/agent/__init__.py index 2985d5a8..ca9df4b5 100644 --- a/aixplain/modules/agent/__init__.py +++ b/aixplain/modules/agent/__init__.py @@ -20,15 +20,20 @@ Description: Agentification Class """ +import json import logging +import time +import traceback from aixplain.utils.file_utils import _request_with_retry from aixplain.enums.supplier import Supplier +from aixplain.enums.asset_status import AssetStatus from aixplain.modules.model import Model from aixplain.modules.agent.tool import Tool from aixplain.modules.agent.tool.model_tool import ModelTool from aixplain.modules.agent.tool.pipeline_tool import PipelineTool from typing import Dict, List, Text, Optional, Union +from urllib.parse import urljoin from aixplain.utils import config @@ -41,6 +46,7 @@ class Agent(Model): name (Text): Name of the Agent tools (List[Tool]): List of tools that the Agent uses. description (Text, optional): description of the Agent. Defaults to "". + llm_id (Text, optional): large language model. Defaults to GPT-4o (6646261c6eb563165658bbb1). supplier (Text): Supplier of the Agent. version (Text): Version of the Agent. backend_url (str): URL of the backend. @@ -54,11 +60,12 @@ def __init__( name: Text, tools: List[Tool] = [], description: Text = "", + llm_id: Text = "6646261c6eb563165658bbb1", api_key: Optional[Text] = config.TEAM_API_KEY, supplier: Union[Dict, Text, Supplier, int] = "aiXplain", version: Optional[Text] = None, cost: Optional[Dict] = None, - llm_id: Optional[Text] = None, + status: AssetStatus = AssetStatus.ONBOARDING, **additional_info, ) -> None: """Create a FineTune with the necessary information. @@ -68,29 +75,121 @@ def __init__( name (Text): Name of the Agent tools (List[Tool]): List of tools that the Agent uses. description (Text, optional): description of the Agent. Defaults to "". + llm_id (Text, optional): large language model. Defaults to GPT-4o (6646261c6eb563165658bbb1). supplier (Text): Supplier of the Agent. version (Text): Version of the Agent. backend_url (str): URL of the backend. api_key (str): The TEAM API key used for authentication. cost (Dict, optional): model price. Defaults to None. - **additional_info: Additional information to be saved with the FineTune. """ assert len(tools) > 0, "At least one tool must be provided." super().__init__(id, name, description, api_key, supplier, version, cost=cost) self.additional_info = additional_info self.tools = tools self.llm_id = llm_id + if isinstance(status, str): + try: + status = AssetStatus(status) + except Exception: + status = AssetStatus.ONBOARDING + self.status = status + + def run( + self, + query: Text, + session_id: Optional[Text] = None, + history: Optional[List[Dict]] = None, + name: Text = "model_process", + timeout: float = 300, + parameters: Dict = {}, + wait_time: float = 0.5, + ) -> Dict: + """Runs an agent call. + + Args: + query (Text): query to be processed by the agent. + session_id (Optional[Text], optional): conversation Session ID. Defaults to None. + history (Optional[List[Dict]], optional): chat history (in case session ID is None). Defaults to None. + name (Text, optional): ID given to a call. Defaults to "model_process". + timeout (float, optional): total polling time. Defaults to 300. + parameters (Dict, optional): optional parameters to the model. Defaults to "{}". + wait_time (float, optional): wait time in seconds between polling calls. Defaults to 0.5. + + Returns: + Dict: parsed output from model + """ + start = time.time() + try: + response = self.run_async(query=query, session_id=session_id, history=history, name=name, parameters=parameters) + if response["status"] == "FAILED": + end = time.time() + response["elapsed_time"] = end - start + return response + poll_url = response["url"] + end = time.time() + response = self.sync_poll(poll_url, name=name, timeout=timeout, wait_time=wait_time) + return response + except Exception as e: + msg = f"Error in request for {name} - {traceback.format_exc()}" + logging.error(f"Model Run: Error in running for {name}: {e}") + end = time.time() + return {"status": "FAILED", "error": msg, "elapsed_time": end - start} + + def run_async( + self, + query: Text, + session_id: Optional[Text] = None, + history: Optional[List[Dict]] = None, + name: Text = "model_process", + parameters: Dict = {}, + ) -> Dict: + """Runs asynchronously an agent call. + + Args: + query (Text): query to be processed by the agent. + session_id (Optional[Text], optional): conversation Session ID. Defaults to None. + history (Optional[List[Dict]], optional): chat history (in case session ID is None). Defaults to None. + name (Text, optional): ID given to a call. Defaults to "model_process". + parameters (Dict, optional): optional parameters to the model. Defaults to "{}". + + Returns: + dict: polling URL in response + """ + headers = {"x-api-key": self.api_key, "Content-Type": "application/json"} + from aixplain.factories.file_factory import FileFactory + + payload = {"id": self.id, "query": FileFactory.to_link(query), "sessionId": session_id, "history": history} + payload.update(parameters) + payload = json.dumps(payload) + + r = _request_with_retry("post", self.url, headers=headers, data=payload) + logging.info(f"Model Run Async: Start service for {name} - {self.url} - {payload} - {headers}") + + resp = None + try: + resp = r.json() + logging.info(f"Result of request for {name} - {r.status_code} - {resp}") + + poll_url = resp["data"] + response = {"status": "IN_PROGRESS", "url": poll_url} + except Exception: + response = {"status": "FAILED"} + msg = f"Error in request for {name} - {traceback.format_exc()}" + logging.error(f"Model Run Async: Error in running for {name}: {resp}") + if resp is not None: + response["error"] = msg + return response def delete(self) -> None: """Delete Corpus service""" try: - url = f"http://54.86.247.242:8000/delete/{self.id}" - headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"} - logging.info(f"Start service for DELETE Agent - {url} - {headers}") + url = urljoin(config.BACKEND_URL, f"sdk/agents/{self.id}") + headers = {"x-api-key": config.TEAM_API_KEY, "Content-Type": "application/json"} + logging.debug(f"Start service for DELETE Agent - {url} - {headers}") r = _request_with_retry("delete", url, headers=headers) if r.status_code != 200: raise Exception() except Exception: - message = "Agent Deletion Error: Make sure the agent exists and you are the owner." + message = f"Agent Deletion Error (HTTP {r.status_code}): Make sure the agent exists and you are the owner." logging.error(message) raise Exception(f"{message}") diff --git a/aixplain/modules/agent/tool/__init__.py b/aixplain/modules/agent/tool/__init__.py index 64dd901f..2a22511a 100644 --- a/aixplain/modules/agent/tool/__init__.py +++ b/aixplain/modules/agent/tool/__init__.py @@ -21,7 +21,7 @@ Agentification Class """ from abc import ABC -from typing import Text +from typing import Optional, Text class Tool(ABC): @@ -30,12 +30,14 @@ class Tool(ABC): Attributes: name (Text): name of the tool description (Text): descriptiion of the tool + version (Text): version of the tool """ def __init__( self, name: Text, description: Text, + version: Optional[Text] = None, **additional_info, ) -> None: """Specialized software or resource designed to assist the AI in executing specific tasks or functions based on user commands. @@ -43,7 +45,9 @@ def __init__( Args: name (Text): name of the tool description (Text): descriptiion of the tool + version (Text): version of the tool """ self.name = name self.description = description + self.version = version self.additional_info = additional_info diff --git a/aixplain/modules/agent/tool/model_tool.py b/aixplain/modules/agent/tool/model_tool.py index 6796d298..38c03fe7 100644 --- a/aixplain/modules/agent/tool/model_tool.py +++ b/aixplain/modules/agent/tool/model_tool.py @@ -31,7 +31,6 @@ class ModelTool(Tool): """Specialized software or resource designed to assist the AI in executing specific tasks or functions based on user commands. Attributes: - name (Text): name of the tool description (Text): descriptiion of the tool function (Function): task that the tool performs supplier (Optional[Union[Dict, Text, Supplier, int]], optional): Preferred supplier to perform the task. Defaults to None. @@ -39,7 +38,6 @@ class ModelTool(Tool): def __init__( self, - name: Text, description: Text, function: Function, supplier: Optional[Supplier] = None, @@ -53,7 +51,7 @@ def __init__( function (Function): task that the tool performs supplier (Optional[Union[Dict, Text, Supplier, int]], optional): Preferred supplier to perform the task. Defaults to None. """ - super().__init__(name, description, **additional_info) + super().__init__("", description, **additional_info) if isinstance(function, str): function = Function(function) self.function = function diff --git a/aixplain/modules/agent/tool/pipeline_tool.py b/aixplain/modules/agent/tool/pipeline_tool.py index c0e37b65..330d4c67 100644 --- a/aixplain/modules/agent/tool/pipeline_tool.py +++ b/aixplain/modules/agent/tool/pipeline_tool.py @@ -30,13 +30,11 @@ class PipelineTool(Tool): """Specialized software or resource designed to assist the AI in executing specific tasks or functions based on user commands. Attributes: - name (Text): name of the tool description (Text): descriptiion of the tool """ def __init__( self, - name: Text, description: Text, pipeline: Union[Text, Pipeline], **additional_info, @@ -44,11 +42,10 @@ def __init__( """Specialized software or resource designed to assist the AI in executing specific tasks or functions based on user commands. Args: - name (Text): name of the tool description (Text): description of the tool pipeline (Union[Text, Pipeline]): pipeline """ - super().__init__(name, description, **additional_info) + super().__init__("", description, **additional_info) if isinstance(pipeline, Pipeline): pipeline = pipeline.id self.pipeline = pipeline diff --git a/aixplain/modules/asset.py b/aixplain/modules/asset.py index 52b79912..c453415d 100644 --- a/aixplain/modules/asset.py +++ b/aixplain/modules/asset.py @@ -57,7 +57,13 @@ def __init__( elif isinstance(supplier, Dict) is True: self.supplier = Supplier(supplier) else: - self.supplier = supplier + self.supplier = None + for supplier_ in Supplier: + if supplier.lower() in [supplier_.value["code"].lower(), supplier_.value["name"].lower()]: + self.supplier = supplier_ + break + if self.supplier is None: + self.supplier = supplier except Exception: self.supplier = str(supplier) self.version = version diff --git a/aixplain/modules/model.py b/aixplain/modules/model.py index be468dac..e1e13c63 100644 --- a/aixplain/modules/model.py +++ b/aixplain/modules/model.py @@ -100,7 +100,7 @@ def __repr__(self): except Exception: return f"<Model: {self.name} by {self.supplier}>" - def __polling(self, poll_url: Text, name: Text = "model_process", wait_time: float = 0.5, timeout: float = 300) -> Dict: + def sync_poll(self, poll_url: Text, name: Text = "model_process", wait_time: float = 0.5, timeout: float = 300) -> Dict: """Keeps polling the platform to check whether an asynchronous call is done. Args: @@ -161,7 +161,7 @@ def poll(self, poll_url: Text, name: Text = "model_process") -> Dict: resp["status"] = "FAILED" else: resp["status"] = "IN_PROGRESS" - logging.info(f"Single Poll for Model: Status of polling for {name}: {resp}") + logging.debug(f"Single Poll for Model: Status of polling for {name}: {resp}") except Exception as e: resp = {"status": "FAILED"} logging.error(f"Single Poll for Model: Error of polling for {name}: {e}") @@ -196,7 +196,7 @@ def run( return response poll_url = response["url"] end = time.time() - response = self.__polling(poll_url, name=name, timeout=timeout, wait_time=wait_time) + response = self.sync_poll(poll_url, name=name, timeout=timeout, wait_time=wait_time) return response except Exception as e: msg = f"Error in request for {name} - {traceback.format_exc()}"