From 1d3f6653b4c8003eff1b213ddaac4f5820560023 Mon Sep 17 00:00:00 2001 From: phact Date: Sat, 7 Dec 2024 01:51:03 -0500 Subject: [PATCH 01/27] mcp WIP --- pyproject.toml | 1 + src/backend/base/langflow/api/router.py | 2 + src/backend/base/langflow/api/v1/__init__.py | 2 + src/backend/base/langflow/api/v1/mcp.py | 213 +++++++++++++++++++ uv.lock | 33 +++ 5 files changed, 251 insertions(+) create mode 100644 src/backend/base/langflow/api/v1/mcp.py diff --git a/pyproject.toml b/pyproject.toml index 388bf2331cee..87c9e9761aad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -114,6 +114,7 @@ dependencies = [ "sseclient-py==1.8.0", "arize-phoenix-otel>=0.6.1", "openinference-instrumentation-langchain>=0.1.29", + "mcp>=0.9.1", ] [project.urls] diff --git a/src/backend/base/langflow/api/router.py b/src/backend/base/langflow/api/router.py index d2ce1905ada0..f72aeeec909b 100644 --- a/src/backend/base/langflow/api/router.py +++ b/src/backend/base/langflow/api/router.py @@ -15,6 +15,7 @@ users_router, validate_router, variables_router, + mcp_router, ) router = APIRouter( @@ -33,3 +34,4 @@ router.include_router(monitor_router) router.include_router(folders_router) router.include_router(starter_projects_router) +router.include_router(mcp_router) diff --git a/src/backend/base/langflow/api/v1/__init__.py b/src/backend/base/langflow/api/v1/__init__.py index 48383770ab77..4c59a79ac514 100644 --- a/src/backend/base/langflow/api/v1/__init__.py +++ b/src/backend/base/langflow/api/v1/__init__.py @@ -11,6 +11,7 @@ from langflow.api.v1.users import router as users_router from langflow.api.v1.validate import router as validate_router from langflow.api.v1.variable import router as variables_router +from langflow.api.v1.mcp import router as mcp_router __all__ = [ "api_key_router", @@ -26,4 +27,5 @@ "users_router", "validate_router", "variables_router", + "mcp_router", ] diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py new file mode 100644 index 000000000000..bd67aad55f52 --- /dev/null +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -0,0 +1,213 @@ +import asyncio +import logging +import traceback +import json +from typing import Annotated +from uuid import UUID, uuid4 +from contextvars import ContextVar + +from fastapi import APIRouter, Request, Depends +from fastapi.responses import StreamingResponse +from pydantic_core import ValidationError +from starlette.background import BackgroundTasks + +from langflow.api.v1.chat import build_flow +from mcp.server import Server +from mcp.server.sse import SseServerTransport +import mcp.types as types + +from langflow.api.v1.schemas import InputValueRequest +from langflow.graph import Graph +from langflow.services.auth.utils import get_current_active_user +from langflow.services.database.models import Flow, User +from langflow.services.deps import get_session, get_async_session, get_db_service +from sqlmodel import select + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/mcp", tags=["mcp"]) + +server = Server("langflow-mcp-server") + +# Create a context variable to store the current user +current_user_ctx: ContextVar[User] = ContextVar("current_user_ctx") + + +def json_schema_from_flow(flow: Flow) -> dict: + """Generate JSON schema from flow input nodes.""" + # Get the flow's data which contains the nodes and their configurations + flow_data = flow.data if flow.data else {} + + graph = Graph.from_payload(flow_data) + input_nodes = [vertex for vertex in graph.vertices if vertex.is_input] + + properties = {} 
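+    # Walk each input node's template and expose visible, non-advanced fields as JSON Schema properties; required fields are collected separately.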
+ required = [] + for node in input_nodes: + node_data = node.data["node"] + template = node_data["template"] + + for field_name, field_data in template.items(): + if field_data != "Component" and field_data.get("show", False) and not field_data.get("advanced", False): + field_type = field_data.get("type", "string") + properties[field_name] = { + "type": field_type, + "description": field_data.get("info", f"Input for {field_name}"), + } + + if field_data.get("required", False): + required.append(field_name) + + return {"type": "object", "properties": properties, "required": required} + + +@server.list_tools() +async def handle_list_tools(): + try: + session = next(get_session()) + flows = session.exec(select(Flow)).all() + tools = [] + name_count = {} # Track name occurrences + + for flow in flows: + # Generate unique name by appending _N if needed + base_name = flow.name + if base_name in name_count: + name_count[base_name] += 1 + unique_name = f"{base_name}_{name_count[base_name]}" + else: + name_count[base_name] = 0 + unique_name = base_name + + tool = types.Tool( + name=str(flow.id), # Use flow.id instead of name + description=f"{unique_name}: {flow.description}" + if flow.description + else f"Tool generated from flow: {unique_name}", + inputSchema=json_schema_from_flow(flow), + ) + tools.append(tool) + + return tools + except Exception as e: + logger.error(f"Error in listing tools: {str(e)}") + trace = traceback.format_exc() + logger.error(trace) + raise e + + +@server.call_tool() +async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent]: + """Handle tool execution requests.""" + try: + session = next(get_session()) + background_tasks = BackgroundTasks() + + try: + current_user = current_user_ctx.get() + except LookupError: + raise ValueError("No authenticated user found in context") + + flow = session.exec(select(Flow).where(Flow.id == UUID(name))).first() + + if not flow: + raise ValueError(f"Flow with id '{name}' not found") + + # Process inputs + processed_inputs = {} + for key, value in arguments.items(): + processed_inputs[key] = value + + # Send initial progress notification + # if progress_token := context.meta.progressToken: + # await context.session.send_progress_notification( + # progress_token=progress_token, + # progress=0.5, + # total=1.0 + # ) + + conversation_id = str(uuid4()) + input_request = InputValueRequest( + input_value=processed_inputs["input_value"], components=[], type="chat", session=conversation_id + ) + + result = "" + db_service = get_db_service() + async with db_service.with_async_session() as async_session: + try: + response = await build_flow( + flow_id=UUID(name), + inputs=input_request, + background_tasks=background_tasks, + current_user=current_user, + session=async_session, + ) + + async for line in response.body_iterator: + if not line: + continue + try: + event_data = json.loads(line) + if event_data.get("event") == "end_vertex": + result += ( + event_data.get("data", {}) + .get("build_data", {}) + .get("data", {}) + .get("results", {}) + .get("message", {}) + .get("text", "") + ) + except json.JSONDecodeError: + logger.warning(f"Failed to parse event data: {line}") + continue + except asyncio.CancelledError as e: + logger.info(f"Request was cancelled {e}") + trace = traceback.format_exc() + logger.info(trace) + return [types.TextContent(type="text", text="Request was cancelled")] + + # Send final progress notification + # if progress_token: + # await context.session.send_progress_notification( + # 
progress_token=progress_token, + # progress=1.0, + # total=1.0 + # ) + print(result) + + return [types.TextContent(type="text", text=result)] + + except Exception as e: + logger.error(f"Error executing tool {name}: {str(e)}") + trace = traceback.format_exc() + logger.error(trace) + raise + + +sse = SseServerTransport("/api/v1/mcp") + + +@router.get("/sse", response_class=StreamingResponse) +async def handle_sse(request: Request, current_user: Annotated[User, Depends(get_current_active_user)]): + # Store the current user in context + token = current_user_ctx.set(current_user) + try: + async with sse.connect_sse(request.scope, request.receive, request._send) as streams: + try: + await server.run(streams[0], streams[1], server.create_initialization_options()) + except ValidationError as e: + logger.warning(f"Validation error in MCP: {e}") + except asyncio.CancelledError as e: + logger.info(f"SSE connection was cancelled {e}") + except Exception as e: + logger.error(f"Error in MCP: {str(e)}") + trace = traceback.format_exc() + logger.error(trace) + raise + finally: + current_user_ctx.reset(token) + + +@router.post("/") +async def handle_messages(request: Request, current_user: Annotated[User, Depends(get_current_active_user)]): + await sse.handle_post_message(request.scope, request.receive, request._send) diff --git a/uv.lock b/uv.lock index a88935f79cdf..59d2ce598431 100644 --- a/uv.lock +++ b/uv.lock @@ -3590,6 +3590,7 @@ dependencies = [ { name = "litellm" }, { name = "markdown" }, { name = "markupsafe" }, + { name = "mcp" }, { name = "mem0ai" }, { name = "metal-sdk" }, { name = "metaphor-python" }, @@ -3744,6 +3745,7 @@ requires-dist = [ { name = "llama-cpp-python", marker = "extra == 'local'", specifier = "~=0.2.0" }, { name = "markdown", specifier = ">=3.7,<4.0.0" }, { name = "markupsafe", specifier = ">=2.1.3,<4.0.0" }, + { name = "mcp", specifier = ">=0.9.1" }, { name = "mem0ai", specifier = ">=0.1.26,<1.0.0" }, { name = "metal-sdk", specifier = ">=2.5.0,<3.0.0" }, { name = "metaphor-python", specifier = ">=0.1.11,<1.0.0" }, @@ -4372,6 +4374,23 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8f/8e/9ad090d3553c280a8060fbf6e24dc1c0c29704ee7d1c372f0c174aa59285/matplotlib_inline-0.1.7-py3-none-any.whl", hash = "sha256:df192d39a4ff8f21b1895d72e6a13f5fcc5099f00fa84384e0ea28c2cc0653ca", size = 9899 }, ] +[[package]] +name = "mcp" +version = "0.9.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "httpx" }, + { name = "httpx-sse" }, + { name = "pydantic" }, + { name = "sse-starlette" }, + { name = "starlette" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/e7/1c/932818470ffd49c33509110c835101a8dc4c9cdd06028b9f647fb3dde237/mcp-0.9.1.tar.gz", hash = "sha256:e8509a37c2ab546095788ed170e0fb4d7ce0cf5a3ee56b6449c78af27321a425", size = 78218 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b3/a0/2ee813d456b57a726d583868417d1ad900fbe12ee3c8cd866e3e804ca486/mcp-0.9.1-py3-none-any.whl", hash = "sha256:7f640fcfb0be486aa510594df309920ae1d375cdca1f8aff21db3a96d837f303", size = 31562 }, +] + [[package]] name = "mdurl" version = "0.1.2" @@ -7386,6 +7405,20 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/cc/5f/8838e6b1b6673709e93386d6d42d28030883079b5ebcbdc7a37f2953e993/sqlmodel-0.0.18-py3-none-any.whl", hash = "sha256:d70fdf8fe595e30a918660cf4537b9c5fc2fffdbfcba851a0135de73c3ebcbb7", size = 26507 }, ] +[[package]] +name = "sse-starlette" +version = "2.1.3" +source = { registry = 
"https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "starlette" }, + { name = "uvicorn" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/72/fc/56ab9f116b2133521f532fce8d03194cf04dcac25f583cf3d839be4c0496/sse_starlette-2.1.3.tar.gz", hash = "sha256:9cd27eb35319e1414e3d2558ee7414487f9529ce3b3cf9b21434fd110e017169", size = 19678 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/52/aa/36b271bc4fa1d2796311ee7c7283a3a1c348bad426d37293609ca4300eef/sse_starlette-2.1.3-py3-none-any.whl", hash = "sha256:8ec846438b4665b9e8c560fcdea6bc8081a3abf7942faa95e5a744999d219772", size = 9383 }, +] + [[package]] name = "sseclient-py" version = "1.8.0" From 26dbf6f16fa4ffb2488a17c73db07fcc6bd673d7 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Sat, 7 Dec 2024 07:07:11 +0000 Subject: [PATCH 02/27] [autofix.ci] apply automated fixes --- src/backend/base/langflow/api/router.py | 2 +- src/backend/base/langflow/api/v1/__init__.py | 2 +- src/backend/base/langflow/api/v1/mcp.py | 23 ++++++++++---------- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/src/backend/base/langflow/api/router.py b/src/backend/base/langflow/api/router.py index f72aeeec909b..2e6ea01acdba 100644 --- a/src/backend/base/langflow/api/router.py +++ b/src/backend/base/langflow/api/router.py @@ -9,13 +9,13 @@ flows_router, folders_router, login_router, + mcp_router, monitor_router, starter_projects_router, store_router, users_router, validate_router, variables_router, - mcp_router, ) router = APIRouter( diff --git a/src/backend/base/langflow/api/v1/__init__.py b/src/backend/base/langflow/api/v1/__init__.py index 4c59a79ac514..eef95c4bd860 100644 --- a/src/backend/base/langflow/api/v1/__init__.py +++ b/src/backend/base/langflow/api/v1/__init__.py @@ -5,13 +5,13 @@ from langflow.api.v1.flows import router as flows_router from langflow.api.v1.folders import router as folders_router from langflow.api.v1.login import router as login_router +from langflow.api.v1.mcp import router as mcp_router from langflow.api.v1.monitor import router as monitor_router from langflow.api.v1.starter_projects import router as starter_projects_router from langflow.api.v1.store import router as store_router from langflow.api.v1.users import router as users_router from langflow.api.v1.validate import router as validate_router from langflow.api.v1.variable import router as variables_router -from langflow.api.v1.mcp import router as mcp_router __all__ = [ "api_key_router", diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index bd67aad55f52..70cfaa39f7ed 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -1,27 +1,26 @@ import asyncio +import json import logging import traceback -import json +from contextvars import ContextVar from typing import Annotated from uuid import UUID, uuid4 -from contextvars import ContextVar -from fastapi import APIRouter, Request, Depends +from fastapi import APIRouter, Depends, Request from fastapi.responses import StreamingResponse +from mcp import types +from mcp.server import Server +from mcp.server.sse import SseServerTransport from pydantic_core import ValidationError +from sqlmodel import select from starlette.background import BackgroundTasks from langflow.api.v1.chat import build_flow -from mcp.server import Server -from mcp.server.sse import SseServerTransport -import mcp.types as types - from langflow.api.v1.schemas 
import InputValueRequest from langflow.graph import Graph from langflow.services.auth.utils import get_current_active_user from langflow.services.database.models import Flow, User -from langflow.services.deps import get_session, get_async_session, get_db_service -from sqlmodel import select +from langflow.services.deps import get_db_service, get_session logger = logging.getLogger(__name__) @@ -90,7 +89,7 @@ async def handle_list_tools(): return tools except Exception as e: - logger.error(f"Error in listing tools: {str(e)}") + logger.error(f"Error in listing tools: {e!s}") trace = traceback.format_exc() logger.error(trace) raise e @@ -178,7 +177,7 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent return [types.TextContent(type="text", text=result)] except Exception as e: - logger.error(f"Error executing tool {name}: {str(e)}") + logger.error(f"Error executing tool {name}: {e!s}") trace = traceback.format_exc() logger.error(trace) raise @@ -200,7 +199,7 @@ async def handle_sse(request: Request, current_user: Annotated[User, Depends(get except asyncio.CancelledError as e: logger.info(f"SSE connection was cancelled {e}") except Exception as e: - logger.error(f"Error in MCP: {str(e)}") + logger.error(f"Error in MCP: {e!s}") trace = traceback.format_exc() logger.error(trace) raise From eb5e41fb691dab7957b65aa4ec5060a4e69005e3 Mon Sep 17 00:00:00 2001 From: phact Date: Wed, 11 Dec 2024 00:11:51 -0500 Subject: [PATCH 03/27] logging and flow user check --- src/backend/base/langflow/api/v1/mcp.py | 122 ++++++++++++++++-------- 1 file changed, 83 insertions(+), 39 deletions(-) diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index bd67aad55f52..95fc29352520 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -8,7 +8,7 @@ from fastapi import APIRouter, Request, Depends from fastapi.responses import StreamingResponse -from pydantic_core import ValidationError +from pydantic import ValidationError from starlette.background import BackgroundTasks from langflow.api.v1.chat import build_flow @@ -24,6 +24,22 @@ from sqlmodel import select logger = logging.getLogger(__name__) +if False: + logger.setLevel(logging.DEBUG) + if not logger.handlers: + handler = logging.StreamHandler() + handler.setLevel(logging.DEBUG) + formatter = logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s') + handler.setFormatter(formatter) + logger.addHandler(handler) + + # Enable debug logging for MCP package + mcp_logger = logging.getLogger("mcp") + mcp_logger.setLevel(logging.DEBUG) + if not mcp_logger.handlers: + mcp_logger.addHandler(handler) + + logger.debug("MCP module loaded - debug logging enabled") router = APIRouter(prefix="/mcp", tags=["mcp"]) @@ -44,21 +60,25 @@ def json_schema_from_flow(flow: Flow) -> dict: properties = {} required = [] for node in input_nodes: - node_data = node.data["node"] - template = node_data["template"] + node_data = node.data['node'] + template = node_data['template'] for field_name, field_data in template.items(): - if field_data != "Component" and field_data.get("show", False) and not field_data.get("advanced", False): - field_type = field_data.get("type", "string") + if field_data != "Component" and field_data.get('show', False) and not field_data.get('advanced', False): + field_type = field_data.get('type', 'string') properties[field_name] = { "type": field_type, - "description": field_data.get("info", f"Input for {field_name}"), + "description": 
field_data.get('info', f"Input for {field_name}") } - if field_data.get("required", False): + if field_data.get('required', False): required.append(field_name) - return {"type": "object", "properties": properties, "required": required} + return { + "type": "object", + "properties": properties, + "required": required + } @server.list_tools() @@ -68,8 +88,10 @@ async def handle_list_tools(): flows = session.exec(select(Flow)).all() tools = [] name_count = {} # Track name occurrences - + for flow in flows: + if flow.user_id is None: + continue # Generate unique name by appending _N if needed base_name = flow.name if base_name in name_count: @@ -78,13 +100,11 @@ async def handle_list_tools(): else: name_count[base_name] = 0 unique_name = base_name - + tool = types.Tool( name=str(flow.id), # Use flow.id instead of name - description=f"{unique_name}: {flow.description}" - if flow.description - else f"Tool generated from flow: {unique_name}", - inputSchema=json_schema_from_flow(flow), + description=f"{unique_name}: {flow.description}" if flow.description else f"Tool generated from flow: {unique_name}", + inputSchema=json_schema_from_flow(flow) ) tools.append(tool) @@ -102,7 +122,7 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent try: session = next(get_session()) background_tasks = BackgroundTasks() - + try: current_user = current_user_ctx.get() except LookupError: @@ -119,7 +139,7 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent processed_inputs[key] = value # Send initial progress notification - # if progress_token := context.meta.progressToken: + #if progress_token := context.meta.progressToken: # await context.session.send_progress_notification( # progress_token=progress_token, # progress=0.5, @@ -128,7 +148,10 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent conversation_id = str(uuid4()) input_request = InputValueRequest( - input_value=processed_inputs["input_value"], components=[], type="chat", session=conversation_id + input_value=processed_inputs['input_value'], + components=[], + type="chat", + session=conversation_id ) result = "" @@ -149,25 +172,27 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent try: event_data = json.loads(line) if event_data.get("event") == "end_vertex": - result += ( - event_data.get("data", {}) - .get("build_data", {}) - .get("data", {}) - .get("results", {}) - .get("message", {}) - .get("text", "") - ) + result += event_data.get("data", {}).get("build_data", {}).get("data", {}).get( + "results", {}).get("message", {}).get("text", "") except json.JSONDecodeError: logger.warning(f"Failed to parse event data: {line}") continue except asyncio.CancelledError as e: - logger.info(f"Request was cancelled {e}") - trace = traceback.format_exc() - logger.info(trace) - return [types.TextContent(type="text", text="Request was cancelled")] + logger.info(f"Request was cancelled: {str(e)}") + # Create a proper cancellation notification + notification = types.ProgressNotification( + method="notifications/progress", + params=types.ProgressNotificationParams( + progressToken=str(uuid4()), + progress=1.0, + message=f"Request cancelled: {str(e)}" + ) + ) + await server.request_context.session.send_notification(notification) + return [types.TextContent(type="text", text=f"Request cancelled: {str(e)}")] # Send final progress notification - # if progress_token: + #if progress_token: # await context.session.send_progress_notification( # 
progress_token=progress_token, # progress=1.0, @@ -183,22 +208,38 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent logger.error(trace) raise - sse = SseServerTransport("/api/v1/mcp") - @router.get("/sse", response_class=StreamingResponse) -async def handle_sse(request: Request, current_user: Annotated[User, Depends(get_current_active_user)]): - # Store the current user in context +async def handle_sse( + request: Request, + current_user: Annotated[User, Depends(get_current_active_user)] +): token = current_user_ctx.set(current_user) try: async with sse.connect_sse(request.scope, request.receive, request._send) as streams: try: - await server.run(streams[0], streams[1], server.create_initialization_options()) + logger.debug("Starting SSE connection") + logger.debug(f"Stream types: read={type(streams[0])}, write={type(streams[1])}") + + # Let's look at the initialization options + init_options = server.create_initialization_options() + logger.debug(f"Initialization options: {init_options}") + + await server.run( + streams[0], streams[1], init_options + ) except ValidationError as e: logger.warning(f"Validation error in MCP: {e}") - except asyncio.CancelledError as e: - logger.info(f"SSE connection was cancelled {e}") + logger.debug(f"Failed message type: {type(e).__name__}") + logger.debug(f"Validation error details: {e.errors()}") + # Add more details about the failed validation + if hasattr(e, 'model'): + logger.debug(f"Failed validation model: {e.model.__name__}") + if hasattr(e, 'raw_errors'): + logger.debug(f"Raw validation errors: {e.raw_errors}") + except asyncio.CancelledError: + logger.info("SSE connection was cancelled") except Exception as e: logger.error(f"Error in MCP: {str(e)}") trace = traceback.format_exc() @@ -207,7 +248,10 @@ async def handle_sse(request: Request, current_user: Annotated[User, Depends(get finally: current_user_ctx.reset(token) - @router.post("/") -async def handle_messages(request: Request, current_user: Annotated[User, Depends(get_current_active_user)]): +async def handle_messages( + request: Request, + current_user: Annotated[User, Depends(get_current_active_user)] +): await sse.handle_post_message(request.scope, request.receive, request._send) + From c8c0840910fdc0f12c6cf14bbb280fcced4e3520 Mon Sep 17 00:00:00 2001 From: phact Date: Thu, 12 Dec 2024 12:29:45 -0500 Subject: [PATCH 04/27] mcp stdio client component --- pyproject.toml | 1 + src/backend/base/langflow/api/v1/mcp.py | 21 +-- .../langflow/components/tools/__init__.py | 2 + .../langflow/components/tools/mcp_stdio.py | 136 ++++++++++++++++++ uv.lock | 43 +++--- 5 files changed, 172 insertions(+), 31 deletions(-) create mode 100644 src/backend/base/langflow/components/tools/mcp_stdio.py diff --git a/pyproject.toml b/pyproject.toml index 87c9e9761aad..bfee646fa66b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -115,6 +115,7 @@ dependencies = [ "arize-phoenix-otel>=0.6.1", "openinference-instrumentation-langchain>=0.1.29", "mcp>=0.9.1", + "uv>=0.5.7", ] [project.urls] diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index 6e47e238765e..92494702b40b 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -8,10 +8,10 @@ from fastapi import APIRouter, Depends, Request from fastapi.responses import StreamingResponse -from pydantic import ValidationError from mcp import types from mcp.server import Server from mcp.server.sse import SseServerTransport +from pydantic import 
ValidationError from sqlmodel import select from starlette.background import BackgroundTasks @@ -178,16 +178,17 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent logger.warning(f"Failed to parse event data: {line}") continue except asyncio.CancelledError as e: - logger.info(f"Request was cancelled: {str(e)}") + logger.info(f"Request was cancelled: {e!s}") # Create a proper cancellation notification - notification = types.ProgressNotification( - method="notifications/progress", - params=types.ProgressNotificationParams( - progressToken=str(uuid4()), progress=1.0, message=f"Request cancelled: {str(e)}" - ), - ) - await server.request_context.session.send_notification(notification) - return [types.TextContent(type="text", text=f"Request cancelled: {str(e)}")] + # notification = types.ProgressNotification( + # method="notifications/progress", + # params=types.ProgressNotificationParams( + # progressToken=str(uuid4()), + # progress=1.0 + # ), + # ) + # await server.request_context.session.send_notification(notification) + return [types.TextContent(type="text", text=f"Request cancelled: {e!s}")] # Send final progress notification # if progress_token: diff --git a/src/backend/base/langflow/components/tools/__init__.py b/src/backend/base/langflow/components/tools/__init__.py index 17b5d11c6744..41f8c108413f 100644 --- a/src/backend/base/langflow/components/tools/__init__.py +++ b/src/backend/base/langflow/components/tools/__init__.py @@ -9,6 +9,7 @@ from .glean_search_api import GleanSearchAPIComponent from .google_search_api import GoogleSearchAPIComponent from .google_serper_api import GoogleSerperAPIComponent +from .mcp_stdio import MCPStdio from .python_code_structured_tool import PythonCodeStructuredTool from .python_repl import PythonREPLToolComponent from .search_api import SearchAPIComponent @@ -47,4 +48,5 @@ "WolframAlphaAPIComponent", "YfinanceToolComponent", "YouTubeTranscriptsComponent", + "MCPStdio", ] diff --git a/src/backend/base/langflow/components/tools/mcp_stdio.py b/src/backend/base/langflow/components/tools/mcp_stdio.py new file mode 100644 index 000000000000..03cdf8699e5d --- /dev/null +++ b/src/backend/base/langflow/components/tools/mcp_stdio.py @@ -0,0 +1,136 @@ +# from langflow.field_typing import Data +import os +from collections.abc import Awaitable, Callable +from contextlib import AsyncExitStack +from typing import Any + +from anthropic import Anthropic, BaseModel +from dotenv import load_dotenv +from langchain_core.tools import StructuredTool +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client +from pydantic import Field, create_model + +from langflow.custom import Component +from langflow.io import MessageTextInput, Output + +load_dotenv() # load environment variables from .env + + +class MCPClient: + def __init__(self): + # Initialize session and client objects + self.session: ClientSession | None = None + self.exit_stack = AsyncExitStack() + self.anthropic = Anthropic() + + async def connect_to_server(self, command: str): + server_params = StdioServerParameters( + command="uvx", args=["mcp-sse-shim"], env={"DEBUG": "true", "PATH": os.environ["PATH"]} + ) + + stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params)) + self.stdio, self.write = stdio_transport + self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write)) + + await self.session.initialize() + + # List available tools + response = await 
self.session.list_tools() + tools = response.tools + print("\nConnected to server with tools:", [tool.name for tool in tools]) + return tools + + +def create_input_schema_from_json_schema(schema: dict[str, Any]) -> type[BaseModel]: + """Converts a JSON schema into a Pydantic model dynamically. + + :param schema: The JSON schema as a dictionary. + :return: A Pydantic model class. + """ + if schema.get("type") != "object": + raise ValueError("JSON schema must be of type 'object' at the root level.") + + fields = {} + properties = schema.get("properties", {}) + required_fields = set(schema.get("required", [])) + + for field_name, field_def in properties.items(): + # Extract type + field_type_str = field_def.get("type", "str") # Default to string type if not specified + field_type = { + "string": str, + "str": str, + "integer": int, + "int": int, + "number": float, + "boolean": bool, + "array": list, + "object": dict, + }.get(field_type_str, Any) + + # Extract description and default if present + field_metadata = {"description": field_def.get("description", "")} + if field_name not in required_fields: + field_metadata["default"] = field_def.get("default", None) + + # Create Pydantic field + fields[field_name] = (field_type, Field(**field_metadata)) + + # Dynamically create the model + return create_model("InputSchema", **fields) + + +class MCPStdio(Component): + client = MCPClient() + tools = None + tool_names = [] + display_name = "Get Tools from MCP" + description = "Use as a template to create your own component." + documentation: str = "http://docs.langflow.org/components/custom" + icon = "code" + name = "CustomComponent" + + inputs = [ + MessageTextInput( + name="command", + display_name="mcp command", + info="mcp command", + value="uv mcp-sse-shim@latest", + tool_mode=True, + ), + ] + + outputs = [ + Output(display_name="Output", name="output", method="build_output"), + ] + + def create_tool_coroutine(self, tool_name: str) -> Callable[[dict], Awaitable]: + async def tool_coroutine(*args, **kwargs): + return await self.client.session.call_tool(tool_name, arguments=kwargs) + + return tool_coroutine + + async def build_output(self) -> list[StructuredTool]: + if self.client.session is None: + self.tools = await self.client.connect_to_server(self.command) + + tool_list = [] + + for tool in self.tools: + args_schema = create_input_schema_from_json_schema(tool.inputSchema) + callbacks = self.get_langchain_callbacks() + tool_list.append( + StructuredTool( + name=tool.name, # maybe format this + description=tool.description, + coroutine=self.create_tool_coroutine(tool.name), + args_schema=args_schema, + # args_schema=DataSchema, + handle_tool_error=True, + callbacks=callbacks, + ) + ) + + self.tool_names = [tool.name for tool in self.tools] + return tool_list diff --git a/uv.lock b/uv.lock index 59d2ce598431..c9fe15336900 100644 --- a/uv.lock +++ b/uv.lock @@ -3616,6 +3616,7 @@ dependencies = [ { name = "supabase" }, { name = "types-cachetools" }, { name = "upstash-vector" }, + { name = "uv" }, { name = "weaviate-client" }, { name = "wikipedia" }, { name = "wolframalpha" }, @@ -3772,6 +3773,7 @@ requires-dist = [ { name = "supabase", specifier = "~=2.6.0" }, { name = "types-cachetools", specifier = ">=5.3.0.5,<6.0.0" }, { name = "upstash-vector", specifier = ">=0.5.0,<1.0.0" }, + { name = "uv", specifier = ">=0.5.7" }, { name = "weaviate-client", specifier = ">=4.8,<5.0.0" }, { name = "wikipedia", specifier = ">=1.4.0,<2.0.0" }, { name = "wolframalpha", specifier = ">=5.1.3,<6.0.0" }, @@ -8209,27 
+8211,26 @@ wheels = [ [[package]] name = "uv" -version = "0.4.25" -source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/d0/bc/1a013408b7f9f437385705652f404b6b15127ecf108327d13be493bdfb81/uv-0.4.25.tar.gz", hash = "sha256:d39077cdfe3246885fcdf32e7066ae731a166101d063629f9cea08738f79e6a3", size = 2064863 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/84/18/9c9056d373620b1cf5182ce9b2d258e86d117d667cf8883e12870f2a5edf/uv-0.4.25-py3-none-linux_armv6l.whl", hash = "sha256:94fb2b454afa6bdfeeea4b4581c878944ca9cf3a13712e6762f245f5fbaaf952", size = 13028246 }, - { url = "https://files.pythonhosted.org/packages/a1/19/8a3f09aba30ac5433dfecde55d5241a07c96bb12340c3b810bc58188a12e/uv-0.4.25-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:a7c3a18c20ddb527d296d1222bddf42b78031c50b5b4609d426569b5fb61f5b0", size = 13175265 }, - { url = "https://files.pythonhosted.org/packages/e8/c9/2f924bb29bd53c51b839c1c6126bd2cf4c451d4a7d8f34be078f9e31c57e/uv-0.4.25-py3-none-macosx_11_0_arm64.whl", hash = "sha256:18100f0f36419a154306ed6211e3490bf18384cdf3f1a0950848bf64b62fa251", size = 12255610 }, - { url = "https://files.pythonhosted.org/packages/b2/5a/d8f8971aeb3389679505cf633a786cd72a96ce232f80f14cfe5a693b4c64/uv-0.4.25-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.musllinux_1_1_aarch64.whl", hash = "sha256:6e981b1465e30102e41946adede9cb08051a5d70c6daf09f91a7ea84f0b75c08", size = 12506511 }, - { url = "https://files.pythonhosted.org/packages/e3/96/8c73520daeba5022cec8749e44afd4ca9ef774bf728af9c258bddec3577f/uv-0.4.25-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:578ae385fad6bd6f3868828e33d54994c716b315b1bc49106ec1f54c640837e4", size = 12836250 }, - { url = "https://files.pythonhosted.org/packages/67/3d/b0e810d365fb154fe1d380a0f43ee35a683cf9162f2501396d711bec2621/uv-0.4.25-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2d29a78f011ecc2f31c13605acb6574c2894c06d258b0f8d0dbb899986800450", size = 13521303 }, - { url = "https://files.pythonhosted.org/packages/2d/f4/dd3830ec7fc6e7e5237c184f30f2dbfed4f93605e472147eca1373bcc72b/uv-0.4.25-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:ec181be2bda10651a3558156409ac481549983e0276d0e3645e3b1464e7f8715", size = 14105308 }, - { url = "https://files.pythonhosted.org/packages/f4/4e/0fca02f8681e4870beda172552e747e0424f6e9186546b00a5e92525fea9/uv-0.4.25-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:50c7d0d9e7f392f81b13bf3b7e37768d1486f2fc9d533a54982aa0ed11e4db23", size = 13859475 }, - { url = "https://files.pythonhosted.org/packages/33/07/1100e9bc652f2850930f466869515d16ffe9582aaaaa99bac332ebdfe3ea/uv-0.4.25-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:2fc35b5273f1e018aecd66b70e0fd7d2eb6698853dde3e2fc644e7ebf9f825b1", size = 18100840 }, - { url = "https://files.pythonhosted.org/packages/fa/98/ba1cb7dd2aa639a064a9e49721e08f12a3424456d60dde1327e7c6437930/uv-0.4.25-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a7022a71ff63a3838796f40e954b76bf7820fc27e96fe002c537e75ff8e34f1d", size = 13645464 }, - { url = "https://files.pythonhosted.org/packages/0d/05/b97fb8c828a070e8291826922b2712d1146b11563b4860bc9ba80f5635d1/uv-0.4.25-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:e02afb0f6d4b58718347f7d7cfa5a801e985ce42181ba971ed85ef149f6658ca", size = 12694995 }, - { url = 
"https://files.pythonhosted.org/packages/b3/97/63df050811379130202898f60e735a1a331ba3a93b8aa1e9bb466f533913/uv-0.4.25-py3-none-musllinux_1_1_armv7l.whl", hash = "sha256:3d7680795ea78cdbabbcce73d039b2651cf1fa635ddc1aa3082660f6d6255c50", size = 12831737 }, - { url = "https://files.pythonhosted.org/packages/dc/e0/08352dcffa6e8435328861ea60b2c05e8bd030f1e93998443ba66209db7b/uv-0.4.25-py3-none-musllinux_1_1_i686.whl", hash = "sha256:aae9dcafd20d5ba978c8a4939ab942e8e2e155c109e9945207fbbd81d2892c9e", size = 13273529 }, - { url = "https://files.pythonhosted.org/packages/25/f4/eaf95e5eee4e2e69884df0953d094deae07216f72068ef1df08c0f49841d/uv-0.4.25-py3-none-musllinux_1_1_ppc64le.whl", hash = "sha256:4c55040e67470f2b73e95e432aba06f103a0b348ea0b9c6689b1029c8d9e89fd", size = 15039860 }, - { url = "https://files.pythonhosted.org/packages/69/04/482b1cc9e8d599c7d766c4ba2d7a512ed3989921443792f92f26b8d44fe6/uv-0.4.25-py3-none-musllinux_1_1_x86_64.whl", hash = "sha256:bdbfd0c476b9e80a3f89af96aed6dd7d2782646311317a9c72614ccce99bb2ad", size = 13776302 }, - { url = "https://files.pythonhosted.org/packages/cd/7e/3d1cb735cc3df6341ac884b73eeec1f51a29192721be40be8e9b1d82666d/uv-0.4.25-py3-none-win32.whl", hash = "sha256:7d266e02fefef930609328c31c075084295c3cb472bab3f69549fad4fd9d82b3", size = 12970553 }, - { url = "https://files.pythonhosted.org/packages/04/e9/c00d2bb4a286b13fad0f06488ea9cbe9e76d0efcd81e7a907f72195d5b83/uv-0.4.25-py3-none-win_amd64.whl", hash = "sha256:be2a4fc4fcade9ea5e67e51738c95644360d6e59b6394b74fc579fb617f902f7", size = 14702875 }, +version = "0.5.8" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/14/31/24c4d8d0d15f5a596fefb39a45e5628e2a4ac4b9c0a6044b4710d118673a/uv-0.5.8.tar.gz", hash = "sha256:2ee40bc9c08fea0e71092838c0fc36df83f741807d8be9acf2fd4c4757b3171e", size = 2494559 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/da/46/7a1310877b6ae012461c0bcc72629ee34a7c78749235ebf67d7856f24a91/uv-0.5.8-py3-none-linux_armv6l.whl", hash = "sha256:defd5da3685f43f74698634ffc197aaf9b836b8ba0de0e57b34d7bc74d856fa9", size = 14287864 }, + { url = "https://files.pythonhosted.org/packages/0f/b5/d02c8ce6bf46d648e9ef912308718a30ecff631904ba03acd11e5ec6412d/uv-0.5.8-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:e146062e4cc39db334cbde38d56d2c6301dd9cf6739ce07ce5a4d71b4cbc2d00", size = 14290268 }, + { url = "https://files.pythonhosted.org/packages/fb/5e/7277f92ee0aa8549e41152d9a0a7863d84e7b7b8de9b08cb397bfe1e37f6/uv-0.5.8-py3-none-macosx_11_0_arm64.whl", hash = "sha256:0f2bcdd00a49ad1669e217a2787448cac1653c9968d74bfa3732f3c25ca26f69", size = 13255149 }, + { url = "https://files.pythonhosted.org/packages/08/5b/72be4ba38e8e6cd2be60e97fd799629228afd3f46404767b0e1cfcf1236e/uv-0.5.8-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.musllinux_1_1_aarch64.whl", hash = "sha256:c91d0a2b8218af2aa0385b867da8c13a620db22077686793c7231f012cb40619", size = 13541600 }, + { url = "https://files.pythonhosted.org/packages/4d/cb/92485fea5f3fffb0f93820fe808b56ceeef1020ae234f8e2ba64f091ed4e/uv-0.5.8-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:8058ab06d2f69355694f6e9a36edc45164474c516b4e2895bd67f8232d9022ed", size = 14090419 }, + { url = "https://files.pythonhosted.org/packages/ac/b0/09a3a3d93299728485121b975a84b893aebdb6b712f65f43491bba7f82d0/uv-0.5.8-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c56022edc0f61febbdef89e6f699a0e991932c493b7293635b4814e102d040d2", size = 14638200 }, + { url = 
"https://files.pythonhosted.org/packages/3c/52/1082d3ca50d336035b5ef6c54caa4936aa2a6ad050ea61fca3068dd986b3/uv-0.5.8-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:84f26ce1736d075d1df34f7c3f6b0b728cecd9a4da3e5160d5d887587830e7ce", size = 15336063 }, + { url = "https://files.pythonhosted.org/packages/06/b5/d9d9a95646ca2404da11fa8f1e9953827ad793d8b92b65bb870f4c0de541/uv-0.5.8-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a7956787658fb9253fba49741886409402a48039bee64b1697397d27284919af", size = 15068797 }, + { url = "https://files.pythonhosted.org/packages/96/18/f92f7bf7b8769f8010ae4a9b545a0a183a806133174f65c46996e23c8268/uv-0.5.8-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:5989bbbbca072edc1875036c76aed74ec3dfc4741de7d1f060e181717efea6ac", size = 19540106 }, + { url = "https://files.pythonhosted.org/packages/a4/d8/757959dc58abfbf09afe024fbcf1ffb639b8537ea830d09a99d0300ee53c/uv-0.5.8-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2b3076c79746d4f83257c9dea5ba0833b0711aeff8e6695670eadd140a0cf67f", size = 14760582 }, + { url = "https://files.pythonhosted.org/packages/be/20/8b97777fbe6b983a845237c3132e4b540b9dcde73c2bc7c7c6f96ff46f29/uv-0.5.8-py3-none-manylinux_2_28_aarch64.whl", hash = "sha256:aa03c338e19456d3a6544a94293bd2905837ae22720cc161c83ea0fd13c3b09f", size = 13738416 }, + { url = "https://files.pythonhosted.org/packages/b4/fe/fd462516eeb6d58acf5736ea4e7b1b397454344d99c9a0c279bb96436c7b/uv-0.5.8-py3-none-musllinux_1_1_armv7l.whl", hash = "sha256:8a8cbe1ffa0ef5c2f1c90622e07211a8f93f48daa2be1bd4592bb8cda52b0285", size = 14044658 }, + { url = "https://files.pythonhosted.org/packages/be/d0/215c4fcd68e02f39c50557829365e75e60de2c246884753f1382bd75513e/uv-0.5.8-py3-none-musllinux_1_1_i686.whl", hash = "sha256:365eb6bbb551c5623a73b1ed530f4e69083016f70f0cf5ca1a30ec66413bcda2", size = 14359764 }, + { url = "https://files.pythonhosted.org/packages/41/3e/3d96e9c41cee4acf16aee39f4cae81f5651754ac6ca383be2031efc90eeb/uv-0.5.8-py3-none-musllinux_1_1_x86_64.whl", hash = "sha256:56715389d240ac989af2188cd3bfc2b603d31b42330e915dacfe113b34d8e65b", size = 14943042 }, + { url = "https://files.pythonhosted.org/packages/51/3e/3826d2e7c653649eec649262d5548b7ed6bdb5af7bed2a8bb5a127ac67bd/uv-0.5.8-py3-none-win32.whl", hash = "sha256:f8ade0430b6618ae0e21e52f61f6f3943dd6f3184ef6dc4491087b27940427f9", size = 14201492 }, + { url = "https://files.pythonhosted.org/packages/2f/d3/8ab1383ceccbc9f31bb9a265f90dfda4f6214229768ea9608df8a8c66e15/uv-0.5.8-py3-none-win_amd64.whl", hash = "sha256:4a3325af8ed1effa7076967472c063b0000d609fd6f561c7751e43bab30297f1", size = 15995992 }, ] [[package]] From 9044abc9bb0306dac101f96222aac27ec63821f6 Mon Sep 17 00:00:00 2001 From: phact Date: Thu, 12 Dec 2024 13:50:22 -0500 Subject: [PATCH 05/27] handle disconnect better --- src/backend/base/langflow/api/v1/mcp.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index 92494702b40b..bc1d8d01b0a5 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -234,6 +234,9 @@ async def handle_sse(request: Request, current_user: Annotated[User, Depends(get logger.debug(f"Failed validation model: {e.model.__name__}") if hasattr(e, "raw_errors"): logger.debug(f"Raw validation errors: {e.raw_errors}") + except BrokenResourceError: + # Handle gracefully when client disconnects + logger.info("Client disconnected from SSE 
connection") except asyncio.CancelledError: logger.info("SSE connection was cancelled") except Exception as e: From 55c86f9b131cc15b8143f17e393beb5181d653af Mon Sep 17 00:00:00 2001 From: phact Date: Thu, 12 Dec 2024 16:13:01 -0500 Subject: [PATCH 06/27] initialization --- src/backend/base/langflow/api/v1/mcp.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index bc1d8d01b0a5..9e072cbcae76 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -6,10 +6,11 @@ from typing import Annotated from uuid import UUID, uuid4 +from anyio import BrokenResourceError from fastapi import APIRouter, Depends, Request from fastapi.responses import StreamingResponse from mcp import types -from mcp.server import Server +from mcp.server import Server, NotificationOptions from mcp.server.sse import SseServerTransport from pydantic import ValidationError from sqlmodel import select @@ -76,6 +77,16 @@ def json_schema_from_flow(flow: Flow) -> dict: return {"type": "object", "properties": properties, "required": required} +@server.list_prompts() +async def handle_list_prompts(): + return [] + + +@server.list_resources() +async def handle_list_resources(): + return [] + + @server.list_tools() async def handle_list_tools(): try: @@ -220,8 +231,12 @@ async def handle_sse(request: Request, current_user: Annotated[User, Depends(get logger.debug("Starting SSE connection") logger.debug(f"Stream types: read={type(streams[0])}, write={type(streams[1])}") - # Let's look at the initialization options - init_options = server.create_initialization_options() + notification_options = NotificationOptions( + prompts_changed=True, + resources_changed=True, + tools_changed=True + ) + init_options = server.create_initialization_options(notification_options) logger.debug(f"Initialization options: {init_options}") await server.run(streams[0], streams[1], init_options) From f30b8dac1175f0b7b31cc5df29100d67008ef836 Mon Sep 17 00:00:00 2001 From: phact Date: Thu, 12 Dec 2024 16:57:57 -0500 Subject: [PATCH 07/27] session fix and type fix --- src/backend/base/langflow/api/v1/mcp.py | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index 9e072cbcae76..0b76825482ae 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -70,6 +70,19 @@ def json_schema_from_flow(flow: Flow) -> dict: "type": field_type, "description": field_data.get("info", f"Input for {field_name}"), } + # Update field_type in properties after determining the JSON Schema type + if field_type == "str": + field_type = "string" + elif field_type == "int": + field_type = "integer" + elif field_type == "float": + field_type = "number" + elif field_type == "bool": + field_type = "boolean" + else: + logger.warning(f"Unknown field type: {field_type} defaulting to string") + field_type = "string" + properties[field_name]["type"] = field_type if field_data.get("required", False): required.append(field_name) @@ -90,8 +103,8 @@ async def handle_list_resources(): @server.list_tools() async def handle_list_tools(): try: - session = next(get_session()) - flows = session.exec(select(Flow)).all() + session = await anext(get_session()) + flows = (await session.exec(select(Flow))).all() tools = [] name_count = {} # Track name occurrences @@ -128,7 +141,7 @@ 
async def handle_list_tools(): async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent]: """Handle tool execution requests.""" try: - session = next(get_session()) + session = await anext(get_session()) background_tasks = BackgroundTasks() try: @@ -136,7 +149,7 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent except LookupError: raise ValueError("No authenticated user found in context") - flow = session.exec(select(Flow).where(Flow.id == UUID(name))).first() + flow = (await session.exec(select(Flow).where(Flow.id == UUID(name)))).first() if not flow: raise ValueError(f"Flow with id '{name}' not found") @@ -232,9 +245,7 @@ async def handle_sse(request: Request, current_user: Annotated[User, Depends(get logger.debug(f"Stream types: read={type(streams[0])}, write={type(streams[1])}") notification_options = NotificationOptions( - prompts_changed=True, - resources_changed=True, - tools_changed=True + prompts_changed=True, resources_changed=True, tools_changed=True ) init_options = server.create_initialization_options(notification_options) logger.debug(f"Initialization options: {init_options}") From e7d51ffe1c28dfe760e8676c1f404468a948e21a Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 12 Dec 2024 21:59:33 +0000 Subject: [PATCH 08/27] [autofix.ci] apply automated fixes --- src/backend/base/langflow/api/v1/__init__.py | 2 +- src/backend/base/langflow/api/v1/mcp.py | 2 +- src/backend/base/langflow/components/tools/__init__.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/backend/base/langflow/api/v1/__init__.py b/src/backend/base/langflow/api/v1/__init__.py index eef95c4bd860..567415af18c1 100644 --- a/src/backend/base/langflow/api/v1/__init__.py +++ b/src/backend/base/langflow/api/v1/__init__.py @@ -21,11 +21,11 @@ "flows_router", "folders_router", "login_router", + "mcp_router", "monitor_router", "starter_projects_router", "store_router", "users_router", "validate_router", "variables_router", - "mcp_router", ] diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index 0b76825482ae..14d41183cd09 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -10,7 +10,7 @@ from fastapi import APIRouter, Depends, Request from fastapi.responses import StreamingResponse from mcp import types -from mcp.server import Server, NotificationOptions +from mcp.server import NotificationOptions, Server from mcp.server.sse import SseServerTransport from pydantic import ValidationError from sqlmodel import select diff --git a/src/backend/base/langflow/components/tools/__init__.py b/src/backend/base/langflow/components/tools/__init__.py index 41f8c108413f..707ecd075f6a 100644 --- a/src/backend/base/langflow/components/tools/__init__.py +++ b/src/backend/base/langflow/components/tools/__init__.py @@ -37,6 +37,7 @@ "GleanSearchAPIComponent", "GoogleSearchAPIComponent", "GoogleSerperAPIComponent", + "MCPStdio", "PythonCodeStructuredTool", "PythonREPLToolComponent", "SearXNGToolComponent", @@ -48,5 +49,4 @@ "WolframAlphaAPIComponent", "YfinanceToolComponent", "YouTubeTranscriptsComponent", - "MCPStdio", ] From 20bf6dfc77c8e18c8237e03f5b45c8cf962238a0 Mon Sep 17 00:00:00 2001 From: phact Date: Tue, 17 Dec 2024 03:15:29 -0500 Subject: [PATCH 09/27] defensive against mcp server bugs --- src/backend/base/langflow/api/v1/mcp.py | 44 ++++++++++++++++++------- 1 file changed, 32 insertions(+), 
12 deletions(-) diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index 0b76825482ae..0088eba48342 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -6,6 +6,7 @@ from typing import Annotated from uuid import UUID, uuid4 +import pydantic from anyio import BrokenResourceError from fastapi import APIRouter, Depends, Request from fastapi.responses import StreamingResponse @@ -169,7 +170,10 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent conversation_id = str(uuid4()) input_request = InputValueRequest( - input_value=processed_inputs["input_value"], components=[], type="chat", session=conversation_id + input_value=processed_inputs.get("input_value", ""), + components=[], + type="chat", + session=conversation_id ) result = "" @@ -190,7 +194,7 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent try: event_data = json.loads(line) if event_data.get("event") == "end_vertex": - result += ( + message = ( event_data.get("data", {}) .get("build_data", {}) .get("data", {}) @@ -198,6 +202,8 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent .get("message", {}) .get("text", "") ) + if message: + result += str(message) except json.JSONDecodeError: logger.warning(f"Failed to parse event data: {line}") continue @@ -250,16 +256,30 @@ async def handle_sse(request: Request, current_user: Annotated[User, Depends(get init_options = server.create_initialization_options(notification_options) logger.debug(f"Initialization options: {init_options}") - await server.run(streams[0], streams[1], init_options) - except ValidationError as e: - logger.warning(f"Validation error in MCP: {e}") - logger.debug(f"Failed message type: {type(e).__name__}") - logger.debug(f"Validation error details: {e.errors()}") - # Add more details about the failed validation - if hasattr(e, "model"): - logger.debug(f"Failed validation model: {e.model.__name__}") - if hasattr(e, "raw_errors"): - logger.debug(f"Raw validation errors: {e.raw_errors}") + try: + await server.run(streams[0], streams[1], init_options) + except (pydantic.ValidationError, ExceptionGroup) as exc: + validation_error = None + + # For ExceptionGroup, find the validation error if present + if isinstance(exc, ExceptionGroup): + for inner_exc in exc.exceptions: + if isinstance(inner_exc, pydantic.ValidationError): + validation_error = inner_exc + break + elif isinstance(exc, pydantic.ValidationError): + validation_error = exc + + # Handle the validation error if found + if validation_error and any("cancelled" in err["input"] for err in validation_error.errors()): + logger.debug("Ignoring validation error for cancelled notification") + else: + # For other errors, log as error but don't crash + logger.error(f"Validation error in MCP: {exc}") + logger.debug(f"Failed message type: {type(exc).__name__}") + if validation_error: + logger.debug(f"Validation error details: {validation_error.errors()}") + return except BrokenResourceError: # Handle gracefully when client disconnects logger.info("Client disconnected from SSE connection") From 2167e7800a0537ed07c0f41ffad04fda62e29a98 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Tue, 17 Dec 2024 08:19:23 +0000 Subject: [PATCH 10/27] [autofix.ci] apply automated fixes --- src/backend/base/langflow/api/v1/mcp.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git 
a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index 1db85777cbf2..0cc195e8e381 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -13,7 +13,6 @@ from mcp import types from mcp.server import NotificationOptions, Server from mcp.server.sse import SseServerTransport -from pydantic import ValidationError from sqlmodel import select from starlette.background import BackgroundTasks @@ -170,10 +169,7 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent conversation_id = str(uuid4()) input_request = InputValueRequest( - input_value=processed_inputs.get("input_value", ""), - components=[], - type="chat", - session=conversation_id + input_value=processed_inputs.get("input_value", ""), components=[], type="chat", session=conversation_id ) result = "" @@ -260,7 +256,7 @@ async def handle_sse(request: Request, current_user: Annotated[User, Depends(get await server.run(streams[0], streams[1], init_options) except (pydantic.ValidationError, ExceptionGroup) as exc: validation_error = None - + # For ExceptionGroup, find the validation error if present if isinstance(exc, ExceptionGroup): for inner_exc in exc.exceptions: @@ -269,7 +265,7 @@ async def handle_sse(request: Request, current_user: Annotated[User, Depends(get break elif isinstance(exc, pydantic.ValidationError): validation_error = exc - + # Handle the validation error if found if validation_error and any("cancelled" in err["input"] for err in validation_error.errors()): logger.debug("Ignoring validation error for cancelled notification") From 738de8929d0e64ed58e633b89415ed102544391b Mon Sep 17 00:00:00 2001 From: phact Date: Wed, 18 Dec 2024 11:44:10 -0500 Subject: [PATCH 11/27] notifications and sse component --- src/backend/base/langflow/api/v1/mcp.py | 138 ++++++++++-------- .../base/langflow/components/tools/mcp_sse.py | 116 +++++++++++++++ .../langflow/components/tools/mcp_stdio.py | 8 +- 3 files changed, 195 insertions(+), 67 deletions(-) create mode 100644 src/backend/base/langflow/components/tools/mcp_sse.py diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index 1db85777cbf2..98a74d1dcfc5 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -11,7 +11,7 @@ from fastapi import APIRouter, Depends, Request from fastapi.responses import StreamingResponse from mcp import types -from mcp.server import NotificationOptions, Server +from mcp.server import Server, NotificationOptions from mcp.server.sse import SseServerTransport from pydantic import ValidationError from sqlmodel import select @@ -161,84 +161,95 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent processed_inputs[key] = value # Send initial progress notification - # if progress_token := context.meta.progressToken: - # await context.session.send_progress_notification( - # progress_token=progress_token, - # progress=0.5, - # total=1.0 - # ) + if progress_token := server.request_context.meta.progressToken: + await server.request_context.session.send_progress_notification( + progress_token=progress_token, progress=0.0, total=1.0 + ) conversation_id = str(uuid4()) input_request = InputValueRequest( - input_value=processed_inputs.get("input_value", ""), - components=[], - type="chat", - session=conversation_id + input_value=processed_inputs.get("input_value", ""), components=[], type="chat", session=conversation_id ) - result = "" + async def 
send_progress_updates(): + if not (progress_token := server.request_context.meta.progressToken): + return + + try: + progress = 0.0 + while True: + await server.request_context.session.send_progress_notification( + progress_token=progress_token, progress=min(0.9, progress), total=1.0 + ) + progress += 0.1 + await asyncio.sleep(1.0) + except asyncio.CancelledError: + # Send final 100% progress + await server.request_context.session.send_progress_notification( + progress_token=progress_token, progress=1.0, total=1.0 + ) + raise + db_service = get_db_service() + collected_results = [] async with db_service.with_async_session() as async_session: try: - response = await build_flow( - flow_id=UUID(name), - inputs=input_request, - background_tasks=background_tasks, - current_user=current_user, - session=async_session, - ) + progress_task = asyncio.create_task(send_progress_updates()) - async for line in response.body_iterator: - if not line: - continue + try: + response = await build_flow( + flow_id=UUID(name), + inputs=input_request, + background_tasks=background_tasks, + current_user=current_user, + session=async_session, + ) + + async for line in response.body_iterator: + if not line: + continue + try: + event_data = json.loads(line) + if event_data.get("event") == "end_vertex": + message = ( + event_data.get("data", {}) + .get("build_data", {}) + .get("data", {}) + .get("results", {}) + .get("message", {}) + .get("text", "") + ) + if message: + collected_results.append(types.TextContent(type="text", text=str(message))) + except json.JSONDecodeError: + logger.warning(f"Failed to parse event data: {line}") + continue + + return collected_results + finally: + progress_task.cancel() try: - event_data = json.loads(line) - if event_data.get("event") == "end_vertex": - message = ( - event_data.get("data", {}) - .get("build_data", {}) - .get("data", {}) - .get("results", {}) - .get("message", {}) - .get("text", "") - ) - if message: - result += str(message) - except json.JSONDecodeError: - logger.warning(f"Failed to parse event data: {line}") - continue - except asyncio.CancelledError as e: - logger.info(f"Request was cancelled: {e!s}") - # Create a proper cancellation notification - # notification = types.ProgressNotification( - # method="notifications/progress", - # params=types.ProgressNotificationParams( - # progressToken=str(uuid4()), - # progress=1.0 - # ), - # ) - # await server.request_context.session.send_notification(notification) - return [types.TextContent(type="text", text=f"Request cancelled: {e!s}")] - - # Send final progress notification - # if progress_token: - # await context.session.send_progress_notification( - # progress_token=progress_token, - # progress=1.0, - # total=1.0 - # ) - print(result) - - return [types.TextContent(type="text", text=result)] + await progress_task + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error in async session: {e}") + raise except Exception as e: + context = server.request_context + # Send error progress if there's an exception + if progress_token := context.meta.progressToken: + await server.request_context.session.send_progress_notification( + progress_token=progress_token, progress=1.0, total=1.0 + ) logger.error(f"Error executing tool {name}: {e!s}") trace = traceback.format_exc() logger.error(trace) raise -sse = SseServerTransport("/api/v1/mcp") +sse = SseServerTransport("/api/v1/mcp/") @router.get("/sse", response_class=StreamingResponse) @@ -260,7 +271,7 @@ async def handle_sse(request: Request, 
current_user: Annotated[User, Depends(get await server.run(streams[0], streams[1], init_options) except (pydantic.ValidationError, ExceptionGroup) as exc: validation_error = None - + # For ExceptionGroup, find the validation error if present if isinstance(exc, ExceptionGroup): for inner_exc in exc.exceptions: @@ -269,7 +280,7 @@ async def handle_sse(request: Request, current_user: Annotated[User, Depends(get break elif isinstance(exc, pydantic.ValidationError): validation_error = exc - + # Handle the validation error if found if validation_error and any("cancelled" in err["input"] for err in validation_error.errors()): logger.debug("Ignoring validation error for cancelled notification") @@ -296,4 +307,5 @@ async def handle_sse(request: Request, current_user: Annotated[User, Depends(get @router.post("/") async def handle_messages(request: Request, current_user: Annotated[User, Depends(get_current_active_user)]): + print(f"Handling message request {request}") await sse.handle_post_message(request.scope, request.receive, request._send) diff --git a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py new file mode 100644 index 000000000000..e9b953afd93f --- /dev/null +++ b/src/backend/base/langflow/components/tools/mcp_sse.py @@ -0,0 +1,116 @@ +# from langflow.field_typing import Data +import traceback +from collections.abc import Awaitable, Callable +from contextlib import AsyncExitStack + +import httpx +from anthropic import Anthropic, BaseModel +from dotenv import load_dotenv +from langchain_core.tools import StructuredTool +from mcp import ClientSession +from mcp.client.sse import sse_client + +from langflow.components.tools.mcp_stdio import create_input_schema_from_json_schema +from langflow.custom import Component +from langflow.io import MessageTextInput, Output + +load_dotenv() # load environment variables from .env + + +class MCPSseClient: + def __init__(self): + # Initialize session and client objects + self.write = None + self.sse = None + self.session: ClientSession | None = None + self.exit_stack = AsyncExitStack() + self.anthropic = Anthropic() + + async def pre_check_redirect(self, url: str): + """Check if the URL responds with a 307 Redirect.""" + async with httpx.AsyncClient(follow_redirects=False) as client: + response = await client.request("HEAD", url) + if response.status_code == 307: + return response.headers.get("Location") # Return the redirect URL + return url # Return the original URL if no redirect + + async def connect_to_server( + self, url: str, headers: dict[str, str] = {}, timeout: int = 500, sse_read_timeout: int = 500 + ): + print("connecting") + url = await self.pre_check_redirect(url) + sse_transport = await self.exit_stack.enter_async_context(sse_client(url, headers, timeout, sse_read_timeout)) + self.sse, self.write = sse_transport + print("creating session") + self.session = await self.exit_stack.enter_async_context(ClientSession(self.sse, self.write)) + + print("initializing") + try: + await self.session.initialize() + except Exception as e: + print("Error initializing session:", e) + trace = traceback.format_exc() + print(trace) + raise + + # List available tools + print("listing tools") + response = await self.session.list_tools() + tools = response.tools + print("\nConnected to server with tools:", [tool.name for tool in tools]) + return tools + + +class MCPSse(Component): + client = MCPSseClient() + tools = None + tool_names = [] + display_name = "MCP Tools (SSE)" + description = "Use as a 
template to create your own component." + documentation: str = "http://docs.langflow.org/components/custom" + icon = "code" + name = "MCPSse" + + inputs = [ + MessageTextInput( + name="url", + display_name="mcp sse url", + info="sse url", + value="http://localhost:7860/api/v1/mcp/sse", + tool_mode=True, + ), + ] + + outputs = [ + Output(display_name="Output", name="output", method="build_output"), + ] + + def create_tool_coroutine(self, tool_name: str) -> Callable[[dict], Awaitable]: + async def tool_coroutine(*args, **kwargs): + return await self.client.session.call_tool(tool_name, arguments=kwargs) + + return tool_coroutine + + async def build_output(self) -> list[StructuredTool]: + if self.client.session is None: + self.tools = await self.client.connect_to_server(self.url) + + tool_list = [] + + for tool in self.tools: + args_schema = create_input_schema_from_json_schema(tool.inputSchema) + callbacks = self.get_langchain_callbacks() + tool_list.append( + StructuredTool( + name=tool.name, # maybe format this + description=tool.description, + coroutine=self.create_tool_coroutine(tool.name), + args_schema=args_schema, + # args_schema=DataSchema, + handle_tool_error=True, + callbacks=callbacks, + ) + ) + + self.tool_names = [tool.name for tool in self.tools] + return tool_list diff --git a/src/backend/base/langflow/components/tools/mcp_stdio.py b/src/backend/base/langflow/components/tools/mcp_stdio.py index 03cdf8699e5d..ffffb304fcdf 100644 --- a/src/backend/base/langflow/components/tools/mcp_stdio.py +++ b/src/backend/base/langflow/components/tools/mcp_stdio.py @@ -17,7 +17,7 @@ load_dotenv() # load environment variables from .env -class MCPClient: +class MCPStdioClient: def __init__(self): # Initialize session and client objects self.session: ClientSession | None = None @@ -82,14 +82,14 @@ def create_input_schema_from_json_schema(schema: dict[str, Any]) -> type[BaseMod class MCPStdio(Component): - client = MCPClient() + client = MCPStdioClient() tools = None tool_names = [] - display_name = "Get Tools from MCP" + display_name = "MCP Tools (stdio)" description = "Use as a template to create your own component." 
documentation: str = "http://docs.langflow.org/components/custom" icon = "code" - name = "CustomComponent" + name = "MCPStdio" inputs = [ MessageTextInput( From f85f8f0147cfc9fcc4cc44d92ab15dcece0aad7b Mon Sep 17 00:00:00 2001 From: phact Date: Thu, 19 Dec 2024 12:44:22 -0500 Subject: [PATCH 12/27] enabled flags and resource support --- src/backend/base/langflow/api/router.py | 5 +- src/backend/base/langflow/api/v1/mcp.py | 100 ++++++++++++++++-- .../base/langflow/services/settings/base.py | 6 ++ 3 files changed, 99 insertions(+), 12 deletions(-) diff --git a/src/backend/base/langflow/api/router.py b/src/backend/base/langflow/api/router.py index 2e6ea01acdba..2d6756b15aaa 100644 --- a/src/backend/base/langflow/api/router.py +++ b/src/backend/base/langflow/api/router.py @@ -1,5 +1,6 @@ # Router for base api from fastapi import APIRouter +from langflow.services.deps import get_settings_service from langflow.api.v1 import ( api_key_router, @@ -34,4 +35,6 @@ router.include_router(monitor_router) router.include_router(folders_router) router.include_router(starter_projects_router) -router.include_router(mcp_router) + +if get_settings_service().settings.mcp_server_enabled: + router.include_router(mcp_router) diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index 98a74d1dcfc5..58ce7c3faf64 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -1,4 +1,6 @@ import asyncio +import base64 +from urllib.parse import quote, urlparse, unquote import json import logging import traceback @@ -11,9 +13,8 @@ from fastapi import APIRouter, Depends, Request from fastapi.responses import StreamingResponse from mcp import types -from mcp.server import Server, NotificationOptions +from mcp.server import NotificationOptions, Server from mcp.server.sse import SseServerTransport -from pydantic import ValidationError from sqlmodel import select from starlette.background import BackgroundTasks @@ -22,7 +23,8 @@ from langflow.graph import Graph from langflow.services.auth.utils import get_current_active_user from langflow.services.database.models import Flow, User -from langflow.services.deps import get_db_service, get_session +from langflow.services.deps import get_db_service, get_session, get_settings_service, get_storage_service +from langflow.services.storage.utils import build_content_type_from_extension logger = logging.getLogger(__name__) if False: @@ -42,6 +44,8 @@ logger.debug("MCP module loaded - debug logging enabled") +enable_progress_notifications = get_settings_service().settings.mcp_server_enable_progress_notifications + router = APIRouter(prefix="/mcp", tags=["mcp"]) server = Server("langflow-mcp-server") @@ -98,7 +102,80 @@ async def handle_list_prompts(): @server.list_resources() async def handle_list_resources(): - return [] + try: + session = await anext(get_session()) + storage_service = get_storage_service() + settings_service = get_settings_service() + + # Build full URL from settings + host = getattr(settings_service.settings, "holst", "localhost") + port = getattr(settings_service.settings, "port", 3000) + + base_url = f"http://{host}:{port}".rstrip("/") + + flows = (await session.exec(select(Flow))).all() + resources = [] + + for flow in flows: + if flow.id: + try: + files = await storage_service.list_files(flow_id=str(flow.id)) + for file_name in files: + # URL encode the filename + safe_filename = quote(file_name) + resource = types.Resource( + uri=f"{base_url}/api/v1/files/{flow.id}/{safe_filename}", 
+ name=file_name, + description=f"File in flow: {flow.name}", + mimeType=build_content_type_from_extension(file_name), + ) + resources.append(resource) + except Exception as e: + logger.debug(f"Error listing files for flow {flow.id}: {e}") + continue + + return resources + except Exception as e: + logger.error(f"Error in listing resources: {e!s}") + trace = traceback.format_exc() + logger.error(trace) + raise + + +@server.read_resource() +async def handle_read_resource(uri: str) -> bytes: + """Handle resource read requests.""" + try: + # Parse the URI properly + parsed_uri = urlparse(str(uri)) + # Path will be like /api/v1/files/{flow_id}/{filename} + path_parts = parsed_uri.path.split("/") + # Remove empty strings from split + path_parts = [p for p in path_parts if p] + + # The flow_id and filename should be the last two parts + if len(path_parts) < 2: + raise ValueError(f"Invalid URI format: {uri}") + + flow_id = path_parts[-2] + filename = unquote(path_parts[-1]) # URL decode the filename + + storage_service = get_storage_service() + + # Read the file content + content = await storage_service.get_file(flow_id=flow_id, file_name=filename) + if not content: + raise ValueError(f"File {filename} not found in flow {flow_id}") + + # Ensure content is base64 encoded + if isinstance(content, str): + content = content.encode() + return base64.b64encode(content) + except Exception as e: + logger.error(f"Error reading resource {uri}: {e!s}") + trace = traceback.format_exc() + logger.error(trace) + raise @server.list_tools() @@ -160,8 +237,8 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent for key, value in arguments.items(): processed_inputs[key] = value - # Send initial progress notification - if progress_token := server.request_context.meta.progressToken: + # Initial progress notification + if enable_progress_notifications and (progress_token := server.request_context.meta.progressToken): await server.request_context.session.send_progress_notification( progress_token=progress_token, progress=0.0, total=1.0 ) @@ -172,7 +249,7 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent ) async def send_progress_updates(): - if not (progress_token := server.request_context.meta.progressToken): + if not (enable_progress_notifications and server.request_context.meta.progressToken): return try: @@ -185,9 +262,10 @@ async def send_progress_updates(): await asyncio.sleep(1.0) except asyncio.CancelledError: # Send final 100% progress - await server.request_context.session.send_progress_notification( - progress_token=progress_token, progress=1.0, total=1.0 - ) + if enable_progress_notifications: + await server.request_context.session.send_progress_notification( + progress_token=progress_token, progress=1.0, total=1.0 + ) raise db_service = get_db_service() @@ -239,7 +317,7 @@ async def send_progress_updates(): except Exception as e: context = server.request_context # Send error progress if there's an exception - if progress_token := context.meta.progressToken: + if enable_progress_notifications and (progress_token := context.meta.progressToken): await server.request_context.session.send_progress_notification( progress_token=progress_token, progress=1.0, total=1.0 ) diff --git a/src/backend/base/langflow/services/settings/base.py b/src/backend/base/langflow/services/settings/base.py index f9b9db365d7f..ff90fed00858 100644 --- a/src/backend/base/langflow/services/settings/base.py +++ b/src/backend/base/langflow/services/settings/base.py @@ -180,6 
+180,12 @@ class Settings(BaseSettings): max_vertex_builds_to_keep: int = 3000 """The maximum number of vertex builds to keep in the database.""" + # MCP Server + mcp_server_enabled: bool = True + """If set to False, Langflow will not enable the MCP server.""" + mcp_server_enable_progress_notifications: bool = False + """If set to False, Langflow will not send progress notifications in the MCP server.""" + @field_validator("dev") @classmethod def set_dev(cls, value): From bd24663acf010c2351834b2d3c601978caf282f0 Mon Sep 17 00:00:00 2001 From: phact Date: Thu, 19 Dec 2024 13:48:28 -0500 Subject: [PATCH 13/27] remove unneeded print --- src/backend/base/langflow/api/v1/mcp.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index 58ce7c3faf64..459dca0217b7 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -184,25 +184,16 @@ async def handle_list_tools(): session = await anext(get_session()) flows = (await session.exec(select(Flow))).all() tools = [] - name_count = {} # Track name occurrences for flow in flows: if flow.user_id is None: continue - # Generate unique name by appending _N if needed - base_name = flow.name - if base_name in name_count: - name_count[base_name] += 1 - unique_name = f"{base_name}_{name_count[base_name]}" - else: - name_count[base_name] = 0 - unique_name = base_name tool = types.Tool( name=str(flow.id), # Use flow.id instead of name - description=f"{unique_name}: {flow.description}" + description=f"{flow.name}: {flow.description}" if flow.description - else f"Tool generated from flow: {unique_name}", + else f"Tool generated from flow: {flow.name}", inputSchema=json_schema_from_flow(flow), ) tools.append(tool) @@ -385,5 +376,4 @@ async def handle_sse(request: Request, current_user: Annotated[User, Depends(get @router.post("/") async def handle_messages(request: Request, current_user: Annotated[User, Depends(get_current_active_user)]): - print(f"Handling message request {request}") await sse.handle_post_message(request.scope, request.receive, request._send) From 70e8a1602d07661ea71c7ad001993e050a7863d9 Mon Sep 17 00:00:00 2001 From: phact Date: Thu, 19 Dec 2024 14:07:26 -0500 Subject: [PATCH 14/27] extract json schema util --- src/backend/base/langflow/api/v1/mcp.py | 43 +---------------------- src/backend/base/langflow/helpers/flow.py | 43 +++++++++++++++++++++++ 2 files changed, 44 insertions(+), 42 deletions(-) diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index 459dca0217b7..4ca2ea6af13e 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -20,11 +20,11 @@ from langflow.api.v1.chat import build_flow from langflow.api.v1.schemas import InputValueRequest -from langflow.graph import Graph from langflow.services.auth.utils import get_current_active_user from langflow.services.database.models import Flow, User from langflow.services.deps import get_db_service, get_session, get_settings_service, get_storage_service from langflow.services.storage.utils import build_content_type_from_extension +from langflow.helpers.flow import json_schema_from_flow logger = logging.getLogger(__name__) if False: @@ -54,47 +54,6 @@ current_user_ctx: ContextVar[User] = ContextVar("current_user_ctx") -def json_schema_from_flow(flow: Flow) -> dict: - """Generate JSON schema from flow input nodes.""" - # Get the flow's data which 
contains the nodes and their configurations - flow_data = flow.data if flow.data else {} - - graph = Graph.from_payload(flow_data) - input_nodes = [vertex for vertex in graph.vertices if vertex.is_input] - - properties = {} - required = [] - for node in input_nodes: - node_data = node.data["node"] - template = node_data["template"] - - for field_name, field_data in template.items(): - if field_data != "Component" and field_data.get("show", False) and not field_data.get("advanced", False): - field_type = field_data.get("type", "string") - properties[field_name] = { - "type": field_type, - "description": field_data.get("info", f"Input for {field_name}"), - } - # Update field_type in properties after determining the JSON Schema type - if field_type == "str": - field_type = "string" - elif field_type == "int": - field_type = "integer" - elif field_type == "float": - field_type = "number" - elif field_type == "bool": - field_type = "boolean" - else: - logger.warning(f"Unknown field type: {field_type} defaulting to string") - field_type = "string" - properties[field_name]["type"] = field_type - - if field_data.get("required", False): - required.append(field_name) - - return {"type": "object", "properties": properties, "required": required} - - @server.list_prompts() async def handle_list_prompts(): return [] diff --git a/src/backend/base/langflow/helpers/flow.py b/src/backend/base/langflow/helpers/flow.py index 1e379c1c61eb..973fa2c8bb86 100644 --- a/src/backend/base/langflow/helpers/flow.py +++ b/src/backend/base/langflow/helpers/flow.py @@ -6,6 +6,7 @@ from fastapi import HTTPException from pydantic.v1 import BaseModel, Field, create_model from sqlmodel import select +from loguru import logger from langflow.schema.schema import INPUT_FIELD_NAME from langflow.services.database.models.flow import Flow @@ -312,3 +313,45 @@ async def generate_unique_flow_name(flow_name, user_id, session): # If a flow with the name already exists, append (n) to the name and increment n flow_name = f"{original_name} ({n})" n += 1 + + +def json_schema_from_flow(flow: Flow) -> dict: + """Generate JSON schema from flow input nodes.""" + from langflow.graph.graph.base import Graph + # Get the flow's data which contains the nodes and their configurations + flow_data = flow.data if flow.data else {} + + graph = Graph.from_payload(flow_data) + input_nodes = [vertex for vertex in graph.vertices if vertex.is_input] + + properties = {} + required = [] + for node in input_nodes: + node_data = node.data["node"] + template = node_data["template"] + + for field_name, field_data in template.items(): + if field_data != "Component" and field_data.get("show", False) and not field_data.get("advanced", False): + field_type = field_data.get("type", "string") + properties[field_name] = { + "type": field_type, + "description": field_data.get("info", f"Input for {field_name}"), + } + # Update field_type in properties after determining the JSON Schema type + if field_type == "str": + field_type = "string" + elif field_type == "int": + field_type = "integer" + elif field_type == "float": + field_type = "number" + elif field_type == "bool": + field_type = "boolean" + else: + logger.warning(f"Unknown field type: {field_type} defaulting to string") + field_type = "string" + properties[field_name]["type"] = field_type + + if field_data.get("required", False): + required.append(field_name) + + return {"type": "object", "properties": properties, "required": required} From 701c620f0209ec5c0470793174b19a1fd196dd97 Mon Sep 17 00:00:00 2001 From: 
"autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Thu, 19 Dec 2024 19:17:59 +0000 Subject: [PATCH 15/27] [autofix.ci] apply automated fixes --- src/backend/base/langflow/api/router.py | 2 +- src/backend/base/langflow/api/v1/mcp.py | 4 ++-- src/backend/base/langflow/components/tools/mcp_sse.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/backend/base/langflow/api/router.py b/src/backend/base/langflow/api/router.py index 2d6756b15aaa..39339035e1a6 100644 --- a/src/backend/base/langflow/api/router.py +++ b/src/backend/base/langflow/api/router.py @@ -1,6 +1,5 @@ # Router for base api from fastapi import APIRouter -from langflow.services.deps import get_settings_service from langflow.api.v1 import ( api_key_router, @@ -18,6 +17,7 @@ validate_router, variables_router, ) +from langflow.services.deps import get_settings_service router = APIRouter( prefix="/api/v1", diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index 4ca2ea6af13e..0af7e4ab3277 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -1,11 +1,11 @@ import asyncio import base64 -from urllib.parse import quote, urlparse, unquote import json import logging import traceback from contextvars import ContextVar from typing import Annotated +from urllib.parse import quote, unquote, urlparse from uuid import UUID, uuid4 import pydantic @@ -20,11 +20,11 @@ from langflow.api.v1.chat import build_flow from langflow.api.v1.schemas import InputValueRequest +from langflow.helpers.flow import json_schema_from_flow from langflow.services.auth.utils import get_current_active_user from langflow.services.database.models import Flow, User from langflow.services.deps import get_db_service, get_session, get_settings_service, get_storage_service from langflow.services.storage.utils import build_content_type_from_extension -from langflow.helpers.flow import json_schema_from_flow logger = logging.getLogger(__name__) if False: diff --git a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py index e9b953afd93f..bd816671e3df 100644 --- a/src/backend/base/langflow/components/tools/mcp_sse.py +++ b/src/backend/base/langflow/components/tools/mcp_sse.py @@ -4,7 +4,7 @@ from contextlib import AsyncExitStack import httpx -from anthropic import Anthropic, BaseModel +from anthropic import Anthropic from dotenv import load_dotenv from langchain_core.tools import StructuredTool from mcp import ClientSession From f618688a3cc2b8eb5f0f9b618c225eeef596ed59 Mon Sep 17 00:00:00 2001 From: phact Date: Thu, 19 Dec 2024 15:20:31 -0500 Subject: [PATCH 16/27] ruff --- src/backend/base/langflow/api/router.py | 2 +- src/backend/base/langflow/api/v1/mcp.py | 133 ++++++++++-------- .../base/langflow/components/tools/mcp_sse.py | 30 ++-- .../langflow/components/tools/mcp_stdio.py | 18 ++- 4 files changed, 92 insertions(+), 91 deletions(-) diff --git a/src/backend/base/langflow/api/router.py b/src/backend/base/langflow/api/router.py index 2d6756b15aaa..39339035e1a6 100644 --- a/src/backend/base/langflow/api/router.py +++ b/src/backend/base/langflow/api/router.py @@ -1,6 +1,5 @@ # Router for base api from fastapi import APIRouter -from langflow.services.deps import get_settings_service from langflow.api.v1 import ( api_key_router, @@ -18,6 +17,7 @@ validate_router, variables_router, ) +from langflow.services.deps import get_settings_service router = APIRouter( prefix="/api/v1", diff 
--git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index 4ca2ea6af13e..f9affd239344 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -1,11 +1,11 @@ import asyncio import base64 -from urllib.parse import quote, urlparse, unquote import json import logging import traceback from contextvars import ContextVar from typing import Annotated +from urllib.parse import quote, unquote, urlparse from uuid import UUID, uuid4 import pydantic @@ -20,11 +20,11 @@ from langflow.api.v1.chat import build_flow from langflow.api.v1.schemas import InputValueRequest +from langflow.helpers.flow import json_schema_from_flow from langflow.services.auth.utils import get_current_active_user from langflow.services.database.models import Flow, User from langflow.services.deps import get_db_service, get_session, get_settings_service, get_storage_service from langflow.services.storage.utils import build_content_type_from_extension -from langflow.helpers.flow import json_schema_from_flow logger = logging.getLogger(__name__) if False: @@ -53,6 +53,9 @@ # Create a context variable to store the current user current_user_ctx: ContextVar[User] = ContextVar("current_user_ctx") +# Define constants +MAX_RETRIES = 2 + @server.list_prompts() async def handle_list_prompts(): @@ -61,6 +64,7 @@ async def handle_list_prompts(): @server.list_resources() async def handle_list_resources(): + resources = [] try: session = await anext(get_session()) storage_service = get_storage_service() @@ -73,7 +77,6 @@ async def handle_list_resources(): base_url = f"http://{host}:{port}".rstrip("/") flows = (await session.exec(select(Flow))).all() - resources = [] for flow in flows: if flow.id: @@ -89,16 +92,17 @@ async def handle_list_resources(): mimeType=build_content_type_from_extension(file_name), ) resources.append(resource) - except Exception as e: - logger.debug(f"Error listing files for flow {flow.id}: {e}") + except FileNotFoundError as e: + msg = f"Error listing files for flow {flow.id}: {e}" + logger.debug(msg) continue - - return resources except Exception as e: - logger.error(f"Error in listing resources: {e!s}") + msg = f"Error in listing resources: {e!s}" + logger.exception(msg) trace = traceback.format_exc() - logger.error(trace) + logger.exception(trace) raise + return resources @server.read_resource() @@ -113,8 +117,10 @@ async def handle_read_resource(uri: str) -> bytes: path_parts = [p for p in path_parts if p] # The flow_id and filename should be the last two parts - if len(path_parts) < 2: - raise ValueError(f"Invalid URI format: {uri}") + two = 2 + if len(path_parts) < two: + msg = f"Invalid URI format: {uri}" + raise ValueError(msg) flow_id = path_parts[-2] filename = unquote(path_parts[-1]) # URL decode the filename @@ -124,27 +130,30 @@ async def handle_read_resource(uri: str) -> bytes: # Read the file content content = await storage_service.get_file(flow_id=flow_id, file_name=filename) if not content: - raise ValueError(f"File {filename} not found in flow {flow_id}") + msg = f"File {filename} not found in flow {flow_id}" + raise ValueError(msg) # Ensure content is base64 encoded if isinstance(content, str): content = content.encode() return base64.b64encode(content) except Exception as e: - logger.error(f"Error reading resource {uri}: {e!s}") + msg = f"Error reading resource {uri}: {e!s}" + logger.exception(msg) trace = traceback.format_exc() - logger.error(trace) + logger.exception(trace) raise @server.list_tools() async def 
handle_list_tools(): + tools = [] try: session = await anext(get_session()) flows = (await session.exec(select(Flow))).all() - tools = [] for flow in flows: + tools = [] if flow.user_id is None: continue @@ -156,13 +165,13 @@ async def handle_list_tools(): inputSchema=json_schema_from_flow(flow), ) tools.append(tool) - - return tools except Exception as e: - logger.error(f"Error in listing tools: {e!s}") + msg = f"Error in listing tools: {e!s}" + logger.exception(msg) trace = traceback.format_exc() - logger.error(trace) - raise e + logger.exception(trace) + raise + return tools @server.call_tool() @@ -172,20 +181,15 @@ async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent session = await anext(get_session()) background_tasks = BackgroundTasks() - try: - current_user = current_user_ctx.get() - except LookupError: - raise ValueError("No authenticated user found in context") - + current_user = current_user_ctx.get() flow = (await session.exec(select(Flow).where(Flow.id == UUID(name)))).first() if not flow: - raise ValueError(f"Flow with id '{name}' not found") + msg = f"Flow with id '{name}' not found" + raise ValueError(msg) # Process inputs - processed_inputs = {} - for key, value in arguments.items(): - processed_inputs[key] = value + processed_inputs = dict(arguments) # Initial progress notification if enable_progress_notifications and (progress_token := server.request_context.meta.progressToken): @@ -250,7 +254,8 @@ async def send_progress_updates(): if message: collected_results.append(types.TextContent(type="text", text=str(message))) except json.JSONDecodeError: - logger.warning(f"Failed to parse event data: {line}") + msg = f"Failed to parse event data: {line}" + logger.warning(msg) continue return collected_results @@ -261,7 +266,8 @@ async def send_progress_updates(): except asyncio.CancelledError: pass except Exception as e: - logger.error(f"Error in async session: {e}") + msg = f"Error in async session: {e}" + logger.exception(msg) raise except Exception as e: @@ -271,68 +277,75 @@ async def send_progress_updates(): await server.request_context.session.send_progress_notification( progress_token=progress_token, progress=1.0, total=1.0 ) - logger.error(f"Error executing tool {name}: {e!s}") + msg = f"Error executing tool {name}: {e!s}" + logger.exception(msg) trace = traceback.format_exc() - logger.error(trace) + logger.exception(trace) raise sse = SseServerTransport("/api/v1/mcp/") +def find_validation_error(exc): + """Recursively searches for a pydantic.ValidationError in nested exceptions.""" + if isinstance(exc, pydantic.ValidationError): + return exc + + # Check for nested exceptions in __cause__ or __context__ + if hasattr(exc, "__cause__") and exc.__cause__: + return find_validation_error(exc.__cause__) + if hasattr(exc, "__context__") and exc.__context__: + return find_validation_error(exc.__context__) + + # No validation error found + return None + + @router.get("/sse", response_class=StreamingResponse) async def handle_sse(request: Request, current_user: Annotated[User, Depends(get_current_active_user)]): token = current_user_ctx.set(current_user) try: async with sse.connect_sse(request.scope, request.receive, request._send) as streams: try: - logger.debug("Starting SSE connection") - logger.debug(f"Stream types: read={type(streams[0])}, write={type(streams[1])}") + msg = "Starting SSE connection" + logger.debug(msg) + msg = f"Stream types: read={type(streams[0])}, write={type(streams[1])}" + logger.debug(msg) notification_options = 
NotificationOptions( prompts_changed=True, resources_changed=True, tools_changed=True ) init_options = server.create_initialization_options(notification_options) - logger.debug(f"Initialization options: {init_options}") + msg = f"Initialization options: {init_options}" + logger.debug(msg) try: await server.run(streams[0], streams[1], init_options) - except (pydantic.ValidationError, ExceptionGroup) as exc: - validation_error = None - - # For ExceptionGroup, find the validation error if present - if isinstance(exc, ExceptionGroup): - for inner_exc in exc.exceptions: - if isinstance(inner_exc, pydantic.ValidationError): - validation_error = inner_exc - break - elif isinstance(exc, pydantic.ValidationError): - validation_error = exc - - # Handle the validation error if found - if validation_error and any("cancelled" in err["input"] for err in validation_error.errors()): - logger.debug("Ignoring validation error for cancelled notification") + except Exception as exc: # noqa: BLE001 + validation_error = find_validation_error(exc) + if validation_error: + msg = "Validation error in MCP:" + str(validation_error) + logger.debug(msg) else: - # For other errors, log as error but don't crash - logger.error(f"Validation error in MCP: {exc}") - logger.debug(f"Failed message type: {type(exc).__name__}") - if validation_error: - logger.debug(f"Validation error details: {validation_error.errors()}") - return + msg = f"Error in MCP: {exc!s}" + logger.debug(msg) + return except BrokenResourceError: # Handle gracefully when client disconnects logger.info("Client disconnected from SSE connection") except asyncio.CancelledError: logger.info("SSE connection was cancelled") except Exception as e: - logger.error(f"Error in MCP: {e!s}") + msg = f"Error in MCP: {e!s}" + logger.exception(msg) trace = traceback.format_exc() - logger.error(trace) + logger.exception(trace) raise finally: current_user_ctx.reset(token) @router.post("/") -async def handle_messages(request: Request, current_user: Annotated[User, Depends(get_current_active_user)]): +async def handle_messages(request: Request): await sse.handle_post_message(request.scope, request.receive, request._send) diff --git a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py index e9b953afd93f..be93f96af7b0 100644 --- a/src/backend/base/langflow/components/tools/mcp_sse.py +++ b/src/backend/base/langflow/components/tools/mcp_sse.py @@ -1,10 +1,8 @@ # from langflow.field_typing import Data -import traceback from collections.abc import Awaitable, Callable from contextlib import AsyncExitStack import httpx -from anthropic import Anthropic, BaseModel from dotenv import load_dotenv from langchain_core.tools import StructuredTool from mcp import ClientSession @@ -16,6 +14,9 @@ load_dotenv() # load environment variables from .env +# Define constant for status code +HTTP_TEMPORARY_REDIRECT = 307 + class MCPSseClient: def __init__(self): @@ -24,41 +25,30 @@ def __init__(self): self.sse = None self.session: ClientSession | None = None self.exit_stack = AsyncExitStack() - self.anthropic = Anthropic() async def pre_check_redirect(self, url: str): """Check if the URL responds with a 307 Redirect.""" async with httpx.AsyncClient(follow_redirects=False) as client: response = await client.request("HEAD", url) - if response.status_code == 307: + if response.status_code == HTTP_TEMPORARY_REDIRECT: return response.headers.get("Location") # Return the redirect URL return url # Return the original URL if no redirect async def 
connect_to_server( - self, url: str, headers: dict[str, str] = {}, timeout: int = 500, sse_read_timeout: int = 500 + self, url: str, headers: dict[str, str] | None, timeout: int = 500, sse_read_timeout: int = 500 ): - print("connecting") + if headers is None: + headers = {} url = await self.pre_check_redirect(url) sse_transport = await self.exit_stack.enter_async_context(sse_client(url, headers, timeout, sse_read_timeout)) self.sse, self.write = sse_transport - print("creating session") self.session = await self.exit_stack.enter_async_context(ClientSession(self.sse, self.write)) - print("initializing") - try: - await self.session.initialize() - except Exception as e: - print("Error initializing session:", e) - trace = traceback.format_exc() - print(trace) - raise + await self.session.initialize() # List available tools - print("listing tools") response = await self.session.list_tools() - tools = response.tools - print("\nConnected to server with tools:", [tool.name for tool in tools]) - return tools + return response.tools class MCPSse(Component): @@ -86,7 +76,7 @@ class MCPSse(Component): ] def create_tool_coroutine(self, tool_name: str) -> Callable[[dict], Awaitable]: - async def tool_coroutine(*args, **kwargs): + async def tool_coroutine(**kwargs): return await self.client.session.call_tool(tool_name, arguments=kwargs) return tool_coroutine diff --git a/src/backend/base/langflow/components/tools/mcp_stdio.py b/src/backend/base/langflow/components/tools/mcp_stdio.py index ffffb304fcdf..2eaeec30761d 100644 --- a/src/backend/base/langflow/components/tools/mcp_stdio.py +++ b/src/backend/base/langflow/components/tools/mcp_stdio.py @@ -4,12 +4,11 @@ from contextlib import AsyncExitStack from typing import Any -from anthropic import Anthropic, BaseModel from dotenv import load_dotenv from langchain_core.tools import StructuredTool from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client -from pydantic import Field, create_model +from pydantic import BaseModel, Field, create_model from langflow.custom import Component from langflow.io import MessageTextInput, Output @@ -22,11 +21,11 @@ def __init__(self): # Initialize session and client objects self.session: ClientSession | None = None self.exit_stack = AsyncExitStack() - self.anthropic = Anthropic() - async def connect_to_server(self, command: str): + async def connect_to_server(self, command_str: str): + command = command_str.split(" ") server_params = StdioServerParameters( - command="uvx", args=["mcp-sse-shim"], env={"DEBUG": "true", "PATH": os.environ["PATH"]} + command=command[0], args=command[1:], env={"DEBUG": "true", "PATH": os.environ["PATH"]} ) stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params)) @@ -37,9 +36,7 @@ async def connect_to_server(self, command: str): # List available tools response = await self.session.list_tools() - tools = response.tools - print("\nConnected to server with tools:", [tool.name for tool in tools]) - return tools + return response.tools def create_input_schema_from_json_schema(schema: dict[str, Any]) -> type[BaseModel]: @@ -49,7 +46,8 @@ def create_input_schema_from_json_schema(schema: dict[str, Any]) -> type[BaseMod :return: A Pydantic model class. """ if schema.get("type") != "object": - raise ValueError("JSON schema must be of type 'object' at the root level.") + msg = "JSON schema must be of type 'object' at the root level." 
+ raise ValueError(msg) fields = {} properties = schema.get("properties", {}) @@ -106,7 +104,7 @@ class MCPStdio(Component): ] def create_tool_coroutine(self, tool_name: str) -> Callable[[dict], Awaitable]: - async def tool_coroutine(*args, **kwargs): + async def tool_coroutine(**kwargs): return await self.client.session.call_tool(tool_name, arguments=kwargs) return tool_coroutine From 21effbb1bdc273ff01fe3533c6cfba07146c964d Mon Sep 17 00:00:00 2001 From: phact Date: Thu, 19 Dec 2024 16:10:30 -0500 Subject: [PATCH 17/27] fix tools [] bug and db asysnc session api change --- src/backend/base/langflow/api/v1/mcp.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index f9affd239344..e8705df655ee 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -153,7 +153,6 @@ async def handle_list_tools(): flows = (await session.exec(select(Flow))).all() for flow in flows: - tools = [] if flow.user_id is None: continue @@ -224,7 +223,7 @@ async def send_progress_updates(): db_service = get_db_service() collected_results = [] - async with db_service.with_async_session() as async_session: + async with db_service.with_session() as async_session: try: progress_task = asyncio.create_task(send_progress_updates()) From 72253baa5b51e4a7e33e07d168055d082bba515d Mon Sep 17 00:00:00 2001 From: phact Date: Mon, 23 Dec 2024 09:35:00 -0500 Subject: [PATCH 18/27] Tool instead of StructuredTool --- .../base/langflow/base/mcp/__init__.py | 0 src/backend/base/langflow/base/mcp/util.py | 23 +++++++++++++++ .../base/langflow/components/tools/mcp_sse.py | 26 +++++------------ .../langflow/components/tools/mcp_stdio.py | 29 ++++++------------- 4 files changed, 40 insertions(+), 38 deletions(-) create mode 100644 src/backend/base/langflow/base/mcp/__init__.py create mode 100644 src/backend/base/langflow/base/mcp/util.py diff --git a/src/backend/base/langflow/base/mcp/__init__.py b/src/backend/base/langflow/base/mcp/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/src/backend/base/langflow/base/mcp/util.py b/src/backend/base/langflow/base/mcp/util.py new file mode 100644 index 000000000000..ca508fcb61bf --- /dev/null +++ b/src/backend/base/langflow/base/mcp/util.py @@ -0,0 +1,23 @@ +import asyncio +from collections.abc import Awaitable, Callable + +from langflow.helpers.base_model import BaseModel + + +def create_tool_coroutine(tool_name: str, arg_schema: type[BaseModel], session) -> Callable[[dict], Awaitable]: + async def tool_coroutine(*args): + assert len(args) > 0, f"at least one positional argument is required {args}" + arg_dict = dict(zip(arg_schema.model_fields.keys(), args, strict=False)) + return await session.call_tool(tool_name, arguments=arg_dict) + + return tool_coroutine + + +def create_tool_func(tool_name: str, arg_schema: type[BaseModel], session) -> [Callable[..., str]]: + def tool_func(**kwargs): + assert len(kwargs) > 0, f"at least one named argument is required {kwargs}" + + loop = asyncio.get_event_loop() + return loop.run_until_complete(session.call_tool(tool_name, arguments=kwargs)) + + return tool_func diff --git a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py index be93f96af7b0..ce3468e815c5 100644 --- a/src/backend/base/langflow/components/tools/mcp_sse.py +++ b/src/backend/base/langflow/components/tools/mcp_sse.py @@ -1,15 +1,15 @@ # from 
langflow.field_typing import Data -from collections.abc import Awaitable, Callable from contextlib import AsyncExitStack import httpx from dotenv import load_dotenv -from langchain_core.tools import StructuredTool from mcp import ClientSession from mcp.client.sse import sse_client +from langflow.base.mcp.util import create_tool_coroutine, create_tool_func from langflow.components.tools.mcp_stdio import create_input_schema_from_json_schema from langflow.custom import Component +from langflow.field_typing import Tool from langflow.io import MessageTextInput, Output load_dotenv() # load environment variables from .env @@ -72,33 +72,23 @@ class MCPSse(Component): ] outputs = [ - Output(display_name="Output", name="output", method="build_output"), + Output(display_name="Tools", name="tools", method="build_output"), ] - def create_tool_coroutine(self, tool_name: str) -> Callable[[dict], Awaitable]: - async def tool_coroutine(**kwargs): - return await self.client.session.call_tool(tool_name, arguments=kwargs) - - return tool_coroutine - - async def build_output(self) -> list[StructuredTool]: + async def build_output(self) -> list[Tool]: if self.client.session is None: - self.tools = await self.client.connect_to_server(self.url) + self.tools = await self.client.connect_to_server(self.url, {}) tool_list = [] for tool in self.tools: args_schema = create_input_schema_from_json_schema(tool.inputSchema) - callbacks = self.get_langchain_callbacks() tool_list.append( - StructuredTool( + Tool( name=tool.name, # maybe format this description=tool.description, - coroutine=self.create_tool_coroutine(tool.name), - args_schema=args_schema, - # args_schema=DataSchema, - handle_tool_error=True, - callbacks=callbacks, + coroutine=create_tool_coroutine(tool.name, args_schema, self.client.session), + func=create_tool_func(tool.name, args_schema, self.client.session), ) ) diff --git a/src/backend/base/langflow/components/tools/mcp_stdio.py b/src/backend/base/langflow/components/tools/mcp_stdio.py index 2eaeec30761d..bb23d69625b2 100644 --- a/src/backend/base/langflow/components/tools/mcp_stdio.py +++ b/src/backend/base/langflow/components/tools/mcp_stdio.py @@ -1,16 +1,16 @@ # from langflow.field_typing import Data import os -from collections.abc import Awaitable, Callable from contextlib import AsyncExitStack from typing import Any from dotenv import load_dotenv -from langchain_core.tools import StructuredTool from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client from pydantic import BaseModel, Field, create_model +from langflow.base.mcp.util import create_tool_coroutine, create_tool_func from langflow.custom import Component +from langflow.field_typing import Tool from langflow.io import MessageTextInput, Output load_dotenv() # load environment variables from .env @@ -94,22 +94,16 @@ class MCPStdio(Component): name="command", display_name="mcp command", info="mcp command", - value="uv mcp-sse-shim@latest", + value="uvx mcp-sse-shim@latest", tool_mode=True, ), ] outputs = [ - Output(display_name="Output", name="output", method="build_output"), + Output(display_name="Tools", name="tools", method="build_output"), ] - def create_tool_coroutine(self, tool_name: str) -> Callable[[dict], Awaitable]: - async def tool_coroutine(**kwargs): - return await self.client.session.call_tool(tool_name, arguments=kwargs) - - return tool_coroutine - - async def build_output(self) -> list[StructuredTool]: + async def build_output(self) -> list[Tool]: if self.client.session is None: self.tools = 
await self.client.connect_to_server(self.command) @@ -117,18 +111,13 @@ async def build_output(self) -> list[StructuredTool]: for tool in self.tools: args_schema = create_input_schema_from_json_schema(tool.inputSchema) - callbacks = self.get_langchain_callbacks() tool_list.append( - StructuredTool( - name=tool.name, # maybe format this + Tool( + name=tool.name, description=tool.description, - coroutine=self.create_tool_coroutine(tool.name), - args_schema=args_schema, - # args_schema=DataSchema, - handle_tool_error=True, - callbacks=callbacks, + coroutine=create_tool_coroutine(tool.name, args_schema, self.client.session), + func=create_tool_func(tool.name, args_schema, self.client.session), ) ) - self.tool_names = [tool.name for tool in self.tools] return tool_list From 86c83d7cef0fc7ddead9eb4f8cee1dc9a2f289de Mon Sep 17 00:00:00 2001 From: phact Date: Tue, 24 Dec 2024 23:26:35 -0500 Subject: [PATCH 19/27] ruff fixes --- src/backend/base/langflow/base/mcp/util.py | 11 +++++---- .../base/langflow/components/tools/mcp_sse.py | 23 +++++++++++-------- .../langflow/components/tools/mcp_stdio.py | 2 +- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/src/backend/base/langflow/base/mcp/util.py b/src/backend/base/langflow/base/mcp/util.py index ca508fcb61bf..454fd3e90c92 100644 --- a/src/backend/base/langflow/base/mcp/util.py +++ b/src/backend/base/langflow/base/mcp/util.py @@ -6,17 +6,20 @@ def create_tool_coroutine(tool_name: str, arg_schema: type[BaseModel], session) -> Callable[[dict], Awaitable]: async def tool_coroutine(*args): - assert len(args) > 0, f"at least one positional argument is required {args}" + if len(args) == 0: + msg = f"at least one positional argument is required {args}" + raise ValueError(msg) arg_dict = dict(zip(arg_schema.model_fields.keys(), args, strict=False)) return await session.call_tool(tool_name, arguments=arg_dict) return tool_coroutine -def create_tool_func(tool_name: str, arg_schema: type[BaseModel], session) -> [Callable[..., str]]: +def create_tool_func(tool_name: str, session) -> [Callable[..., str]]: def tool_func(**kwargs): - assert len(kwargs) > 0, f"at least one named argument is required {kwargs}" - + if len(kwargs) == 0: + msg = f"at least one named argument is required {kwargs}" + raise ValueError(msg) loop = asyncio.get_event_loop() return loop.run_until_complete(session.call_tool(tool_name, arguments=kwargs)) diff --git a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py index ce3468e815c5..174750bde6fd 100644 --- a/src/backend/base/langflow/components/tools/mcp_sse.py +++ b/src/backend/base/langflow/components/tools/mcp_sse.py @@ -1,4 +1,5 @@ # from langflow.field_typing import Data +import asyncio from contextlib import AsyncExitStack import httpx @@ -35,20 +36,24 @@ async def pre_check_redirect(self, url: str): return url # Return the original URL if no redirect async def connect_to_server( - self, url: str, headers: dict[str, str] | None, timeout: int = 500, sse_read_timeout: int = 500 + self, url: str, headers: dict[str, str] | None, timeout_seconds: int = 500, sse_read_timeout_seconds: int = 500 ): if headers is None: headers = {} url = await self.pre_check_redirect(url) - sse_transport = await self.exit_stack.enter_async_context(sse_client(url, headers, timeout, sse_read_timeout)) - self.sse, self.write = sse_transport - self.session = await self.exit_stack.enter_async_context(ClientSession(self.sse, self.write)) - await self.session.initialize() + async with 
asyncio.timeout(timeout_seconds): + sse_transport = await self.exit_stack.enter_async_context( + sse_client(url, headers, timeout_seconds, sse_read_timeout_seconds) + ) + self.sse, self.write = sse_transport + self.session = await self.exit_stack.enter_async_context(ClientSession(self.sse, self.write)) + + await self.session.initialize() - # List available tools - response = await self.session.list_tools() - return response.tools + # List available tools + response = await self.session.list_tools() + return response.tools class MCPSse(Component): @@ -88,7 +93,7 @@ async def build_output(self) -> list[Tool]: name=tool.name, # maybe format this description=tool.description, coroutine=create_tool_coroutine(tool.name, args_schema, self.client.session), - func=create_tool_func(tool.name, args_schema, self.client.session), + func=create_tool_func(tool.name, self.client.session), ) ) diff --git a/src/backend/base/langflow/components/tools/mcp_stdio.py b/src/backend/base/langflow/components/tools/mcp_stdio.py index bb23d69625b2..f0a89e2b0b98 100644 --- a/src/backend/base/langflow/components/tools/mcp_stdio.py +++ b/src/backend/base/langflow/components/tools/mcp_stdio.py @@ -116,7 +116,7 @@ async def build_output(self) -> list[Tool]: name=tool.name, description=tool.description, coroutine=create_tool_coroutine(tool.name, args_schema, self.client.session), - func=create_tool_func(tool.name, args_schema, self.client.session), + func=create_tool_func(tool.name, args_schema), ) ) self.tool_names = [tool.name for tool in self.tools] From bf4bf4c423046783ddd34e39fe341f829f452a0b Mon Sep 17 00:00:00 2001 From: phact Date: Tue, 24 Dec 2024 23:43:47 -0500 Subject: [PATCH 20/27] ruff --- src/backend/base/langflow/api/v1/mcp.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index e8705df655ee..cfb817804968 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -3,6 +3,7 @@ import json import logging import traceback +from contextlib import suppress from contextvars import ContextVar from typing import Annotated from urllib.parse import quote, unquote, urlparse @@ -260,10 +261,8 @@ async def send_progress_updates(): return collected_results finally: progress_task.cancel() - try: + with suppress(asyncio.CancelledError): await progress_task - except asyncio.CancelledError: - pass except Exception as e: msg = f"Error in async session: {e}" logger.exception(msg) From 5cb1cb7e9e56bff73296473472d765ac83299b1b Mon Sep 17 00:00:00 2001 From: phact Date: Wed, 25 Dec 2024 08:03:21 -0500 Subject: [PATCH 21/27] validation optimization --- src/backend/base/langflow/api/v1/mcp.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/src/backend/base/langflow/api/v1/mcp.py b/src/backend/base/langflow/api/v1/mcp.py index cfb817804968..0fbf08ee4fe5 100644 --- a/src/backend/base/langflow/api/v1/mcp.py +++ b/src/backend/base/langflow/api/v1/mcp.py @@ -286,17 +286,11 @@ async def send_progress_updates(): def find_validation_error(exc): - """Recursively searches for a pydantic.ValidationError in nested exceptions.""" - if isinstance(exc, pydantic.ValidationError): - return exc - - # Check for nested exceptions in __cause__ or __context__ - if hasattr(exc, "__cause__") and exc.__cause__: - return find_validation_error(exc.__cause__) - if hasattr(exc, "__context__") and exc.__context__: - return find_validation_error(exc.__context__) - - # No validation 
error found + """Searches for a pydantic.ValidationError in the exception chain.""" + while exc: + if isinstance(exc, pydantic.ValidationError): + return exc + exc = getattr(exc, "__cause__", None) or getattr(exc, "__context__", None) return None From 8a64437849e50160669e9c082006e8c624d1bcb7 Mon Sep 17 00:00:00 2001 From: phact Date: Thu, 26 Dec 2024 18:44:16 -0500 Subject: [PATCH 22/27] fix frontend test --- src/frontend/tests/extended/features/notifications.spec.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/frontend/tests/extended/features/notifications.spec.ts b/src/frontend/tests/extended/features/notifications.spec.ts index 47fcbe88c5f0..6800253bdafb 100644 --- a/src/frontend/tests/extended/features/notifications.spec.ts +++ b/src/frontend/tests/extended/features/notifications.spec.ts @@ -22,6 +22,8 @@ test( .hover() .then(async () => { await page.getByTestId("add-component-button-chat-input").click(); + await page.getByTestId(/rf__node-ChatInput-/).click(); + await page.keyboard.press('Control+.'); await page.getByTestId("button_run_chat input").click(); }); From b23d35884bc82f7bc635dbd072d10f84d02f0d1e Mon Sep 17 00:00:00 2001 From: phact Date: Fri, 27 Dec 2024 08:37:00 -0500 Subject: [PATCH 23/27] another playwright fix --- .../tests/extended/regression/generalBugs-shard-3.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/frontend/tests/extended/regression/generalBugs-shard-3.spec.ts b/src/frontend/tests/extended/regression/generalBugs-shard-3.spec.ts index 3e5bda8aae5a..6b0780eacde3 100644 --- a/src/frontend/tests/extended/regression/generalBugs-shard-3.spec.ts +++ b/src/frontend/tests/extended/regression/generalBugs-shard-3.spec.ts @@ -90,7 +90,7 @@ test( } await page.locator(".react-flow__pane").click(); - + await adjustScreenView(page, { numberOfZoomOut: 1 }); await visibleElementHandle.hover(); await page.mouse.down(); From b225e5f8543ef06eeeef7ce97354c55547fee750 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebasti=C3=A1n=20Est=C3=A9vez?= Date: Wed, 1 Jan 2025 23:55:00 -0500 Subject: [PATCH 24/27] Update src/frontend/tests/extended/features/notifications.spec.ts Co-authored-by: Gabriel Luiz Freitas Almeida --- src/frontend/tests/extended/features/notifications.spec.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/frontend/tests/extended/features/notifications.spec.ts b/src/frontend/tests/extended/features/notifications.spec.ts index 397ecabbdf0f..8bde6ad07135 100644 --- a/src/frontend/tests/extended/features/notifications.spec.ts +++ b/src/frontend/tests/extended/features/notifications.spec.ts @@ -21,10 +21,8 @@ test( .getByTestId("inputsText Input") .hover() .then(async () => { - await page.getByTestId("add-component-button-chat-input").click(); - await page.getByTestId(/rf__node-ChatInput-/).click(); - await page.keyboard.press('Control+.'); - await page.getByTestId("button_run_chat input").click(); + await page.getByTestId("add-component-button-text-input").click(); + await page.getByTestId("button_run_text input").click(); }); await page.waitForSelector("text=built successfully", { timeout: 30000 }); From 3287eac66b46e0c4db3f507feff9ef2288d2415a Mon Sep 17 00:00:00 2001 From: phact Date: Wed, 1 Jan 2025 23:57:12 -0500 Subject: [PATCH 25/27] mcp component descriptions --- src/backend/base/langflow/components/tools/mcp_sse.py | 5 +---- src/backend/base/langflow/components/tools/mcp_stdio.py | 7 +++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git 
a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py index 174750bde6fd..489a7349b042 100644 --- a/src/backend/base/langflow/components/tools/mcp_sse.py +++ b/src/backend/base/langflow/components/tools/mcp_sse.py @@ -3,7 +3,6 @@ from contextlib import AsyncExitStack import httpx -from dotenv import load_dotenv from mcp import ClientSession from mcp.client.sse import sse_client @@ -13,8 +12,6 @@ from langflow.field_typing import Tool from langflow.io import MessageTextInput, Output -load_dotenv() # load environment variables from .env - # Define constant for status code HTTP_TEMPORARY_REDIRECT = 307 @@ -61,7 +58,7 @@ class MCPSse(Component): tools = None tool_names = [] display_name = "MCP Tools (SSE)" - description = "Use as a template to create your own component." + description = "Connects to an MCP server over SSE and exposes it's tools as langflow tools to be used by an Agent." documentation: str = "http://docs.langflow.org/components/custom" icon = "code" name = "MCPSse" diff --git a/src/backend/base/langflow/components/tools/mcp_stdio.py b/src/backend/base/langflow/components/tools/mcp_stdio.py index f0a89e2b0b98..4abd5c18a142 100644 --- a/src/backend/base/langflow/components/tools/mcp_stdio.py +++ b/src/backend/base/langflow/components/tools/mcp_stdio.py @@ -3,7 +3,6 @@ from contextlib import AsyncExitStack from typing import Any -from dotenv import load_dotenv from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client from pydantic import BaseModel, Field, create_model @@ -13,8 +12,6 @@ from langflow.field_typing import Tool from langflow.io import MessageTextInput, Output -load_dotenv() # load environment variables from .env - class MCPStdioClient: def __init__(self): @@ -84,7 +81,9 @@ class MCPStdio(Component): tools = None tool_names = [] display_name = "MCP Tools (stdio)" - description = "Use as a template to create your own component." + description = ( + "Connects to an MCP server over stdio and exposes it's tools as langflow tools to be used by an Agent." 
+    )
     documentation: str = "http://docs.langflow.org/components/custom"
     icon = "code"
     name = "MCPStdio"

From 51b54ec1a2934c868579c3d722cadd5c1a23f84b Mon Sep 17 00:00:00 2001
From: phact
Date: Thu, 2 Jan 2025 08:48:52 -0500
Subject: [PATCH 26/27] mypy fixes

---
 src/backend/base/langflow/base/mcp/util.py              | 2 +-
 src/backend/base/langflow/components/tools/mcp_sse.py   | 6 +++---
 src/backend/base/langflow/components/tools/mcp_stdio.py | 6 +++---
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/backend/base/langflow/base/mcp/util.py b/src/backend/base/langflow/base/mcp/util.py
index 454fd3e90c92..e4b867b0d34e 100644
--- a/src/backend/base/langflow/base/mcp/util.py
+++ b/src/backend/base/langflow/base/mcp/util.py
@@ -15,7 +15,7 @@ async def tool_coroutine(*args):
     return tool_coroutine


-def create_tool_func(tool_name: str, session) -> [Callable[..., str]]:
+def create_tool_func(tool_name: str, session) -> Callable[..., str]:
     def tool_func(**kwargs):
         if len(kwargs) == 0:
             msg = f"at least one named argument is required {kwargs}"
diff --git a/src/backend/base/langflow/components/tools/mcp_sse.py b/src/backend/base/langflow/components/tools/mcp_sse.py
index 489a7349b042..876c05c7d736 100644
--- a/src/backend/base/langflow/components/tools/mcp_sse.py
+++ b/src/backend/base/langflow/components/tools/mcp_sse.py
@@ -3,7 +3,7 @@
 from contextlib import AsyncExitStack

 import httpx
-from mcp import ClientSession
+from mcp import ClientSession, types
 from mcp.client.sse import sse_client

 from langflow.base.mcp.util import create_tool_coroutine, create_tool_func
@@ -55,8 +55,8 @@ async def connect_to_server(

 class MCPSse(Component):
     client = MCPSseClient()
-    tools = None
-    tool_names = []
+    tools = types.ListToolsResult
+    tool_names = [str]
     display_name = "MCP Tools (SSE)"
     description = "Connects to an MCP server over SSE and exposes its tools as langflow tools to be used by an Agent."
     documentation: str = "http://docs.langflow.org/components/custom"
     icon = "code"
     name = "MCPSse"
diff --git a/src/backend/base/langflow/components/tools/mcp_stdio.py b/src/backend/base/langflow/components/tools/mcp_stdio.py
index 4abd5c18a142..a6ca98be838b 100644
--- a/src/backend/base/langflow/components/tools/mcp_stdio.py
+++ b/src/backend/base/langflow/components/tools/mcp_stdio.py
@@ -3,7 +3,7 @@
 from contextlib import AsyncExitStack
 from typing import Any

-from mcp import ClientSession, StdioServerParameters
+from mcp import ClientSession, StdioServerParameters, types
 from mcp.client.stdio import stdio_client
 from pydantic import BaseModel, Field, create_model
@@ -78,8 +78,8 @@ def create_input_schema_from_json_schema(schema: dict[str, Any]) -> type[BaseMod
 class MCPStdio(Component):
     client = MCPStdioClient()
-    tools = None
-    tool_names = []
+    tools = types.ListToolsResult
+    tool_names = [str]
     display_name = "MCP Tools (stdio)"
     description = (
         "Connects to an MCP server over stdio and exposes its tools as langflow tools to be used by an Agent."
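The substantive mypy fix above is the return annotation on create_tool_func: [Callable[..., str]] is a list literal wrapping a type, not a valid annotation, so the factory is now annotated as returning the callable itself. A minimal sketch of the wrapper-factory pattern that signature describes follows; only the signature style and the "at least one named argument" guard come from the diff, while the function name and the forwarding through ClientSession.call_tool are assumptions for illustration, not the actual Langflow code.

    # Hedged sketch: a factory that closes over a tool name and an MCP session
    # and returns an async wrapper. The annotation names the callable type
    # directly (Callable[..., Awaitable[str]]), mirroring the corrected
    # Callable[..., str] on the synchronous create_tool_func.
    from collections.abc import Awaitable, Callable

    from mcp import ClientSession


    def make_tool_wrapper(tool_name: str, session: ClientSession) -> Callable[..., Awaitable[str]]:
        async def tool_wrapper(**kwargs) -> str:
            if len(kwargs) == 0:
                msg = f"at least one named argument is required {kwargs}"
                raise ValueError(msg)
            # Forward the keyword arguments to the MCP server as tool arguments.
            result = await session.call_tool(tool_name, arguments=kwargs)
            return str(result)

        return tool_wrapper

The same reasoning gives create_tool_func its corrected annotation: the factory returns the inner function object, so the annotation is the callable type itself rather than a list containing it.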
From 83448e601e690a72d635e30cf7de6dfdc95e61cf Mon Sep 17 00:00:00 2001
From: phact
Date: Thu, 2 Jan 2025 23:37:37 -0500
Subject: [PATCH 27/27] fix setup_database_url test

---
 src/backend/tests/performance/test_server_init.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/src/backend/tests/performance/test_server_init.py b/src/backend/tests/performance/test_server_init.py
index 858e9c8c60d5..42f7e15a340d 100644
--- a/src/backend/tests/performance/test_server_init.py
+++ b/src/backend/tests/performance/test_server_init.py
@@ -1,3 +1,5 @@
+import os
+
 import pytest

 from langflow.services.deps import get_settings_service
@@ -5,14 +7,18 @@
 @pytest.fixture(autouse=True)
 def setup_database_url(tmp_path, monkeypatch):
     """Setup a temporary database URL for testing."""
+    settings_service = get_settings_service()
     db_path = tmp_path / "test_performance.db"
-    original_value = monkeypatch.delenv("LANGFLOW_DATABASE_URL", raising=False)
+    original_value = os.getenv("LANGFLOW_DATABASE_URL")
+    monkeypatch.delenv("LANGFLOW_DATABASE_URL", raising=False)
     test_db_url = f"sqlite:///{db_path}"
     monkeypatch.setenv("LANGFLOW_DATABASE_URL", test_db_url)
+    settings_service.set("database_url", test_db_url)
     yield
     # Restore original value if it existed
     if original_value is not None:
         monkeypatch.setenv("LANGFLOW_DATABASE_URL", original_value)
+        settings_service.set("database_url", original_value)
     else:
         monkeypatch.delenv("LANGFLOW_DATABASE_URL", raising=False)
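The fixture change above corrects a subtle bug: monkeypatch.delenv returns None, so the original code always stored None in original_value and could never restore a pre-existing LANGFLOW_DATABASE_URL. The fixed version reads the value with os.getenv before deleting it and mirrors the override into the settings service. A standalone sketch of that capture-then-restore pattern is shown below; the environment-variable name is the one from the patch, while the fixture name and database filename are illustrative, and the Langflow settings-service calls are omitted.

    # Hedged sketch of the capture-then-restore pattern used by the fixture.
    import os

    import pytest


    @pytest.fixture
    def temp_database_url(tmp_path, monkeypatch):
        # Read the current value before mutating the environment; delenv's
        # return value is always None, which is what the original code stored.
        original_value = os.getenv("LANGFLOW_DATABASE_URL")
        monkeypatch.setenv("LANGFLOW_DATABASE_URL", f"sqlite:///{tmp_path / 'test.db'}")
        yield
        # monkeypatch already undoes setenv/delenv at teardown; an explicit
        # restore like this mainly matters for state monkeypatch does not
        # track, such as the settings service updated in the real fixture.
        if original_value is not None:
            monkeypatch.setenv("LANGFLOW_DATABASE_URL", original_value)
        else:
            monkeypatch.delenv("LANGFLOW_DATABASE_URL", raising=False)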