Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

M 6875703542 agentification deployment #195

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 56 additions & 44 deletions aixplain/factories/agent_factory/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

from aixplain.factories.agent_factory.utils import build_agent
from aixplain.utils.file_utils import _request_with_retry
from urllib.parse import urljoin


class AgentFactory:
Expand All @@ -45,14 +46,13 @@ def create(
api_key: Text = config.TEAM_API_KEY,
supplier: Union[Dict, Text, Supplier, int] = "aiXplain",
version: Optional[Text] = None,
cost: Optional[Dict] = None,
llm_id: Optional[Text] = None,
) -> Agent:
"""Create a new agent in the platform."""
try:
agent = None
url = "http://54.86.247.242:8000/create"
headers = {"Authorization": "token " + api_key}
url = urljoin(config.BACKEND_URL, "sdk/agents")
headers = {"x-api-key": api_key}

if isinstance(supplier, dict):
supplier = supplier["code"]
Expand All @@ -65,52 +65,48 @@ def create(
tool_payload.append(
{
"function": tool.function.value,
"name": tool.name,
"type": "model",
"description": tool.description,
"supplier": tool.supplier.value if tool.supplier else None,
"supplier": tool.supplier.value["code"] if tool.supplier else None,
"version": tool.version if tool.version else None,
}
)
elif isinstance(tool, PipelineTool):
tool_payload.append(
{
"id": tool.pipeline,
"name": tool.name,
"assetId": tool.pipeline,
"description": tool.description,
"type": "pipeline",
}
)
else:
raise Exception("Agent Creation Error: Tool type not supported.")

payload = {
"name": name,
"api_key": api_key,
"tools": tool_payload,
"assets": tool_payload,
"description": description,
"supplier": supplier,
"version": version,
"cost": cost,
}
if llm_id is not None:
payload["language_model_id"] = llm_id
payload["llmId"] = llm_id

logging.info(f"Start service for POST Create Agent - {url} - {headers} - {json.dumps(payload)}")
r = _request_with_retry("post", url, headers=headers, data=json.dumps(payload))
r = _request_with_retry("post", url, headers=headers, json=payload)
if 200 <= r.status_code < 300:
response = r.json()

asset_id = response["id"]
agent = Agent(
id=asset_id,
name=name,
tools=tools,
description=description,
supplier=supplier,
version=version,
cost=cost,
api_key=api_key,
)
agent = build_agent(payload=response, api_key=api_key)
else:
error = r.json()
error_msg = "Agent Onboarding Error: Please contant the administrators."
if "message" in error:
msg = error["message"]
if error["message"] == "err.name_already_exists":
msg = "Agent name already exists."
elif error["message"] == "err.asset_is_not_available":
msg = "Some the tools are not available."
error_msg = f"Agent Onboarding Error (HTTP {r.status_code}): {msg}"
logging.exception(error_msg)
raise Exception(error_msg)
except Exception as e:
Expand All @@ -120,35 +116,51 @@ def create(
@classmethod
def list(cls) -> Dict:
"""List all agents available in the platform."""
url = "http://54.86.247.242:8000/list"
if config.AIXPLAIN_API_KEY != "":
headers = {"x-aixplain-key": f"{config.AIXPLAIN_API_KEY}", "Content-Type": "application/json"}
else:
headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"}
url = urljoin(config.BACKEND_URL, "sdk/agents")
headers = {"x-api-key": config.TEAM_API_KEY, "Content-Type": "application/json"}

payload = {}
logging.info(f"Start service for POST List Agents - {url} - {headers} - {json.dumps(payload)}")
r = _request_with_retry("get", url, headers=headers)
resp = r.json()
logging.info(f"Start service for GET List Agents - {url} - {headers} - {json.dumps(payload)}")
try:
r = _request_with_retry("get", url, headers=headers)
resp = r.json()

agents, page_total, total = [], 0, 0
results = resp
page_total = len(results)
total = len(results)
logging.info(f"Response for POST List Dataset - Page Total: {page_total} / Total: {total}")
for agent in results:
agents.append(build_agent(agent))
return {"results": agents, "page_total": page_total, "page_number": 0, "total": total}
if 200 <= r.status_code < 300:
agents, page_total, total = [], 0, 0
results = resp
page_total = len(results)
total = len(results)
logging.info(f"Response for GET List Agents - Page Total: {page_total} / Total: {total}")
for agent in results:
agents.append(build_agent(agent))
return {"results": agents, "page_total": page_total, "page_number": 0, "total": total}
else:
error_msg = "Agent Listing Error: Please contant the administrators."
if "message" in resp:
msg = resp["message"]
error_msg = f"Agent Listing Error (HTTP {r.status_code}): {msg}"
logging.exception(error_msg)
raise Exception(error_msg)
except Exception as e:
raise Exception(e)

@classmethod
def get(cls, agent_id: Text) -> Agent:
def get(cls, agent_id: Text, api_key: Optional[Text] = None) -> Agent:
"""Get agent by id."""
url = f"http://54.86.247.242:8000/get?id={agent_id}"
url = urljoin(config.BACKEND_URL, f"sdk/agents/{agent_id}")
if config.AIXPLAIN_API_KEY != "":
headers = {"x-aixplain-key": f"{config.AIXPLAIN_API_KEY}", "Content-Type": "application/json"}
else:
headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"}
api_key = api_key if api_key is not None else config.TEAM_API_KEY
headers = {"x-api-key": api_key, "Content-Type": "application/json"}
logging.info(f"Start service for GET Agent - {url} - {headers}")
r = _request_with_retry("get", url, headers=headers)
resp = r.json()
return build_agent(resp)
if 200 <= r.status_code < 300:
return build_agent(resp)
else:
msg = "Please contant the administrators."
if "message" in resp:
msg = resp["message"]
error_msg = f"Agent Get Error (HTTP {r.status_code}): {msg}"
raise Exception(error_msg)
37 changes: 26 additions & 11 deletions aixplain/factories/agent_factory/utils.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
__author__ = "thiagocastroferreira"

import aixplain.utils.config as config
from aixplain.enums import Function, Supplier
from aixplain.enums.asset_status import AssetStatus
from aixplain.modules.agent import Agent, ModelTool, PipelineTool
from typing import Dict
from typing import Dict, Text
from urllib.parse import urljoin


def build_agent(payload: Dict) -> Agent:
def build_agent(payload: Dict, api_key: Text = config.TEAM_API_KEY) -> Agent:
"""Instantiate a new agent in the platform."""
tools = payload["tools"]
tools = payload["assets"]
for i, tool in enumerate(tools):
if "function" in tool:
tool = ModelTool(**tool)
elif "id" in tool:
tool = PipelineTool(name=tool["name"], description=tool["description"], pipeline=tool["id"])
if tool["type"] == "model":
for supplier in Supplier:
if tool["supplier"].lower() in [supplier.value["code"].lower(), supplier.value["name"].lower()]:
tool["supplier"] = supplier
break

tool = ModelTool(
description=tool["description"],
function=Function(tool["function"]),
supplier=tool["supplier"],
version=tool["version"],
)
elif tool["type"] == "pipeline":
tool = PipelineTool(description=tool["description"], pipeline=tool["assetId"])
else:
raise Exception("Agent Creation Error: Tool type not supported.")
tools[i] = tool
Expand All @@ -21,11 +35,12 @@ def build_agent(payload: Dict) -> Agent:
name=payload["name"] if "name" in payload else "",
tools=tools,
description=payload["description"] if "description" in payload else "",
supplier=payload["supplier"] if "supplier" in payload else None,
supplier=payload["teamId"] if "teamId" in payload else None,
version=payload["version"] if "version" in payload else None,
cost=payload["cost"] if "cost" in payload else None,
llm_id=payload["language_model_id"] if "language_model_id" in payload else None,
api_key=payload["api_key"],
llm_id=payload["llmId"] if "llmId" in payload else "6646261c6eb563165658bbb1",
api_key=api_key,
status=AssetStatus(payload["status"]),
)
agent.url = "http://54.86.247.242:8000/async-execute"
agent.url = urljoin(config.BACKEND_URL, f"sdk/agents/{agent.id}/run")
return agent
111 changes: 105 additions & 6 deletions aixplain/modules/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,20 @@
Description:
Agentification Class
"""
import json
import logging
import time
import traceback

from aixplain.utils.file_utils import _request_with_retry
from aixplain.enums.supplier import Supplier
from aixplain.enums.asset_status import AssetStatus
from aixplain.modules.model import Model
from aixplain.modules.agent.tool import Tool
from aixplain.modules.agent.tool.model_tool import ModelTool
from aixplain.modules.agent.tool.pipeline_tool import PipelineTool
from typing import Dict, List, Text, Optional, Union
from urllib.parse import urljoin

from aixplain.utils import config

Expand All @@ -41,6 +46,7 @@ class Agent(Model):
name (Text): Name of the Agent
tools (List[Tool]): List of tools that the Agent uses.
description (Text, optional): description of the Agent. Defaults to "".
llm_id (Text, optional): large language model. Defaults to GPT-4o (6646261c6eb563165658bbb1).
supplier (Text): Supplier of the Agent.
version (Text): Version of the Agent.
backend_url (str): URL of the backend.
Expand All @@ -54,11 +60,12 @@ def __init__(
name: Text,
tools: List[Tool] = [],
description: Text = "",
llm_id: Text = "6646261c6eb563165658bbb1",
api_key: Optional[Text] = config.TEAM_API_KEY,
supplier: Union[Dict, Text, Supplier, int] = "aiXplain",
version: Optional[Text] = None,
cost: Optional[Dict] = None,
llm_id: Optional[Text] = None,
status: AssetStatus = AssetStatus.ONBOARDING,
**additional_info,
) -> None:
"""Create a FineTune with the necessary information.
Expand All @@ -68,29 +75,121 @@ def __init__(
name (Text): Name of the Agent
tools (List[Tool]): List of tools that the Agent uses.
description (Text, optional): description of the Agent. Defaults to "".
llm_id (Text, optional): large language model. Defaults to GPT-4o (6646261c6eb563165658bbb1).
supplier (Text): Supplier of the Agent.
version (Text): Version of the Agent.
backend_url (str): URL of the backend.
api_key (str): The TEAM API key used for authentication.
cost (Dict, optional): model price. Defaults to None.
**additional_info: Additional information to be saved with the FineTune.
"""
assert len(tools) > 0, "At least one tool must be provided."
super().__init__(id, name, description, api_key, supplier, version, cost=cost)
self.additional_info = additional_info
self.tools = tools
self.llm_id = llm_id
if isinstance(status, str):
try:
status = AssetStatus(status)
except Exception:
status = AssetStatus.ONBOARDING
self.status = status

def run(
self,
query: Text,
session_id: Optional[Text] = None,
history: Optional[List[Dict]] = None,
name: Text = "model_process",
timeout: float = 300,
parameters: Dict = {},
wait_time: float = 0.5,
) -> Dict:
"""Runs an agent call.

Args:
query (Text): query to be processed by the agent.
session_id (Optional[Text], optional): conversation Session ID. Defaults to None.
history (Optional[List[Dict]], optional): chat history (in case session ID is None). Defaults to None.
name (Text, optional): ID given to a call. Defaults to "model_process".
timeout (float, optional): total polling time. Defaults to 300.
parameters (Dict, optional): optional parameters to the model. Defaults to "{}".
wait_time (float, optional): wait time in seconds between polling calls. Defaults to 0.5.

Returns:
Dict: parsed output from model
"""
start = time.time()
try:
response = self.run_async(query=query, session_id=session_id, history=history, name=name, parameters=parameters)
if response["status"] == "FAILED":
end = time.time()
response["elapsed_time"] = end - start
return response
poll_url = response["url"]
end = time.time()
response = self.sync_poll(poll_url, name=name, timeout=timeout, wait_time=wait_time)
return response
except Exception as e:
msg = f"Error in request for {name} - {traceback.format_exc()}"
logging.error(f"Model Run: Error in running for {name}: {e}")
end = time.time()
return {"status": "FAILED", "error": msg, "elapsed_time": end - start}

def run_async(
self,
query: Text,
session_id: Optional[Text] = None,
history: Optional[List[Dict]] = None,
name: Text = "model_process",
parameters: Dict = {},
) -> Dict:
"""Runs asynchronously an agent call.

Args:
query (Text): query to be processed by the agent.
session_id (Optional[Text], optional): conversation Session ID. Defaults to None.
history (Optional[List[Dict]], optional): chat history (in case session ID is None). Defaults to None.
name (Text, optional): ID given to a call. Defaults to "model_process".
parameters (Dict, optional): optional parameters to the model. Defaults to "{}".

Returns:
dict: polling URL in response
"""
headers = {"x-api-key": self.api_key, "Content-Type": "application/json"}
from aixplain.factories.file_factory import FileFactory

payload = {"id": self.id, "query": FileFactory.to_link(query), "sessionId": session_id, "history": history}
payload.update(parameters)
payload = json.dumps(payload)

r = _request_with_retry("post", self.url, headers=headers, data=payload)
logging.info(f"Model Run Async: Start service for {name} - {self.url} - {payload} - {headers}")

resp = None
try:
resp = r.json()
logging.info(f"Result of request for {name} - {r.status_code} - {resp}")

poll_url = resp["data"]
response = {"status": "IN_PROGRESS", "url": poll_url}
except Exception:
response = {"status": "FAILED"}
msg = f"Error in request for {name} - {traceback.format_exc()}"
logging.error(f"Model Run Async: Error in running for {name}: {resp}")
if resp is not None:
response["error"] = msg
return response

def delete(self) -> None:
"""Delete Corpus service"""
try:
url = f"http://54.86.247.242:8000/delete/{self.id}"
headers = {"Authorization": f"Token {config.TEAM_API_KEY}", "Content-Type": "application/json"}
logging.info(f"Start service for DELETE Agent - {url} - {headers}")
url = urljoin(config.BACKEND_URL, f"sdk/agents/{self.id}")
headers = {"x-api-key": config.TEAM_API_KEY, "Content-Type": "application/json"}
logging.debug(f"Start service for DELETE Agent - {url} - {headers}")
r = _request_with_retry("delete", url, headers=headers)
if r.status_code != 200:
raise Exception()
except Exception:
message = "Agent Deletion Error: Make sure the agent exists and you are the owner."
message = f"Agent Deletion Error (HTTP {r.status_code}): Make sure the agent exists and you are the owner."
logging.error(message)
raise Exception(f"{message}")
Loading