• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mongodb-js / mongodb-mcp-server / 18912204743

29 Oct 2025 02:55PM UTC coverage: 79.885% (-0.3%) from 80.227%
18912204743

Pull #688

github

web-flow
Merge 645ff8615 into 01f799ccb
Pull Request #688: feat: add support for automatic embeddings for the insert many tool

1364 of 1844 branches covered (73.97%)

Branch coverage included in aggregate %.

92 of 151 new or added lines in 4 files covered. (60.93%)

7 existing lines in 1 file now uncovered.

6412 of 7890 relevant lines covered (81.27%)

70.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.82
/src/tools/mongodb/read/aggregate.ts
1
import { z } from "zod";
3✔
2
import type { AggregationCursor } from "mongodb";
3
import type { CallToolResult } from "@modelcontextprotocol/sdk/types.js";
4
import type { NodeDriverServiceProvider } from "@mongosh/service-provider-node-driver";
5
import { DbOperationArgs, MongoDBToolBase } from "../mongodbTool.js";
3✔
6
import type { ToolArgs, OperationType, ToolExecutionContext } from "../../tool.js";
7
import { formatUntrustedData } from "../../tool.js";
3✔
8
import { checkIndexUsage } from "../../../helpers/indexCheck.js";
3✔
9
import { type Document, EJSON } from "bson";
3✔
10
import { ErrorCodes, MongoDBError } from "../../../common/errors.js";
3✔
11
import { collectCursorUntilMaxBytesLimit } from "../../../helpers/collectCursorUntilMaxBytes.js";
3✔
12
import { operationWithFallback } from "../../../helpers/operationWithFallback.js";
3✔
13
import { AGG_COUNT_MAX_TIME_MS_CAP, ONE_MB, CURSOR_LIMITS_TO_LLM_TEXT } from "../../../helpers/constants.js";
3✔
14
import { zEJSON } from "../../args.js";
3✔
15
import { LogId } from "../../../common/logger.js";
3✔
16
import { zSupportedEmbeddingParameters } from "../../../common/search/embeddingsProvider.js";
3✔
17

18
/**
 * Catch-all schema for a pipeline stage, built from `zEJSON()`. It is used as the
 * generic branch of the pipeline stage union alongside `VectorSearchStage`.
 */
const AnyStage = zEJSON();

/**
 * Fields accepted inside a `$vectorSearch` stage. Unknown extra keys are kept
 * because the wrapping object schema is `.passthrough()`.
 */
const vectorSearchStageShape = {
    exact: z
        .boolean()
        .optional()
        .default(false)
        .describe(
            "When true, uses an ENN algorithm, otherwise uses ANN. Using ENN is not compatible with numCandidates, in that case, numCandidates must be left empty."
        ),
    index: z.string().describe("Name of the index, as retrieved from the `collection-indexes` tool."),
    path: z
        .string()
        .describe(
            "Field, in dot notation, where to search. There must be a vector search index for that field. Note to LLM: When unsure, use the 'collection-indexes' tool to validate that the field is indexed with a vector search index."
        ),
    // Either a ready-made numeric vector, or raw text that the tool turns into an
    // embedding using `embeddingParameters`.
    queryVector: z
        .union([z.string(), z.array(z.number())])
        .describe(
            "The content to search for. The embeddingParameters field is mandatory if the queryVector is a string, in that case, the tool generates the embedding automatically using the provided configuration."
        ),
    numCandidates: z
        .number()
        .int()
        .positive()
        .optional()
        .describe("Number of candidates for the ANN algorithm. Mandatory when exact is false."),
    limit: z.number().int().positive().optional().default(10),
    filter: zEJSON()
        .optional()
        .describe(
            "MQL filter that can only use filter fields from the index definition. Note to LLM: If unsure, use the `collection-indexes` tool to learn which fields can be used for filtering."
        ),
    embeddingParameters: zSupportedEmbeddingParameters
        .optional()
        .describe(
            "The embedding model and its parameters to use to generate embeddings before searching. It is mandatory if queryVector is a string value. Note to LLM: If unsure, ask the user before providing one."
        ),
};

/** Schema for a single `{ $vectorSearch: { ... } }` pipeline stage. */
const VectorSearchStage = z.object({
    $vectorSearch: z.object(vectorSearchStageShape).passthrough(),
});
3✔
60

61
/**
 * LLM-facing description of the `pipeline` argument, including the usage rules for
 * `$vectorSearch`. Lines ending in two spaces are intentional markdown line breaks.
 */
const pipelineArgDescription = [
    "An array of aggregation stages to execute.  ",
    "`$vectorSearch` **MUST** be the first stage of the pipeline, or the first stage of a `$unionWith` subpipeline.",
    "### Usage Rules for `$vectorSearch`",
    "- **Unset embeddings:**  ",
    "  Unless the user explicitly requests the embeddings, add an `$unset` stage **at the end of the pipeline** to remove the embedding field and avoid context limits. **The $unset stage in this situation is mandatory**.",
    "- **Pre-filtering:**",
    "If the user requests additional filtering, include filters in `$vectorSearch.filter` only for pre-filter fields in the vector index.",
    "    NEVER include fields in $vectorSearch.filter that are not part of the vector index.",
    "- **Post-filtering:**",
    "    For all remaining filters, add a $match stage after $vectorSearch.",
    "### Note to LLM",
    "- If unsure which fields are filterable, use the collection-indexes tool to determine valid prefilter fields.",
    "- If no requested filters are valid prefilters, omit the filter key from $vectorSearch.",
].join("\n");

/** LLM-facing description of the `responseBytesLimit` argument. */
const responseBytesLimitArgDescription =
    "The maximum number of bytes to return in the response. This value is capped by the server's configured maxBytesPerQuery and cannot be exceeded. " +
    'Note to LLM: If the entire aggregation result is required, use the "export" tool instead of increasing this limit.';

/**
 * Zod shape of the aggregate-specific tool arguments:
 * - `pipeline`: the stages to run; each is either a generic EJSON stage or a typed `$vectorSearch` stage.
 * - `responseBytesLimit`: response size budget, defaulting to `ONE_MB`.
 */
export const AggregateArgs = {
    pipeline: z.array(z.union([AnyStage, VectorSearchStage])).describe(pipelineArgDescription),
    responseBytesLimit: z.number().optional().default(ONE_MB).describe(responseBytesLimitArgDescription),
};
3✔
82

83
/**
 * MCP tool that runs an aggregation pipeline against a MongoDB collection.
 *
 * Before the pipeline is executed the tool:
 * - rejects `$out` / `$merge` stages when writes are not permitted by the configuration,
 * - rejects `$vectorSearch` when the cluster does not support MongoDB Search,
 * - optionally (when `config.indexCheck` is set) verifies index usage or the existence of the vector search index,
 * - replaces raw-string `$vectorSearch.queryVector` values with generated embeddings.
 *
 * Results are capped by `config.maxDocumentsPerQuery` (via an appended `$limit`) and by the
 * response byte limits, and are returned as untrusted data.
 */
export class AggregateTool extends MongoDBToolBase {
    public name = "aggregate";
    protected description = "Run an aggregation against a MongoDB collection";
    protected argsShape = {
        ...DbOperationArgs,
        ...AggregateArgs,
    };
    public operationType: OperationType = "read";

    /**
     * Validates and runs the aggregation, then formats the (possibly capped) results.
     *
     * @throws MongoDBError with `ForbiddenWriteOperation` when a forbidden `$out`/`$merge` is used,
     *   `AtlasSearchNotSupported` when `$vectorSearch` is used on an unsupported cluster,
     *   `AtlasVectorSearchIndexNotFound` when index checks are enabled and the referenced vector index is missing,
     *   and `AtlasVectorSearchInvalidQuery` when a raw-string query vector cannot be embedded.
     */
    protected async execute(
        { database, collection, pipeline, responseBytesLimit }: ToolArgs<typeof this.argsShape>,
        { signal }: ToolExecutionContext
    ): Promise<CallToolResult> {
        let aggregationCursor: AggregationCursor | undefined = undefined;
        try {
            const provider = await this.ensureConnected();
            await this.assertOnlyUsesPermittedStages(pipeline);

            // Check if aggregate operation uses an index if enabled
            if (this.config.indexCheck) {
                const [usesVectorSearchIndex, indexName] = await this.isVectorSearchIndexUsed({
                    database,
                    collection,
                    pipeline,
                });
                switch (usesVectorSearchIndex) {
                    case "not-vector-search-query":
                        await checkIndexUsage(provider, database, collection, "aggregate", async () => {
                            return provider
                                .aggregate(database, collection, pipeline, {}, { writeConcern: undefined })
                                .explain("queryPlanner");
                        });
                        break;
                    case "non-existent-index":
                        throw new MongoDBError(
                            ErrorCodes.AtlasVectorSearchIndexNotFound,
                            `Could not find an index with name "${indexName}" in namespace "${database}.${collection}".`
                        );
                    case "valid-index":
                        // nothing to do, everything is correct so ready to run the query
                        break;
                }
            }

            pipeline = await this.replaceRawValuesWithEmbeddingsIfNecessary({
                database,
                collection,
                pipeline,
            });

            // `$out` and `$merge` must be the final stage of a pipeline; the server rejects any stage
            // appended after them. So for such pipelines neither the `$limit` cap nor the `$count`
            // probe can be appended (the probe would only fail and fall back to `undefined`).
            const endsWithWriteStage = this.endsWithWriteStage(pipeline);

            const cappedResultsPipeline = [...pipeline];
            if (this.config.maxDocumentsPerQuery > 0 && !endsWithWriteStage) {
                cappedResultsPipeline.push({ $limit: this.config.maxDocumentsPerQuery });
            }
            aggregationCursor = provider.aggregate(database, collection, cappedResultsPipeline);

            const [totalDocuments, cursorResults] = await Promise.all([
                endsWithWriteStage
                    ? undefined
                    : this.countAggregationResultDocuments({ provider, database, collection, pipeline }),
                collectCursorUntilMaxBytesLimit({
                    cursor: aggregationCursor,
                    configuredMaxBytesPerQuery: this.config.maxBytesPerQuery,
                    toolResponseBytesLimit: responseBytesLimit,
                    abortSignal: signal,
                }),
            ]);

            // If the total number of documents that the aggregation would've
            // resulted in would be greater than the configured
            // maxDocumentsPerQuery then we know for sure that the results were
            // capped.
            const aggregationResultsCappedByMaxDocumentsLimit =
                this.config.maxDocumentsPerQuery > 0 &&
                !!totalDocuments &&
                totalDocuments > this.config.maxDocumentsPerQuery;

            return {
                content: formatUntrustedData(
                    this.generateMessage({
                        aggResultsCount: totalDocuments,
                        documents: cursorResults.documents,
                        appliedLimits: [
                            aggregationResultsCappedByMaxDocumentsLimit ? "config.maxDocumentsPerQuery" : undefined,
                            cursorResults.cappedBy,
                        ].filter((limit): limit is keyof typeof CURSOR_LIMITS_TO_LLM_TEXT => !!limit),
                    }),
                    ...(cursorResults.documents.length > 0 ? [EJSON.stringify(cursorResults.documents)] : [])
                ),
            };
        } finally {
            if (aggregationCursor) {
                void this.safeCloseCursor(aggregationCursor);
            }
        }
    }

    /**
     * Returns true when the pipeline's final stage is `$out` or `$merge`. These stages must be
     * the last stage of a pipeline, so nothing may be appended after them.
     */
    private endsWithWriteStage(pipeline: Document[]): boolean {
        const lastStage = pipeline[pipeline.length - 1];
        return lastStage !== undefined && ("$out" in lastStage || "$merge" in lastStage);
    }

    /**
     * Closes the cursor. A failure to close is logged as a warning instead of being propagated.
     */
    private async safeCloseCursor(cursor: AggregationCursor<unknown>): Promise<void> {
        try {
            await cursor.close();
        } catch (error) {
            this.session.logger.warning({
                id: LogId.mongodbCursorCloseError,
                context: "aggregate tool",
                message: `Error when closing the cursor - ${error instanceof Error ? error.message : String(error)}`,
            });
        }
    }

    /**
     * Throws if the top-level pipeline uses a stage that the current configuration or cluster does not allow.
     *
     * @throws MongoDBError `ForbiddenWriteOperation` for `$out`/`$merge` in readOnly mode or when any
     *   write operation type is disabled; `AtlasSearchNotSupported` for `$vectorSearch` when the
     *   session reports that search is unsupported.
     */
    private async assertOnlyUsesPermittedStages(pipeline: Record<string, unknown>[]): Promise<void> {
        const writeOperations: OperationType[] = ["update", "create", "delete"];

        let writeStageForbiddenError = "";

        if (this.config.readOnly) {
            writeStageForbiddenError = "In readOnly mode you can not run pipelines with $out or $merge stages.";
        } else if (this.config.disabledTools.some((t) => writeOperations.includes(t as OperationType))) {
            writeStageForbiddenError =
                "When 'create', 'update', or 'delete' operations are disabled, you can not run pipelines with $out or $merge stages.";
        }

        for (const stage of pipeline) {
            // This validates that in readOnly mode or "write" operations are disabled, we can't use $out or $merge.
            // This is really important because aggregates are the only "multi-faceted" tool in the MQL, where you
            // can both read and write.
            if ((stage.$out || stage.$merge) && writeStageForbiddenError) {
                throw new MongoDBError(ErrorCodes.ForbiddenWriteOperation, writeStageForbiddenError);
            }

            // This ensure that you can't use $vectorSearch if the cluster does not support MongoDB Search
            // either in Atlas or in a local cluster. Search support is only queried when a $vectorSearch
            // stage is actually present, so ordinary pipelines skip the call entirely.
            if (stage.$vectorSearch && !(await this.session.isSearchSupported())) {
                throw new MongoDBError(
                    ErrorCodes.AtlasSearchNotSupported,
                    "Atlas Search is not supported in this cluster."
                );
            }
        }
    }

    /**
     * Counts how many documents the uncapped pipeline would produce by appending a `$count` stage.
     * The count query is bounded by `AGG_COUNT_MAX_TIME_MS_CAP`.
     *
     * @returns the count (0 when the count stage yields no document), or `undefined` when the
     *   count query fails and `operationWithFallback` falls back.
     */
    private async countAggregationResultDocuments({
        provider,
        database,
        collection,
        pipeline,
    }: {
        provider: NodeDriverServiceProvider;
        database: string;
        collection: string;
        pipeline: Document[];
    }): Promise<number | undefined> {
        const resultsCountAggregation = [...pipeline, { $count: "totalDocuments" }];
        return await operationWithFallback(async (): Promise<number | undefined> => {
            const aggregationResults = await provider
                .aggregate(database, collection, resultsCountAggregation)
                .maxTimeMS(AGG_COUNT_MAX_TIME_MS_CAP)
                .toArray();

            const documentWithCount: unknown = aggregationResults.length === 1 ? aggregationResults[0] : undefined;
            const totalDocuments =
                documentWithCount &&
                typeof documentWithCount === "object" &&
                "totalDocuments" in documentWithCount &&
                typeof documentWithCount.totalDocuments === "number"
                    ? documentWithCount.totalDocuments
                    : 0;

            return totalDocuments;
        }, undefined);
    }

    /**
     * For every top-level `$vectorSearch` stage whose `queryVector` is a raw string, generates a query
     * embedding and writes it back into the stage. The stage objects are mutated in place:
     * `embeddingParameters` is removed and `queryVector` is overwritten. Afterwards, the embeddings
     * manager validates the pipeline's embedding fields.
     *
     * @returns the same (mutated) pipeline array.
     * @throws MongoDBError `AtlasVectorSearchInvalidQuery` when `embeddingParameters` is missing for a
     *   string query, or when no embedding is generated.
     */
    private async replaceRawValuesWithEmbeddingsIfNecessary({
        database,
        collection,
        pipeline,
    }: {
        database: string;
        collection: string;
        pipeline: Document[];
    }): Promise<Document[]> {
        for (const stage of pipeline) {
            if ("$vectorSearch" in stage) {
                const { $vectorSearch: vectorSearchStage } = stage as z.infer<typeof VectorSearchStage>;

                if (Array.isArray(vectorSearchStage.queryVector)) {
                    continue;
                }

                if (!vectorSearchStage.embeddingParameters) {
                    throw new MongoDBError(
                        ErrorCodes.AtlasVectorSearchInvalidQuery,
                        "embeddingParameters is mandatory if queryVector is a raw string."
                    );
                }

                const embeddingParameters = vectorSearchStage.embeddingParameters;
                // embeddingParameters is a tool-only field; it must not be sent to the server.
                delete vectorSearchStage.embeddingParameters;

                await this.session.vectorSearchEmbeddingsManager.assertVectorSearchIndexExists({
                    database,
                    collection,
                    path: vectorSearchStage.path,
                });

                const [embeddings] = await this.session.vectorSearchEmbeddingsManager.generateEmbeddings({
                    rawValues: [vectorSearchStage.queryVector],
                    embeddingParameters,
                    inputType: "query",
                });

                if (!embeddings) {
                    throw new MongoDBError(
                        ErrorCodes.AtlasVectorSearchInvalidQuery,
                        "Failed to generate embeddings for the query vector."
                    );
                }

                // $vectorSearch.queryVector can be a BSON.Binary: that it's not either number or an array.
                // It's not exactly valid from the LLM perspective (they can't provide binaries).
                // That's why we overwrite the stage in an untyped way, as what we expose and what LLMs can use is different.
                vectorSearchStage.queryVector = embeddings;
            }
        }

        await this.session.vectorSearchEmbeddingsManager.assertFieldsHaveCorrectEmbeddings(
            { database, collection },
            pipeline
        );

        return pipeline;
    }

    /**
     * Inspects the first top-level `$vectorSearch` stage (if any) and reports whether its index exists.
     *
     * @returns `["not-vector-search-query"]` when the pipeline has no top-level `$vectorSearch` stage,
     *   otherwise `"valid-index"` or `"non-existent-index"` together with the index name that was checked.
     */
    private async isVectorSearchIndexUsed({
        database,
        collection,
        pipeline,
    }: {
        database: string;
        collection: string;
        pipeline: Document[];
    }): Promise<["valid-index" | "non-existent-index" | "not-vector-search-query", string?]> {
        // check if the pipeline contains a $vectorSearch stage
        let usesVectorSearch = false;
        let indexName: string = "default";

        for (const stage of pipeline) {
            if ("$vectorSearch" in stage) {
                const { $vectorSearch: vectorSearchStage } = stage as z.infer<typeof VectorSearchStage>;
                usesVectorSearch = true;
                indexName = vectorSearchStage.index;
                break;
            }
        }

        if (!usesVectorSearch) {
            return ["not-vector-search-query"];
        }

        const indexExists = await this.session.vectorSearchEmbeddingsManager.indexExists({
            database,
            collection,
            indexName,
        });

        return [indexExists ? "valid-index" : "non-existent-index", indexName];
    }

    /**
     * Builds the human/LLM-readable summary line: the total result count (or "indeterminable number of"
     * when unknown), how many documents are returned, and which limits capped the output.
     */
    private generateMessage({
        aggResultsCount,
        documents,
        appliedLimits,
    }: {
        aggResultsCount: number | undefined;
        documents: unknown[];
        appliedLimits: (keyof typeof CURSOR_LIMITS_TO_LLM_TEXT)[];
    }): string {
        const appliedLimitText = appliedLimits.length
            ? `\
while respecting the applied limits of ${appliedLimits.map((limit) => CURSOR_LIMITS_TO_LLM_TEXT[limit]).join(", ")}. \
Note to LLM: If the entire query result is required then use "export" tool to export the query results.\
`
            : "";

        return `\
The aggregation resulted in ${aggResultsCount === undefined ? "indeterminable number of" : aggResultsCount} documents. \
Returning ${documents.length} documents${appliedLimitText ? ` ${appliedLimitText}` : "."}\
`;
    }
}
3✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc