• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mongodb-js / mongodb-mcp-server / 19374881356

14 Nov 2025 07:07PM UTC coverage: 80.199% (+0.05%) from 80.151%
19374881356

Pull #716

github

web-flow
Merge 13978612d into d462ff9f0
Pull Request #716: chore: add connection metadata to telemetry

1382 of 1835 branches covered (75.31%)

Branch coverage included in aggregate %.

20 of 36 new or added lines in 7 files covered. (55.56%)

111 existing lines in 5 files now uncovered.

6508 of 8003 relevant lines covered (81.32%)

71.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.71
/src/tools/mongodb/read/aggregate.ts
1
import { z } from "zod";
3✔
2
import type { AggregationCursor } from "mongodb";
3
import type { CallToolResult } from "@modelcontextprotocol/sdk/types.js";
4
import type { NodeDriverServiceProvider } from "@mongosh/service-provider-node-driver";
5
import { DbOperationArgs, MongoDBToolBase } from "../mongodbTool.js";
3✔
6
import type { ToolArgs, OperationType, ToolExecutionContext } from "../../tool.js";
7
import { formatUntrustedData } from "../../tool.js";
3✔
8
import { checkIndexUsage } from "../../../helpers/indexCheck.js";
3✔
9
import { type Document, EJSON } from "bson";
3✔
10
import { ErrorCodes, MongoDBError } from "../../../common/errors.js";
3✔
11
import { collectCursorUntilMaxBytesLimit } from "../../../helpers/collectCursorUntilMaxBytes.js";
3✔
12
import { operationWithFallback } from "../../../helpers/operationWithFallback.js";
3✔
13
import { AGG_COUNT_MAX_TIME_MS_CAP, ONE_MB, CURSOR_LIMITS_TO_LLM_TEXT } from "../../../helpers/constants.js";
3✔
14
import { LogId } from "../../../common/logger.js";
3✔
15
import { AnyAggregateStage, VectorSearchStage } from "../mongodbSchemas.js";
3✔
16
import {
3✔
17
    assertVectorSearchFilterFieldsAreIndexed,
18
    type SearchIndex,
19
} from "../../../helpers/assertVectorSearchFilterFieldsAreIndexed.js";
20

21
/**
 * Description attached to the `pipeline` argument when vector search is enabled
 * (selected in `getAggregateArgs`). The text is an instruction prompt for the LLM
 * covering where `$vectorSearch` may appear and how to pre-/post-filter.
 *
 * The trailing backslashes at the start and end of the template are line
 * continuations, so the string has no leading or trailing newline.
 */
const pipelineDescriptionWithVectorSearch = `\
3✔
22
An array of aggregation stages to execute.
23
\`$vectorSearch\` **MUST** be the first stage of the pipeline, or the first stage of a \`$unionWith\` subpipeline.
24
### Usage Rules for \`$vectorSearch\`
25
- **Unset embeddings:**
26
  Unless the user explicitly requests the embeddings, add an \`$unset\` stage **at the end of the pipeline** to remove the embedding field and avoid context limits. **The $unset stage in this situation is mandatory**.
27
- **Pre-filtering:**
28
If the user requests additional filtering, include filters in \`$vectorSearch.filter\` only for pre-filter fields in the vector index.
29
    NEVER include fields in $vectorSearch.filter that are not part of the vector index.
30
- **Post-filtering:**
31
    For all remaining filters, add a $match stage after $vectorSearch.
32
### Note to LLM
33
- If unsure which fields are filterable, use the collection-indexes tool to determine valid prefilter fields.
34
- If no requested filters are valid prefilters, omit the filter key from $vectorSearch.\
35
`;
36

37
/** Description attached to the `pipeline` argument when vector search is not enabled. */
const genericPipelineDescription = "An array of aggregation stages to execute.";
3✔
38

39
/**
 * Builds the zod argument shape specific to the aggregate tool.
 *
 * @param vectorSearchEnabled - When true, pipeline stages may also be `VectorSearchStage`
 *   and the pipeline description carries the `$vectorSearch` usage rules.
 * @returns An object with the `pipeline` schema and the optional `responseBytesLimit`
 *   schema (defaults to `ONE_MB`).
 */
export const getAggregateArgs = (vectorSearchEnabled: boolean) => {
    // Stage schema and description both switch on the same feature flag.
    const stageSchema = vectorSearchEnabled ? z.union([AnyAggregateStage, VectorSearchStage]) : AnyAggregateStage;
    const pipelineDescription = vectorSearchEnabled ? pipelineDescriptionWithVectorSearch : genericPipelineDescription;

    const responseBytesLimitDescription =
        "The maximum number of bytes to return in the response. This value is capped by the server's configured maxBytesPerQuery and cannot be exceeded. " +
        'Note to LLM: If the entire aggregation result is required, use the "export" tool instead of increasing this limit.';

    return {
        pipeline: z.array(stageSchema).describe(pipelineDescription),
        responseBytesLimit: z.number().optional().default(ONE_MB).describe(responseBytesLimitDescription),
    } as const;
};
309✔
49

50
/**
 * Tool registered under the name "aggregate" that runs an aggregation pipeline
 * against a MongoDB collection. It is declared as a "read" operation; `$out` and
 * `$merge` stages are rejected by `assertOnlyUsesPermittedStages` when the
 * configuration forbids writes.
 */
export class AggregateTool extends MongoDBToolBase {
3✔
51
    /** Tool name. */
    public name = "aggregate";
103✔
52
    /** Human/LLM readable summary of what the tool does. */
    protected description = "Run an aggregation against a MongoDB collection";
103✔
53
    /** Argument schema: the shared database/collection args plus the aggregate-specific args. */
    protected argsShape = {
103✔
54
        ...DbOperationArgs,
103✔
55
        // The "vectorSearch" feature flag decides whether $vectorSearch stages are accepted.
        ...getAggregateArgs(this.isFeatureEnabled("vectorSearch")),
103✔
56
    };
103✔
57
    /** Operation category of this tool. */
    public operationType: OperationType = "read";
103✔
58

59
    /**
     * Runs the requested aggregation and formats the outcome for the LLM.
     *
     * Order of operations: connect, reject forbidden stages, validate `$vectorSearch`
     * filter fields (only when search is supported), optionally run the index check,
     * resolve raw query vectors into embeddings, then run the document-capped pipeline
     * while counting the uncapped result size in parallel.
     *
     * @param args - `database`, `collection`, the `pipeline` to run and the `responseBytesLimit`.
     * @param context - Only `signal` is used; it is forwarded as the abort signal for cursor collection.
     * @returns A tool result whose content is the summary message followed, when any
     *   documents were returned, by their EJSON serialization.
     * @throws MongoDBError `AtlasVectorSearchIndexNotFound` when the index check is enabled
     *   and the `$vectorSearch` index does not exist; helper errors propagate unchanged.
     */
    protected async execute(
        { database, collection, pipeline, responseBytesLimit }: ToolArgs<typeof this.argsShape>,
        { signal }: ToolExecutionContext
    ): Promise<CallToolResult> {
        let openCursor: AggregationCursor | undefined;
        try {
            const provider = await this.ensureConnected();
            await this.assertOnlyUsesPermittedStages(pipeline);

            const searchSupported = await this.session.isSearchSupported();
            if (searchSupported) {
                const searchIndexes = (await provider.getSearchIndexes(database, collection)) as SearchIndex[];
                assertVectorSearchFilterFieldsAreIndexed({
                    searchIndexes,
                    pipeline,
                    logger: this.session.logger,
                });
            }

            // Check if aggregate operation uses an index if enabled
            if (this.config.indexCheck) {
                const [indexStatus, indexName] = await this.isVectorSearchIndexUsed({
                    database,
                    collection,
                    pipeline,
                });

                if (indexStatus === "not-vector-search-query") {
                    await checkIndexUsage(provider, database, collection, "aggregate", async () =>
                        provider
                            .aggregate(database, collection, pipeline, {}, { writeConcern: undefined })
                            .explain("queryPlanner")
                    );
                } else if (indexStatus === "non-existent-index") {
                    throw new MongoDBError(
                        ErrorCodes.AtlasVectorSearchIndexNotFound,
                        `Could not find an index with name "${indexName}" in namespace "${database}.${collection}".`
                    );
                }
                // "valid-index": nothing to do, the query is ready to run.
            }

            const resolvedPipeline = await this.replaceRawValuesWithEmbeddingsIfNecessary({
                database,
                collection,
                pipeline,
            });

            // The executed pipeline gets a trailing $limit when a document cap is configured.
            const cappedResultsPipeline =
                this.config.maxDocumentsPerQuery > 0
                    ? [...resolvedPipeline, { $limit: this.config.maxDocumentsPerQuery }]
                    : [...resolvedPipeline];
            openCursor = provider.aggregate(database, collection, cappedResultsPipeline);

            const [totalDocuments, cursorResults] = await Promise.all([
                this.countAggregationResultDocuments({
                    provider,
                    database,
                    collection,
                    pipeline: resolvedPipeline,
                }),
                collectCursorUntilMaxBytesLimit({
                    cursor: openCursor,
                    configuredMaxBytesPerQuery: this.config.maxBytesPerQuery,
                    toolResponseBytesLimit: responseBytesLimit,
                    abortSignal: signal,
                }),
            ]);

            // If the uncapped aggregation would have produced more documents than
            // maxDocumentsPerQuery allows, the results were definitely capped by it.
            const cappedByMaxDocuments =
                this.config.maxDocumentsPerQuery > 0 &&
                !!totalDocuments &&
                totalDocuments > this.config.maxDocumentsPerQuery;

            const appliedLimits = [
                cappedByMaxDocuments ? "config.maxDocumentsPerQuery" : undefined,
                cursorResults.cappedBy,
            ].filter((limit): limit is keyof typeof CURSOR_LIMITS_TO_LLM_TEXT => !!limit);

            const summary = this.generateMessage({
                aggResultsCount: totalDocuments,
                documents: cursorResults.documents,
                appliedLimits,
            });
            const serializedDocuments =
                cursorResults.documents.length > 0 ? [EJSON.stringify(cursorResults.documents)] : [];

            return {
                content: formatUntrustedData(summary, ...serializedDocuments),
            };
        } finally {
            // Fire-and-forget: closing must not delay or fail the tool response.
            if (openCursor) {
                void this.safeCloseCursor(openCursor);
            }
        }
    }
21✔
150

151
    /**
     * Closes the cursor without ever rejecting: a failure to close is logged as a
     * warning instead of being propagated.
     *
     * @param cursor - The aggregation cursor to close.
     */
    private async safeCloseCursor(cursor: AggregationCursor<unknown>): Promise<void> {
        try {
            await cursor.close();
        } catch (error) {
            const reason = error instanceof Error ? error.message : String(error);
            this.session.logger.warning({
                id: LogId.mongodbCursorCloseError,
                context: "aggregate tool",
                message: `Error when closing the cursor - ${reason}`,
            });
        }
    }
11✔
162

163
    /**
     * Rejects pipelines containing stages the current configuration does not allow.
     *
     * Aggregations can both read and write, so `$out` / `$merge` are refused when the
     * server is read-only or any of the 'create', 'update' or 'delete' operations is
     * disabled. `$vectorSearch` is refused when the cluster does not support search.
     *
     * @param pipeline - The aggregation stages to inspect (top-level stages only).
     * @throws MongoDBError `ForbiddenWriteOperation` for a disallowed write stage.
     * @throws MongoDBError `AtlasSearchNotSupported` for `$vectorSearch` without search support.
     */
    private async assertOnlyUsesPermittedStages(pipeline: Record<string, unknown>[]): Promise<void> {
        const mutatingOperations: OperationType[] = ["update", "create", "delete"];
        const searchAvailable = await this.session.isSearchSupported();

        // An empty message means write stages are allowed.
        const writeStageForbiddenError = this.config.readOnly
            ? "In readOnly mode you can not run pipelines with $out or $merge stages."
            : this.config.disabledTools.some((t) => mutatingOperations.includes(t as OperationType))
              ? "When 'create', 'update', or 'delete' operations are disabled, you can not run pipelines with $out or $merge stages."
              : "";

        for (const stage of pipeline) {
            const writesData = Boolean(stage.$out || stage.$merge);
            if (writesData && writeStageForbiddenError) {
                throw new MongoDBError(ErrorCodes.ForbiddenWriteOperation, writeStageForbiddenError);
            }

            // $vectorSearch requires MongoDB Search, either in Atlas or in a local cluster.
            const needsSearch = Boolean(stage.$vectorSearch);
            if (needsSearch && !searchAvailable) {
                throw new MongoDBError(
                    ErrorCodes.AtlasSearchNotSupported,
                    "Atlas Search is not supported in this cluster."
                );
            }
        }
    }
20✔
194

195
    /**
     * Counts how many documents the pipeline would produce by appending a `$count`
     * stage and running it with a `maxTimeMS` of `AGG_COUNT_MAX_TIME_MS_CAP`.
     *
     * @returns The count; `0` when the count aggregation does not yield exactly one
     *   document carrying a numeric `totalDocuments`; or the `undefined` fallback
     *   passed to `operationWithFallback`.
     */
    private async countAggregationResultDocuments({
        provider,
        database,
        collection,
        pipeline,
    }: {
        provider: NodeDriverServiceProvider;
        database: string;
        collection: string;
        pipeline: Document[];
    }): Promise<number | undefined> {
        const countPipeline = [...pipeline, { $count: "totalDocuments" }];

        const runCount = async (): Promise<number | undefined> => {
            const countResults = await provider
                .aggregate(database, collection, countPipeline)
                .maxTimeMS(AGG_COUNT_MAX_TIME_MS_CAP)
                .toArray();

            if (countResults.length !== 1) {
                return 0;
            }

            const onlyDocument: unknown = countResults[0];
            if (!onlyDocument || typeof onlyDocument !== "object") {
                return 0;
            }
            if (!("totalDocuments" in onlyDocument)) {
                return 0;
            }

            return typeof onlyDocument.totalDocuments === "number" ? onlyDocument.totalDocuments : 0;
        };

        return await operationWithFallback(runCount, undefined);
    }
11✔
225

226
    /**
     * Returns a pipeline in which every top-level `$vectorSearch` stage whose
     * `queryVector` is not already an array has it replaced by generated embeddings,
     * with `embeddingParameters` removed from the stage.
     *
     * The input pipeline and its stage objects are never modified: stages that need
     * rewriting are shallow-cloned. In particular, a failure while checking the index
     * or generating embeddings no longer leaves the caller's stage half-rewritten
     * (previously `embeddingParameters` was deleted in place before those awaits).
     *
     * @param database - Database the pipeline runs against.
     * @param collection - Collection the pipeline runs against.
     * @param pipeline - The aggregation stages as supplied by the caller.
     * @returns A new array; untouched stages are the same object references as in `pipeline`.
     * @throws MongoDBError `AtlasVectorSearchInvalidQuery` when `embeddingParameters` is
     *   missing for a non-array `queryVector`, or when no embeddings are produced.
     */
    private async replaceRawValuesWithEmbeddingsIfNecessary({
        database,
        collection,
        pipeline,
    }: {
        database: string;
        collection: string;
        pipeline: Document[];
    }): Promise<Document[]> {
        const resolvedPipeline: Document[] = [];

        for (const stage of pipeline) {
            if (!("$vectorSearch" in stage)) {
                resolvedPipeline.push(stage);
                continue;
            }

            const { $vectorSearch: vectorSearchStage } = stage as z.infer<typeof VectorSearchStage>;

            // An array query vector is already an embedding; keep the stage as is.
            if (Array.isArray(vectorSearchStage.queryVector)) {
                resolvedPipeline.push(stage);
                continue;
            }

            // Split off embeddingParameters instead of deleting it from the caller's object.
            const { embeddingParameters, ...vectorSearchWithoutParameters } = vectorSearchStage;

            if (!embeddingParameters) {
                // NOTE(review): the message names "embeddingModel" while the field checked is
                // `embeddingParameters` — confirm the wording against the stage schema.
                throw new MongoDBError(
                    ErrorCodes.AtlasVectorSearchInvalidQuery,
                    "embeddingModel is mandatory if queryVector is a raw string."
                );
            }

            await this.session.vectorSearchEmbeddingsManager.assertVectorSearchIndexExists({
                database,
                collection,
                path: vectorSearchStage.path,
            });

            const [embeddings] = await this.session.vectorSearchEmbeddingsManager.generateEmbeddings({
                rawValues: [vectorSearchStage.queryVector],
                embeddingParameters,
                inputType: "query",
            });

            if (!embeddings) {
                throw new MongoDBError(
                    ErrorCodes.AtlasVectorSearchInvalidQuery,
                    "Failed to generate embeddings for the query vector."
                );
            }

            // $vectorSearch.queryVector can be a BSON.Binary, which is neither a number nor an array.
            // LLMs cannot provide binaries, so what we expose and what we send differ; the rewritten
            // stage is therefore built as an untyped Document.
            resolvedPipeline.push({
                ...stage,
                $vectorSearch: {
                    ...vectorSearchWithoutParameters,
                    queryVector: embeddings,
                },
            });
        }

        await this.session.vectorSearchEmbeddingsManager.assertFieldsHaveCorrectEmbeddings(
            { database, collection },
            resolvedPipeline
        );

        return resolvedPipeline;
    }
11✔
286

287
    /**
     * Classifies the pipeline with respect to vector search index usage.
     *
     * Only the first top-level stage containing `$vectorSearch` is considered.
     *
     * @returns `["not-vector-search-query"]` when no such stage exists; otherwise
     *   `["valid-index", name]` or `["non-existent-index", name]` depending on whether
     *   the index named by that stage exists in `database.collection`.
     */
    private async isVectorSearchIndexUsed({
        database,
        collection,
        pipeline,
    }: {
        database: string;
        collection: string;
        pipeline: Document[];
    }): Promise<["valid-index" | "non-existent-index" | "not-vector-search-query", string?]> {
        const firstVectorSearchStage = pipeline.find((stage) => "$vectorSearch" in stage);
        if (firstVectorSearchStage === undefined) {
            return ["not-vector-search-query"];
        }

        const { $vectorSearch: vectorSearchStage } = firstVectorSearchStage as z.infer<typeof VectorSearchStage>;
        const indexName: string = vectorSearchStage.index;

        const indexExists = await this.session.vectorSearchEmbeddingsManager.indexExists({
            database,
            collection,
            indexName,
        });

        return [indexExists ? "valid-index" : "non-existent-index", indexName];
    }
2✔
321

322
    /**
     * Builds the summary sentence that precedes the returned documents.
     *
     * @param aggResultsCount - Total documents the aggregation produced, or `undefined`
     *   when it could not be determined.
     * @param documents - The documents actually being returned.
     * @param appliedLimits - Keys of `CURSOR_LIMITS_TO_LLM_TEXT` for the limits that capped the result.
     * @returns The summary; when limits were applied it also names them and points the
     *   LLM to the "export" tool.
     */
    private generateMessage({
        aggResultsCount,
        documents,
        appliedLimits,
    }: {
        aggResultsCount: number | undefined;
        documents: unknown[];
        appliedLimits: (keyof typeof CURSOR_LIMITS_TO_LLM_TEXT)[];
    }): string {
        const countText = aggResultsCount === undefined ? "indeterminable number of" : String(aggResultsCount);
        const summary = `The aggregation resulted in ${countText} documents. Returning ${documents.length} documents`;

        if (appliedLimits.length === 0) {
            return `${summary}.`;
        }

        const limitsText = appliedLimits.map((limit) => CURSOR_LIMITS_TO_LLM_TEXT[limit]).join(", ");
        return (
            `${summary} while respecting the applied limits of ${limitsText}. ` +
            'Note to LLM: If the entire query result is required then use "export" tool to export the query results.'
        );
    }
11✔
343
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc