• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mongodb-js / mongodb-mcp-server / 17486339999

05 Sep 2025 07:10AM UTC coverage: 81.12% (-0.07%) from 81.188%
17486339999

Pull #521

github

web-flow
Merge 724add974 into 1f68c3e5d
Pull Request #521: fix: don't wait for telemetry events

925 of 1235 branches covered (74.9%)

Branch coverage included in aggregate %.

49 of 60 new or added lines in 4 files covered. (81.67%)

1 existing line in 1 file now uncovered.

4682 of 5677 relevant lines covered (82.47%)

90.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.28
/src/transports/streamableHttp.ts
1
import express from "express";
2!
2
import type http from "http";
3
import { randomUUID } from "crypto";
2✔
4
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
2✔
5
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
2✔
6
import { LogId } from "../common/logger.js";
2✔
7
import { SessionStore } from "../common/sessionStore.js";
2✔
8
import { TransportRunnerBase, type TransportRunnerConfig } from "./base.js";
2✔
9

10
const JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED = -32000;
2✔
11
const JSON_RPC_ERROR_CODE_SESSION_ID_REQUIRED = -32001;
2✔
12
const JSON_RPC_ERROR_CODE_SESSION_ID_INVALID = -32002;
2✔
13
const JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND = -32003;
2✔
14
const JSON_RPC_ERROR_CODE_INVALID_REQUEST = -32004;
2✔
15

16
export class StreamableHttpRunner extends TransportRunnerBase {
2✔
17
    private httpServer: http.Server | undefined;
18
    private sessionStore!: SessionStore;
19

20
    constructor(config: TransportRunnerConfig) {
2✔
21
        super(config);
14✔
22
    }
14✔
23

24
    public get serverAddress(): string {
2✔
25
        const result = this.httpServer?.address();
36✔
26
        if (typeof result === "string") {
36!
27
            return result;
×
28
        }
×
29
        if (typeof result === "object" && result) {
36✔
30
            return `http://${result.address}:${result.port}`;
36✔
31
        }
36!
32

33
        throw new Error("Server is not started yet");
×
34
    }
36✔
35

36
    async start(): Promise<void> {
2✔
37
        const app = express();
14✔
38
        this.sessionStore = new SessionStore(
14✔
39
            this.userConfig.idleTimeoutMs,
14✔
40
            this.userConfig.notificationTimeoutMs,
14✔
41
            this.logger
14✔
42
        );
14✔
43

44
        app.enable("trust proxy"); // needed for reverse proxy support
14✔
45
        app.use(express.json());
14✔
46
        app.use((req, res, next) => {
14✔
47
            for (const [key, value] of Object.entries(this.userConfig.httpHeaders)) {
52✔
48
                const header = req.headers[key.toLowerCase()];
20✔
49
                if (!header || header !== value) {
20✔
50
                    res.status(403).send({ error: `Invalid value for header "${key}"` });
4✔
51
                    return;
4✔
52
                }
4✔
53
            }
20✔
54

55
            next();
48✔
56
        });
14✔
57

58
        const handleSessionRequest = async (req: express.Request, res: express.Response): Promise<void> => {
14✔
59
            const sessionId = req.headers["mcp-session-id"];
36✔
60
            if (!sessionId) {
36!
61
                res.status(400).json({
×
62
                    jsonrpc: "2.0",
×
63
                    error: {
×
64
                        code: JSON_RPC_ERROR_CODE_SESSION_ID_REQUIRED,
×
65
                        message: `session id is required`,
×
66
                    },
×
67
                });
×
68
                return;
×
69
            }
×
70
            if (typeof sessionId !== "string") {
36!
71
                res.status(400).json({
×
72
                    jsonrpc: "2.0",
×
73
                    error: {
×
74
                        code: JSON_RPC_ERROR_CODE_SESSION_ID_INVALID,
×
75
                        message: "session id is invalid",
×
76
                    },
×
77
                });
×
78
                return;
×
79
            }
×
80
            const transport = this.sessionStore.getSession(sessionId);
36✔
81
            if (!transport) {
36!
82
                res.status(404).json({
×
83
                    jsonrpc: "2.0",
×
84
                    error: {
×
85
                        code: JSON_RPC_ERROR_CODE_SESSION_NOT_FOUND,
×
86
                        message: "session not found",
×
87
                    },
×
88
                });
×
89
                return;
×
90
            }
×
91
            await transport.handleRequest(req, res, req.body);
36✔
92
        };
36✔
93

94
        app.post(
14✔
95
            "/mcp",
14✔
96
            this.withErrorHandling(async (req: express.Request, res: express.Response) => {
14✔
97
                const sessionId = req.headers["mcp-session-id"];
36✔
98
                if (sessionId) {
36✔
99
                    await handleSessionRequest(req, res);
24✔
100
                    return;
24✔
101
                }
24✔
102

103
                if (!isInitializeRequest(req.body)) {
36!
104
                    res.status(400).json({
×
105
                        jsonrpc: "2.0",
×
106
                        error: {
×
107
                            code: JSON_RPC_ERROR_CODE_INVALID_REQUEST,
×
108
                            message: `invalid request`,
×
109
                        },
×
110
                    });
×
111
                    return;
×
112
                }
✔
113

114
                const server = await this.setupServer();
12✔
115
                let keepAliveLoop: NodeJS.Timeout;
12✔
116
                const transport = new StreamableHTTPServerTransport({
12✔
117
                    sessionIdGenerator: (): string => randomUUID().toString(),
12✔
118
                    onsessioninitialized: (sessionId): void => {
12✔
119
                        server.session.logger.setAttribute("sessionId", sessionId);
12✔
120

121
                        this.sessionStore.setSession(sessionId, transport, server.session.logger);
12✔
122

123
                        let failedPings = 0;
12✔
124
                        // eslint-disable-next-line @typescript-eslint/no-misused-promises
125
                        keepAliveLoop = setInterval(async () => {
12✔
126
                            try {
×
NEW
127
                                server.session.logger.debug({
×
128
                                    id: LogId.streamableHttpTransportKeepAlive,
×
129
                                    context: "streamableHttpTransport",
×
130
                                    message: "Sending ping",
×
131
                                });
×
132

133
                                await transport.send({
×
134
                                    jsonrpc: "2.0",
×
135
                                    method: "ping",
×
136
                                });
×
137
                                failedPings = 0;
×
138
                            } catch (err) {
×
139
                                try {
×
140
                                    failedPings++;
×
NEW
141
                                    server.session.logger.warning({
×
142
                                        id: LogId.streamableHttpTransportKeepAliveFailure,
×
143
                                        context: "streamableHttpTransport",
×
144
                                        message: `Error sending ping (attempt #${failedPings}): ${err instanceof Error ? err.message : String(err)}`,
×
145
                                    });
×
146

147
                                    if (failedPings > 3) {
×
148
                                        clearInterval(keepAliveLoop);
×
149
                                        await transport.close();
×
150
                                    }
×
151
                                } catch {
×
152
                                    // Ignore the error of the transport close as there's nothing else
153
                                    // we can do at this point.
154
                                }
×
155
                            }
×
156
                        }, 30_000);
12✔
157
                    },
12✔
158
                    onsessionclosed: async (sessionId): Promise<void> => {
12✔
159
                        try {
×
160
                            await this.sessionStore.closeSession(sessionId, false);
×
161
                        } catch (error) {
×
162
                            this.logger.error({
×
163
                                id: LogId.streamableHttpTransportSessionCloseFailure,
×
164
                                context: "streamableHttpTransport",
×
NEW
165
                                message: `Error closing session ${sessionId}: ${error instanceof Error ? error.message : String(error)}`,
×
166
                            });
×
167
                        }
×
168
                    },
×
169
                });
12✔
170

171
                transport.onclose = (): void => {
12✔
172
                    clearInterval(keepAliveLoop);
12✔
173

174
                    server.close().catch((error) => {
12✔
175
                        this.logger.error({
×
176
                            id: LogId.streamableHttpTransportCloseFailure,
×
177
                            context: "streamableHttpTransport",
×
178
                            message: `Error closing server: ${error instanceof Error ? error.message : String(error)}`,
×
179
                        });
×
180
                    });
12✔
181
                };
12✔
182

183
                await server.connect(transport);
12✔
184

185
                await transport.handleRequest(req, res, req.body);
12✔
186
            })
14✔
187
        );
14✔
188

189
        app.get("/mcp", this.withErrorHandling(handleSessionRequest));
14✔
190
        app.delete("/mcp", this.withErrorHandling(handleSessionRequest));
14✔
191

192
        this.httpServer = await new Promise<http.Server>((resolve, reject) => {
14✔
193
            const result = app.listen(this.userConfig.httpPort, this.userConfig.httpHost, (err?: Error) => {
14✔
194
                if (err) {
14!
195
                    reject(err);
×
196
                    return;
×
197
                }
×
198
                resolve(result);
14✔
199
            });
14✔
200
        });
14✔
201

202
        this.logger.info({
14✔
203
            id: LogId.streamableHttpTransportStarted,
14✔
204
            context: "streamableHttpTransport",
14✔
205
            message: `Server started on ${this.serverAddress}`,
14✔
206
            noRedaction: true,
14✔
207
        });
14✔
208
    }
14✔
209

210
    async closeTransport(): Promise<void> {
2✔
211
        await Promise.all([
12✔
212
            this.sessionStore.closeAllSessions(),
12✔
213
            new Promise<void>((resolve, reject) => {
12✔
214
                this.httpServer?.close((err) => {
12✔
215
                    if (err) {
12!
216
                        reject(err);
×
217
                        return;
×
218
                    }
×
219
                    resolve();
12✔
220
                });
12✔
221
            }),
12✔
222
        ]);
12✔
223
    }
12✔
224

225
    private withErrorHandling(
2✔
226
        fn: (req: express.Request, res: express.Response, next: express.NextFunction) => Promise<void>
42✔
227
    ) {
42✔
228
        return (req: express.Request, res: express.Response, next: express.NextFunction): void => {
42✔
229
            fn(req, res, next).catch((error) => {
48✔
230
                this.logger.error({
×
231
                    id: LogId.streamableHttpTransportRequestFailure,
×
232
                    context: "streamableHttpTransport",
×
233
                    message: `Error handling request: ${error instanceof Error ? error.message : String(error)}`,
×
234
                });
×
235
                res.status(400).json({
×
236
                    jsonrpc: "2.0",
×
237
                    error: {
×
238
                        code: JSON_RPC_ERROR_CODE_PROCESSING_REQUEST_FAILED,
×
239
                        message: `failed to handle request`,
×
240
                        data: error instanceof Error ? error.message : String(error),
×
241
                    },
×
242
                });
×
243
            });
48✔
244
        };
48✔
245
    }
42✔
246
}
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc