Skip to content

Commit 25caee0

Browse files
committed
refactor: improve async handling and response management in StreamableHTTPServerTransport
1 parent afb99f1 commit 25caee0

File tree

1 file changed

+133
-139
lines changed

1 file changed

+133
-139
lines changed

src/transports/streamable-http.ts

Lines changed: 133 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
* StreamableHTTPServerTransport - MCP transport over HTTP
33
*
44
* Implements the MCP Server transport interface using HTTP instead of stdio.
5-
* Supports multiple clients connecting via HTTP POST requests.
5+
* Properly handles the async nature of MCP over synchronous HTTP.
66
*/
77

88
import { createServer, IncomingMessage, ServerResponse } from 'http';
@@ -12,81 +12,32 @@ import { debugLog } from '../utils/debug.js';
1212
interface HTTPTransportOptions {
1313
port?: number;
1414
host?: string;
15-
}
16-
17-
/**
18-
* Simple request-scoped response buffer for storing responses
19-
* Each HTTP request gets its own buffer to collect responses
20-
*/
21-
class ResponseBuffer {
22-
private responses: JSONRPCMessage[] = [];
23-
24-
add(message: JSONRPCMessage): void {
25-
this.responses.push(message);
26-
}
27-
28-
getAll(): JSONRPCMessage[] {
29-
return this.responses;
30-
}
31-
}
32-
33-
// Thread-local style buffer (request-scoped)
34-
let currentBuffer: ResponseBuffer | null = null;
35-
36-
/**
37-
* Helper functions for HTTP responses
38-
*/
39-
function sendJSON(res: ServerResponse, statusCode: number, data: any): void {
40-
res.writeHead(statusCode, { 'Content-Type': 'application/json' });
41-
res.end(JSON.stringify(data));
42-
}
43-
44-
function sendError(res: ServerResponse, statusCode: number, code: number, message: string, id: any = null): void {
45-
sendJSON(res, statusCode, {
46-
jsonrpc: '2.0',
47-
error: { code, message },
48-
id
49-
});
50-
}
51-
52-
function sendSuccess(res: ServerResponse, data: any): void {
53-
sendJSON(res, 200, data);
54-
}
55-
56-
function sendNotFound(res: ServerResponse): void {
57-
sendJSON(res, 404, { error: 'Not found' });
58-
}
59-
60-
function sendPayloadTooLarge(res: ServerResponse): void {
61-
sendJSON(res, 413, {
62-
error: 'Payload too large',
63-
code: -32600
64-
});
65-
}
66-
67-
function sendAccepted(res: ServerResponse): void {
68-
res.writeHead(202);
69-
res.end();
15+
timeout?: number; // Request timeout in ms
7016
}
7117

7218
export class StreamableHTTPServerTransport {
7319
private port: number;
7420
private host: string;
21+
private timeout: number;
7522
private httpServer: any = null;
76-
77-
// Transport interface properties (standard MCP interface)
23+
24+
// Map request IDs to their response handlers
25+
private pendingResponses = new Map<string | number, {
26+
resolve: (message: JSONRPCMessage) => void;
27+
timer: NodeJS.Timeout;
28+
}>();
29+
30+
// Transport interface properties
7831
onmessage?: (message: JSONRPCMessage) => void;
7932
onclose?: () => void;
8033
onerror?: (error: Error) => void;
8134

8235
constructor(options: HTTPTransportOptions = {}) {
8336
this.port = options.port || 3000;
8437
this.host = options.host || 'localhost';
38+
this.timeout = options.timeout || 30000; // 30 second default
8539
}
8640

87-
/**
88-
* Start the HTTP server
89-
*/
9041
async start(): Promise<void> {
9142
if (this.httpServer) {
9243
throw new Error('Server already started');
@@ -113,10 +64,18 @@ export class StreamableHTTPServerTransport {
11364
});
11465
}
11566

116-
/**
117-
* Close the HTTP server
118-
*/
11967
async close(): Promise<void> {
68+
// Clear all pending responses
69+
for (const [id, pending] of this.pendingResponses) {
70+
clearTimeout(pending.timer);
71+
pending.resolve({
72+
jsonrpc: '2.0',
73+
error: { code: -32603, message: 'Server shutting down' },
74+
id
75+
});
76+
}
77+
this.pendingResponses.clear();
78+
12079
return new Promise((resolve) => {
12180
if (this.httpServer) {
12281
this.httpServer.close(() => {
@@ -130,23 +89,25 @@ export class StreamableHTTPServerTransport {
13089
}
13190

13291
/**
133-
* Send a message to the client
134-
* In HTTP mode, responses are buffered and sent in the response body
92+
* Send a message (response) back to the waiting HTTP request
13593
*/
13694
async send(message: JSONRPCMessage): Promise<void> {
13795
debugLog('[HTTP Transport] Sending message:', message);
13896

139-
// If we're in a request context, buffer the response
140-
if (currentBuffer) {
141-
currentBuffer.add(message);
142-
} else {
143-
debugLog('[HTTP Transport] Warning: send() called outside of request context');
97+
// Check if this is a response to a pending request
98+
if ('id' in message && message.id !== null) {
99+
const pending = this.pendingResponses.get(message.id);
100+
if (pending) {
101+
clearTimeout(pending.timer);
102+
this.pendingResponses.delete(message.id);
103+
pending.resolve(message);
104+
} else {
105+
debugLog('[HTTP Transport] No pending request for response:', message.id);
106+
}
144107
}
108+
// Notifications don't need handling in HTTP context
145109
}
146110

147-
/**
148-
* Handle incoming HTTP requests
149-
*/
150111
private async handleRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
151112
// Add CORS headers
152113
res.setHeader('Access-Control-Allow-Origin', '*');
@@ -162,7 +123,7 @@ export class StreamableHTTPServerTransport {
162123

163124
// Health check endpoint
164125
if (req.url === '/health' && req.method === 'GET') {
165-
sendSuccess(res, { status: 'ok', transport: 'http' });
126+
this.sendJSON(res, 200, { status: 'ok', transport: 'http' });
166127
return;
167128
}
168129

@@ -173,80 +134,113 @@ export class StreamableHTTPServerTransport {
173134
}
174135

175136
// Unknown endpoint
176-
sendNotFound(res);
137+
this.sendJSON(res, 404, { error: 'Not found' });
177138
}
178139

179-
/**
180-
* Handle MCP protocol POST requests
181-
*/
182140
private async handleMCPRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
183-
let body = '';
141+
try {
142+
const body = await this.readBody(req);
143+
const request = this.parseRequest(body);
144+
145+
if (!request) {
146+
this.sendError(res, 400, -32700, 'Parse error');
147+
return;
148+
}
184149

185-
// Read request body
186-
req.on('data', (chunk: Buffer) => {
187-
body += chunk.toString('utf-8');
150+
debugLog('[HTTP Transport] Received request:', request);
188151

189-
// Prevent body from growing too large
190-
if (body.length > 1024 * 1024) {
191-
req.removeAllListeners('data');
192-
sendPayloadTooLarge(res);
152+
// Handle based on whether it's a request or notification
153+
if ('id' in request && request.id !== null) {
154+
// Request - expects a response
155+
const response = await this.waitForResponse(request);
156+
this.sendJSON(res, 200, response);
157+
} else {
158+
// Notification - no response expected
159+
if (this.onmessage) {
160+
this.onmessage(request);
193161
}
194-
});
162+
res.writeHead(202);
163+
res.end();
164+
}
165+
} catch (error: any) {
166+
debugLog('[HTTP Transport] Request error:', error);
167+
if (!res.headersSent) {
168+
this.sendError(res, 500, -32603, error.message || 'Internal server error');
169+
}
170+
}
171+
}
172+
173+
private async waitForResponse(request: JSONRPCMessage): Promise<JSONRPCMessage> {
174+
return new Promise((resolve) => {
175+
// Type narrowing - we know this request has an id because we checked before calling this method
176+
if (!('id' in request) || request.id === null || request.id === undefined) {
177+
resolve({
178+
jsonrpc: '2.0',
179+
error: { code: -32600, message: 'Invalid request - missing id' }
180+
} as JSONRPCMessage);
181+
return;
182+
}
183+
184+
const id = request.id;
185+
186+
// Set up timeout
187+
const timer = setTimeout(() => {
188+
this.pendingResponses.delete(id);
189+
resolve({
190+
jsonrpc: '2.0',
191+
error: { code: -32603, message: 'Request timeout' },
192+
id
193+
} as JSONRPCMessage);
194+
}, this.timeout);
195+
196+
// Store the response handler
197+
this.pendingResponses.set(id, { resolve, timer });
198+
199+
// Process the request
200+
if (this.onmessage) {
201+
this.onmessage(request);
202+
}
203+
});
204+
}
195205

196-
req.on('end', async () => {
197-
try {
198-
// Parse JSON-RPC request
199-
let request: JSONRPCMessage;
200-
201-
try {
202-
request = JSON.parse(body);
203-
debugLog('[HTTP Transport] Received request:', request);
204-
} catch (parseError) {
205-
debugLog('[HTTP Transport] Parse error:', parseError);
206-
sendError(res, 400, -32700, 'Parse error');
207-
return;
208-
}
209-
210-
// Create response buffer for this request
211-
const buffer = new ResponseBuffer();
212-
currentBuffer = buffer;
213-
214-
try {
215-
// Call the message handler (set by MCP Server)
216-
if (this.onmessage) {
217-
this.onmessage(request);
218-
219-
// Wait for handler to process and buffer responses
220-
// TODO: Replace with proper async handling
221-
await new Promise(resolve => setTimeout(resolve, 100));
222-
}
223-
224-
// Send buffered responses
225-
const responses = buffer.getAll();
226-
227-
if (responses.length > 0) {
228-
sendJSON(res, 200, responses.length === 1 ? responses[0] : responses);
229-
} else {
230-
// No response (notification or async processing)
231-
sendAccepted(res);
232-
}
233-
} finally {
234-
currentBuffer = null;
235-
}
236-
} catch (error: any) {
237-
debugLog('[HTTP Transport] Request error:', error);
238-
239-
if (!res.headersSent) {
240-
sendError(res, 500, -32603, error.message || 'Internal server error');
241-
}
206+
private async readBody(req: IncomingMessage): Promise<string> {
207+
return new Promise((resolve, reject) => {
208+
let body = '';
209+
210+
req.on('data', (chunk: Buffer) => {
211+
body += chunk.toString('utf-8');
212+
213+
// Prevent body from growing too large (1MB limit)
214+
if (body.length > 1024 * 1024) {
215+
req.removeAllListeners();
216+
reject(new Error('Payload too large'));
242217
}
243218
});
244219

245-
req.on('error', (error: Error) => {
246-
debugLog('[HTTP Transport] Request error:', error);
247-
if (!res.headersSent) {
248-
sendError(res, 400, -32600, 'Invalid request');
249-
}
220+
req.on('end', () => resolve(body));
221+
req.on('error', reject);
222+
});
223+
}
224+
225+
private parseRequest(body: string): JSONRPCMessage | null {
226+
try {
227+
return JSON.parse(body);
228+
} catch {
229+
return null;
230+
}
231+
}
232+
233+
// Helper methods
234+
private sendJSON(res: ServerResponse, statusCode: number, data: any): void {
235+
res.writeHead(statusCode, { 'Content-Type': 'application/json' });
236+
res.end(JSON.stringify(data));
237+
}
238+
239+
private sendError(res: ServerResponse, statusCode: number, code: number, message: string, id: any = null): void {
240+
this.sendJSON(res, statusCode, {
241+
jsonrpc: '2.0',
242+
error: { code, message },
243+
id
250244
});
251245
}
252246
}

0 commit comments

Comments
 (0)