feat: add anthropic mcp endpoint #5148

Merged — 42 commits, Jan 3, 2025
Commits (42)
1d3f665
mcp WIP
phact Dec 7, 2024
1d4dd04
Merge branch 'main' into mcp
phact Dec 7, 2024
26dbf6f
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 7, 2024
eb5e41f
logging and flow user check
phact Dec 11, 2024
dcff1ef
merge
phact Dec 11, 2024
c8c0840
mcp stdio client component
phact Dec 12, 2024
9044abc
handle disconnect better
phact Dec 12, 2024
55c86f9
initialization
phact Dec 12, 2024
8e4525a
merge main
phact Dec 12, 2024
f30b8da
session fix and type fix
phact Dec 12, 2024
e7d51ff
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 12, 2024
70218c2
merge
phact Dec 16, 2024
20bf6df
defensive against mcp server bugs
phact Dec 17, 2024
cf642ed
Merge branch 'mcp' of github.com:logspace-ai/langflow into mcp
phact Dec 17, 2024
2167e78
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 17, 2024
738de89
notifications and sse component
phact Dec 18, 2024
f85f8f0
enabled flags and resource support
phact Dec 19, 2024
d05e925
Merge branch 'mcp' of github.com:logspace-ai/langflow into mcp
phact Dec 19, 2024
bd24663
remove unneeded print
phact Dec 19, 2024
70e8a16
extract json schema util
phact Dec 19, 2024
243e161
merge
phact Dec 19, 2024
701c620
[autofix.ci] apply automated fixes
autofix-ci[bot] Dec 19, 2024
f618688
ruff
phact Dec 19, 2024
21effbb
fix tools [] bug and db asysnc session api change
phact Dec 19, 2024
6b57202
merge
phact Dec 19, 2024
72253ba
Tool instead of StructuredTool
phact Dec 23, 2024
58b8cef
catch up to main
phact Dec 23, 2024
86c83d7
ruff fixes
phact Dec 25, 2024
bf4bf4c
ruff
phact Dec 25, 2024
d03fce9
merge main
phact Dec 25, 2024
5cb1cb7
validation optimization
phact Dec 25, 2024
8a64437
fix frontend test
phact Dec 26, 2024
b23d358
another playwright fix
phact Dec 27, 2024
0f26793
Merge branch 'main' into mcp
phact Dec 27, 2024
aaa2ebe
Merge branch 'main' into mcp
phact Dec 29, 2024
b225e5f
Update src/frontend/tests/extended/features/notifications.spec.ts
phact Jan 2, 2025
3287eac
mcp component descriptions
phact Jan 2, 2025
51b54ec
mypy fixes
phact Jan 2, 2025
f5428b7
Merge branch 'main' into mcp
phact Jan 2, 2025
23d0566
Merge branch 'main' into mcp
phact Jan 3, 2025
83448e6
fix setup_database_url test
phact Jan 3, 2025
7c8088f
Merge branch 'mcp' of github.com:logspace-ai/langflow into mcp
phact Jan 3, 2025
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -113,8 +113,10 @@ dependencies = [
"aiofile>=3.9.0,<4.0.0",
"sseclient-py==1.8.0",
"arize-phoenix-otel>=0.6.1",
"openinference-instrumentation-langchain==0.1.29",
"openinference-instrumentation-langchain>=0.1.29",
"crewai~=0.86.0",
"mcp>=0.9.1",
"uv>=0.5.7",
"ag2",
"pydantic-ai>=0.0.12",
]
5 changes: 5 additions & 0 deletions src/backend/base/langflow/api/router.py
@@ -9,13 +9,15 @@
     flows_router,
     folders_router,
     login_router,
+    mcp_router,
     monitor_router,
     starter_projects_router,
     store_router,
     users_router,
     validate_router,
     variables_router,
 )
+from langflow.services.deps import get_settings_service
 
 router = APIRouter(
     prefix="/api/v1",
@@ -33,3 +35,6 @@
 router.include_router(monitor_router)
 router.include_router(folders_router)
 router.include_router(starter_projects_router)
+
+if get_settings_service().settings.mcp_server_enabled:
+    router.include_router(mcp_router)
2 changes: 2 additions & 0 deletions src/backend/base/langflow/api/v1/__init__.py
@@ -5,6 +5,7 @@
 from langflow.api.v1.flows import router as flows_router
 from langflow.api.v1.folders import router as folders_router
 from langflow.api.v1.login import router as login_router
+from langflow.api.v1.mcp import router as mcp_router
 from langflow.api.v1.monitor import router as monitor_router
 from langflow.api.v1.starter_projects import router as starter_projects_router
 from langflow.api.v1.store import router as store_router
@@ -20,6 +21,7 @@
"flows_router",
"folders_router",
"login_router",
"mcp_router",
"monitor_router",
"starter_projects_router",
"store_router",
343 changes: 343 additions & 0 deletions src/backend/base/langflow/api/v1/mcp.py
@@ -0,0 +1,343 @@
import asyncio
import base64
import json
import logging
import traceback
from contextlib import suppress
from contextvars import ContextVar
from typing import Annotated
from urllib.parse import quote, unquote, urlparse
from uuid import UUID, uuid4

import pydantic
from anyio import BrokenResourceError
from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse
from mcp import types
from mcp.server import NotificationOptions, Server
from mcp.server.sse import SseServerTransport
from sqlmodel import select
from starlette.background import BackgroundTasks

from langflow.api.v1.chat import build_flow
from langflow.api.v1.schemas import InputValueRequest
from langflow.helpers.flow import json_schema_from_flow
from langflow.services.auth.utils import get_current_active_user
from langflow.services.database.models import Flow, User
from langflow.services.deps import get_db_service, get_session, get_settings_service, get_storage_service
from langflow.services.storage.utils import build_content_type_from_extension

logger = logging.getLogger(__name__)
if False:
logger.setLevel(logging.DEBUG)
if not logger.handlers:
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("[%(asctime)s][%(levelname)s] %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)

# Enable debug logging for MCP package
mcp_logger = logging.getLogger("mcp")
mcp_logger.setLevel(logging.DEBUG)
if not mcp_logger.handlers:
mcp_logger.addHandler(handler)

logger.debug("MCP module loaded - debug logging enabled")

enable_progress_notifications = get_settings_service().settings.mcp_server_enable_progress_notifications

router = APIRouter(prefix="/mcp", tags=["mcp"])

server = Server("langflow-mcp-server")

# Create a context variable to store the current user
current_user_ctx: ContextVar[User] = ContextVar("current_user_ctx")

# Define constants
MAX_RETRIES = 2


@server.list_prompts()
async def handle_list_prompts():
return []


@server.list_resources()
async def handle_list_resources():
resources = []
try:
session = await anext(get_session())
storage_service = get_storage_service()
settings_service = get_settings_service()

# Build full URL from settings
host = getattr(settings_service.settings, "host", "localhost")
port = getattr(settings_service.settings, "port", 3000)

base_url = f"http://{host}:{port}".rstrip("/")

flows = (await session.exec(select(Flow))).all()

for flow in flows:
if flow.id:
try:
files = await storage_service.list_files(flow_id=str(flow.id))
for file_name in files:
# URL encode the filename
safe_filename = quote(file_name)
resource = types.Resource(
uri=f"{base_url}/api/v1/files/{flow.id}/{safe_filename}",
name=file_name,
description=f"File in flow: {flow.name}",
mimeType=build_content_type_from_extension(file_name),
)
resources.append(resource)
except FileNotFoundError as e:
msg = f"Error listing files for flow {flow.id}: {e}"
logger.debug(msg)
continue
except Exception as e:
msg = f"Error in listing resources: {e!s}"
logger.exception(msg)
trace = traceback.format_exc()
logger.exception(trace)
raise
return resources
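
For illustration, the resource URIs built above reuse the existing files API; a rough sketch of one entry with made-up values (the flow id, filename, and host below are hypothetical):

from urllib.parse import quote

flow_id = "0a1b2c3d-1111-2222-3333-444455556666"  # hypothetical flow id
file_name = "my report.pdf"                       # hypothetical stored file
safe_filename = quote(file_name)                  # -> "my%20report.pdf"
uri = f"http://localhost:3000/api/v1/files/{flow_id}/{safe_filename}"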


@server.read_resource()
async def handle_read_resource(uri: str) -> bytes:
"""Handle resource read requests."""
try:
# Parse the URI properly
parsed_uri = urlparse(str(uri))
# Path will be like /api/v1/files/{flow_id}/{filename}
path_parts = parsed_uri.path.split("/")
# Remove empty strings from split
path_parts = [p for p in path_parts if p]

# The flow_id and filename should be the last two parts
two = 2
if len(path_parts) < two:
msg = f"Invalid URI format: {uri}"
raise ValueError(msg)

flow_id = path_parts[-2]
filename = unquote(path_parts[-1]) # URL decode the filename

storage_service = get_storage_service()

# Read the file content
content = await storage_service.get_file(flow_id=flow_id, file_name=filename)
if not content:
msg = f"File {filename} not found in flow {flow_id}"
raise ValueError(msg)

# Ensure content is base64 encoded
if isinstance(content, str):
content = content.encode()
return base64.b64encode(content)
except Exception as e:
msg = f"Error reading resource {uri}: {e!s}"
logger.exception(msg)
trace = traceback.format_exc()
logger.exception(trace)
raise


@server.list_tools()
async def handle_list_tools():
tools = []
try:
session = await anext(get_session())
flows = (await session.exec(select(Flow))).all()

for flow in flows:
if flow.user_id is None:
continue

tool = types.Tool(
name=str(flow.id), # Use flow.id instead of name
description=f"{flow.name}: {flow.description}"
if flow.description
else f"Tool generated from flow: {flow.name}",
inputSchema=json_schema_from_flow(flow),
)
tools.append(tool)
except Exception as e:
msg = f"Error in listing tools: {e!s}"
logger.exception(msg)
trace = traceback.format_exc()
logger.exception(trace)
raise
return tools


@server.call_tool()
async def handle_call_tool(name: str, arguments: dict) -> list[types.TextContent]:
"""Handle tool execution requests."""
try:
session = await anext(get_session())
background_tasks = BackgroundTasks()

current_user = current_user_ctx.get()
flow = (await session.exec(select(Flow).where(Flow.id == UUID(name)))).first()

if not flow:
msg = f"Flow with id '{name}' not found"
raise ValueError(msg)

# Process inputs
processed_inputs = dict(arguments)

# Initial progress notification
if enable_progress_notifications and (progress_token := server.request_context.meta.progressToken):
await server.request_context.session.send_progress_notification(
progress_token=progress_token, progress=0.0, total=1.0
)

conversation_id = str(uuid4())
input_request = InputValueRequest(
input_value=processed_inputs.get("input_value", ""), components=[], type="chat", session=conversation_id
)

async def send_progress_updates():
if not (enable_progress_notifications and server.request_context.meta.progressToken):
return

try:
progress = 0.0
while True:
await server.request_context.session.send_progress_notification(
progress_token=progress_token, progress=min(0.9, progress), total=1.0
)
progress += 0.1
await asyncio.sleep(1.0)
except asyncio.CancelledError:
# Send final 100% progress
if enable_progress_notifications:
await server.request_context.session.send_progress_notification(
progress_token=progress_token, progress=1.0, total=1.0
)
raise

db_service = get_db_service()
collected_results = []
async with db_service.with_session() as async_session:
try:
progress_task = asyncio.create_task(send_progress_updates())

try:
response = await build_flow(
flow_id=UUID(name),
inputs=input_request,
background_tasks=background_tasks,
current_user=current_user,
session=async_session,
)

async for line in response.body_iterator:
if not line:
continue
try:
event_data = json.loads(line)
if event_data.get("event") == "end_vertex":
message = (
event_data.get("data", {})
.get("build_data", {})
.get("data", {})
.get("results", {})
.get("message", {})
.get("text", "")
)
if message:
collected_results.append(types.TextContent(type="text", text=str(message)))
except json.JSONDecodeError:
msg = f"Failed to parse event data: {line}"
logger.warning(msg)
continue

return collected_results
finally:
progress_task.cancel()
with suppress(asyncio.CancelledError):
await progress_task
except Exception as e:
msg = f"Error in async session: {e}"
logger.exception(msg)
raise

except Exception as e:
context = server.request_context
# Send error progress if there's an exception
if enable_progress_notifications and (progress_token := context.meta.progressToken):
await server.request_context.session.send_progress_notification(
progress_token=progress_token, progress=1.0, total=1.0
)
msg = f"Error executing tool {name}: {e!s}"
logger.exception(msg)
trace = traceback.format_exc()
logger.exception(trace)
raise


sse = SseServerTransport("/api/v1/mcp/")


def find_validation_error(exc):
"""Searches for a pydantic.ValidationError in the exception chain."""
while exc:
Contributor suggested change:
-    while exc:
+    fields = {"__cause__", "__context__"}
+    for field in fields:
+        exc = getattr(exc, field, None)
+        if exc:
+            break

if isinstance(exc, pydantic.ValidationError):
return exc
exc = getattr(exc, "__cause__", None) or getattr(exc, "__context__", None)
Contributor suggested change:
-        exc = getattr(exc, "__cause__", None) or getattr(exc, "__context__", None)
+        exc = getattr(exc, "__cause__", None)
+        if exc is None:
+            exc = getattr(exc, "__context__", None)

return None
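
To see why the chain walk matters: a validation failure inside server.run usually reaches the caller wrapped in another exception, and find_validation_error digs it out via __cause__/__context__. A minimal, self-contained sketch (the Point model and messages are made up):

import pydantic
from pydantic import BaseModel


class Point(BaseModel):  # hypothetical model, only used to trigger a ValidationError
    x: int


try:
    try:
        Point(x="not-an-int")  # raises pydantic.ValidationError
    except pydantic.ValidationError as inner:
        raise RuntimeError("wrapped by the server loop") from inner  # sets __cause__
except RuntimeError as outer:
    assert isinstance(find_validation_error(outer), pydantic.ValidationError)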


@router.get("/sse", response_class=StreamingResponse)
async def handle_sse(request: Request, current_user: Annotated[User, Depends(get_current_active_user)]):
token = current_user_ctx.set(current_user)
try:
async with sse.connect_sse(request.scope, request.receive, request._send) as streams:
try:
msg = "Starting SSE connection"
logger.debug(msg)
msg = f"Stream types: read={type(streams[0])}, write={type(streams[1])}"
logger.debug(msg)

notification_options = NotificationOptions(
prompts_changed=True, resources_changed=True, tools_changed=True
)
init_options = server.create_initialization_options(notification_options)
msg = f"Initialization options: {init_options}"
logger.debug(msg)

try:
await server.run(streams[0], streams[1], init_options)
except Exception as exc: # noqa: BLE001
validation_error = find_validation_error(exc)
if validation_error:
msg = "Validation error in MCP:" + str(validation_error)
logger.debug(msg)
else:
msg = f"Error in MCP: {exc!s}"
logger.debug(msg)
return
except BrokenResourceError:
# Handle gracefully when client disconnects
logger.info("Client disconnected from SSE connection")
except asyncio.CancelledError:
logger.info("SSE connection was cancelled")
except Exception as e:
msg = f"Error in MCP: {e!s}"
logger.exception(msg)
trace = traceback.format_exc()
logger.exception(trace)
raise
finally:
current_user_ctx.reset(token)


@router.post("/")
async def handle_messages(request: Request):
await sse.handle_post_message(request.scope, request.receive, request._send)
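
Not part of the diff, but for context: a minimal sketch of how an MCP client might exercise these endpoints over SSE, assuming the mcp Python SDK's sse_client helper and a local Langflow instance (the URL, port, and input value are assumptions, and authentication on the /sse route is ignored here):

import asyncio

from mcp import ClientSession
from mcp.client.sse import sse_client


async def main():
    # GET /api/v1/mcp/sse opens the event stream; the client POSTs messages back to /api/v1/mcp/.
    async with sse_client("http://localhost:3000/api/v1/mcp/sse") as (read_stream, write_stream):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()
            tools = await session.list_tools()  # one tool per flow, named by flow id
            if tools.tools:
                result = await session.call_tool(tools.tools[0].name, {"input_value": "hello"})
                print(result)


asyncio.run(main())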