Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add MCP API handler, SSE routes, and Redis-based request routing #1

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -39,3 +39,4 @@ yarn-error.log*
# typescript
*.tsbuildinfo
next-env.d.ts
.env*.local
76 changes: 76 additions & 0 deletions app/message/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { NextRequest } from "next/server";
import { createClient } from "redis";

// Configure route to be dynamic
export const dynamic = 'force-dynamic';

// Handler for POST requests to send messages to SSE clients
export async function POST(request: NextRequest) {
console.log("Message POST received");

try {
// Get the message data
const data = await request.json();
console.log("Message data:", data);

// Validate required fields
if (!data.sessionId) {
console.error("Missing sessionId in message request");
return new Response(JSON.stringify({ error: "Missing sessionId" }), {
status: 400,
headers: { "Content-Type": "application/json" }
});
}

// Get Redis URL from environment
const redisUrl = process.env.REDIS_URL || process.env.KV_URL;
if (!redisUrl) {
console.error("No Redis URL available for message endpoint");
return new Response(JSON.stringify({ error: "Server configuration error" }), {
status: 500,
headers: { "Content-Type": "application/json" }
});
}

// Create Redis client
console.log("Creating Redis client for message endpoint");
const redisPublisher = createClient({ url: redisUrl });

// Connect to Redis
console.log("Connecting to Redis for message endpoint");
await redisPublisher.connect();
console.log("Redis connected for message endpoint");

// Publish message to the events channel for the session
const sessionId = data.sessionId;
console.log(`Publishing message to events:${sessionId}`);

// Create the event message
const eventMessage = JSON.stringify({
type: data.type || "message",
data: data.data || data,
timestamp: Date.now()
});

// Publish to Redis
await redisPublisher.publish(`events:${sessionId}`, eventMessage);
console.log(`Message published to events:${sessionId}`);

// Disconnect from Redis
await redisPublisher.disconnect();
console.log("Redis disconnected after message publish");

// Return success response
return new Response(JSON.stringify({ success: true }), {
status: 200,
headers: { "Content-Type": "application/json" }
});

} catch (error) {
console.error("Error in message endpoint:", error);
return new Response(JSON.stringify({ error: String(error) }), {
status: 500,
headers: { "Content-Type": "application/json" }
});
}
}
177 changes: 177 additions & 0 deletions app/sse/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
import { NextRequest } from "next/server";
import { nextApiHandler } from "@/lib/mcp-handler";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { createClient } from "redis";
import crypto from "crypto";

// Configure route to handle streaming responses
export const dynamic = 'force-dynamic';
export const fetchCache = 'force-no-store';

export async function POST(request: NextRequest) {
console.log("POST request received");
return nextApiHandler(request);
}

export async function GET(request: NextRequest) {
console.log("GET SSE request received");

// If this is an SSE request, handle it with direct streaming
if (request.headers.get("accept") === "text/event-stream") {
return createSseStream(request);
}

// Otherwise use the regular handler
return nextApiHandler(request);
}

async function createSseStream(request: NextRequest) {
console.log("Creating direct SSE stream");

const encoder = new TextEncoder();
const sessionId = crypto.randomUUID();
console.log(`Created session ID for direct SSE stream: ${sessionId}`);

// Get Redis URL from environment
const redisUrl = process.env.REDIS_URL || process.env.KV_URL;
if (!redisUrl) {
console.error("No Redis URL available for SSE stream");
return new Response("Configuration error: No Redis connection available", { status: 500 });
}

// Create Redis clients
let redisSubscriber;
let redisPublisher;

try {
console.log("Creating Redis clients for SSE stream");
redisSubscriber = createClient({ url: redisUrl });
redisPublisher = createClient({ url: redisUrl });

// Connect to Redis
console.log("Connecting to Redis for SSE stream");
await Promise.all([
redisSubscriber.connect(),
redisPublisher.connect()
]);
console.log("Redis connected for SSE stream");
} catch (error) {
console.error("Failed to connect to Redis for SSE:", error);
return new Response("Failed to establish server connection", { status: 500 });
}

// Create a stream with a custom controller we can write to
const stream = new ReadableStream({
start(controller) {
console.log("SSE stream started");

// Function to send SSE formatted messages
const sendMessage = (data: string) => {
controller.enqueue(encoder.encode(`data: ${data}\n\n`));
};

// Send initial connection message with session ID
sendMessage(JSON.stringify({
type: "connection",
status: "established",
sessionId
}));

// Set up Redis subscriber
const setupRedisSubscription = async () => {
try {
console.log(`Subscribing to Redis channel events:${sessionId}`);

// Subscribe to events channel for this session
await redisSubscriber.subscribe(`events:${sessionId}`, (message) => {
console.log(`Received event for session ${sessionId}:`, message);
sendMessage(message);
});

// Publish the connection event
console.log(`Publishing connection event for session ${sessionId}`);
await redisPublisher.publish(
`session:${sessionId}`,
JSON.stringify({
type: "client_connected",
sessionId,
timestamp: Date.now()
})
);

console.log(`Redis subscription established for session ${sessionId}`);
} catch (error) {
console.error(`Redis subscription error for session ${sessionId}:`, error);
sendMessage(JSON.stringify({
type: "error",
message: "Failed to establish subscription",
error: String(error)
}));
}
};

// Set up the subscription
setupRedisSubscription();

// Set up keep-alive interval
const keepAlive = setInterval(() => {
sendMessage(JSON.stringify({ type: "ping", timestamp: Date.now() }));
}, 30000);

// Store references in request object for cleanup
(request as any).sseCleanup = async () => {
console.log(`Cleaning up SSE resources for session ${sessionId}`);
clearInterval(keepAlive);

try {
// Unsubscribe and publish disconnection event
console.log(`Unsubscribing from events:${sessionId}`);
await redisSubscriber.unsubscribe(`events:${sessionId}`);

console.log(`Publishing disconnection event for session ${sessionId}`);
await redisPublisher.publish(
`session:${sessionId}`,
JSON.stringify({
type: "client_disconnected",
sessionId,
timestamp: Date.now()
})
);

// Disconnect Redis clients
await Promise.all([
redisSubscriber.disconnect(),
redisPublisher.disconnect()
]);
console.log(`Redis clients disconnected for session ${sessionId}`);
} catch (cleanupError) {
console.error(`Error during SSE cleanup for session ${sessionId}:`, cleanupError);
}
};

// Handle stream closing
request.signal.addEventListener("abort", () => {
console.log(`SSE request aborted for session ${sessionId}`);
(request as any).sseCleanup();
});
},
async cancel() {
console.log(`SSE stream cancelled for session ${sessionId}`);
if ((request as any).sseCleanup) {
await (request as any).sseCleanup();
}
}
});

console.log("SSE stream created");

// Return a Response with the appropriate headers for SSE
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
});
}
Loading