• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mongodb-js / mongodb-mcp-server / 18710860243

22 Oct 2025 08:56AM UTC coverage: 81.332% (-0.6%) from 81.905%
18710860243

Pull #662

github

web-flow
Merge 89a556dbd into 17b595b2f
Pull Request #662: chore: When querying with vectorSearch use the generated embeddings MCP-245

1366 of 1859 branches covered (73.48%)

Branch coverage included in aggregate %.

97 of 182 new or added lines in 4 files covered. (53.3%)

8 existing lines in 2 files now uncovered.

6289 of 7553 relevant lines covered (83.26%)

146.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.15
/src/tools/mongodb/read/aggregate.ts
1
import { z } from "zod";
6✔
2
import type { AggregationCursor } from "mongodb";
3
import type { CallToolResult } from "@modelcontextprotocol/sdk/types.js";
4
import type { NodeDriverServiceProvider } from "@mongosh/service-provider-node-driver";
5
import { DbOperationArgs, MongoDBToolBase } from "../mongodbTool.js";
6✔
6
import type { ToolArgs, OperationType, ToolExecutionContext } from "../../tool.js";
7
import { formatUntrustedData } from "../../tool.js";
6✔
8
import { checkIndexUsage } from "../../../helpers/indexCheck.js";
6✔
9
import { type Document, EJSON } from "bson";
6✔
10
import { ErrorCodes, MongoDBError } from "../../../common/errors.js";
6✔
11
import { collectCursorUntilMaxBytesLimit } from "../../../helpers/collectCursorUntilMaxBytes.js";
6✔
12
import { operationWithFallback } from "../../../helpers/operationWithFallback.js";
6✔
13
import { AGG_COUNT_MAX_TIME_MS_CAP, ONE_MB, CURSOR_LIMITS_TO_LLM_TEXT } from "../../../helpers/constants.js";
6✔
14
import { zEJSON } from "../../args.js";
6✔
15
import { LogId } from "../../../common/logger.js";
6✔
16
import { zSupportedEmbeddingParameters } from "../../../common/search/embeddingsProvider.js";
6✔
17

18
const AnyStage = zEJSON();
6✔
19
const VectorSearchStage = z.object({
6✔
20
    $vectorSearch: z
6✔
21
        .object({
6✔
22
            exact: z
6✔
23
                .boolean()
6✔
24
                .optional()
6✔
25
                .default(false)
6✔
26
                .describe(
6✔
27
                    "When true, uses an ENN algorithm, otherwise uses ANN. Using ENN is not compatible with numCandidates, in that case, numCandidates must be left empty."
6✔
28
                ),
6✔
29
            index: z.string().describe("Name of the index, as retrieved from the `collection-indexes` tool."),
6✔
30
            path: z
6✔
31
                .string()
6✔
32
                .describe(
6✔
33
                    "Field, in dot notation, where to search. There must be a vector search index for that field. Note to LLM: When unsure, use the 'collection-indexes' tool to validate that the field is indexed with a vector search index."
6✔
34
                ),
6✔
35
            queryVector: z
6✔
36
                .union([z.string(), z.array(z.number())])
6✔
37
                .describe(
6✔
38
                    "The content to search for. The embeddingParameters field is mandatory if the queryVector is a string, in that case, the tool generates the embedding automatically using the provided configuration."
6✔
39
                ),
6✔
40
            numCandidates: z
6✔
41
                .number()
6✔
42
                .int()
6✔
43
                .positive()
6✔
44
                .optional()
6✔
45
                .describe("Number of candidates for the ANN algorithm. Mandatory when exact is false."),
6✔
46
            limit: z.number().int().positive().optional().default(10),
6✔
47
            filter: zEJSON()
6✔
48
                .optional()
6✔
49
                .describe(
6✔
50
                    "MQL filter that can only use pre-filter fields from the index definition. Note to LLM: If unsure, use the `collection-indexes` tool to learn which fields can be used for pre-filtering."
6✔
51
                ),
6✔
52
            embeddingParameters: zSupportedEmbeddingParameters
6✔
53
                .optional()
6✔
54
                .describe(
6✔
55
                    "The embedding model and its parameters to use to generate embeddings before searching. It is mandatory if queryVector is a string value. Note to LLM: If unsure, ask the user before providing one."
6✔
56
                ),
6✔
57
        })
6✔
58
        .passthrough(),
6✔
59
});
6✔
60

61
export const AggregateArgs = {
6✔
62
    pipeline: z
6✔
63
        .array(z.union([AnyStage, VectorSearchStage]))
6✔
64
        .describe(
6✔
65
            "An array of aggregation stages to execute. $vectorSearch can only appear as the first stage of the aggregation pipeline or as the first stage of a $unionWith subpipeline. When using $vectorSearch, unless the user explicitly asks for the embeddings, $unset any embedding field to avoid reaching context limits."
6✔
66
        ),
6✔
67
    responseBytesLimit: z.number().optional().default(ONE_MB).describe(`\
6✔
68
The maximum number of bytes to return in the response. This value is capped by the server’s configured maxBytesPerQuery and cannot be exceeded. \
69
Note to LLM: If the entire aggregation result is required, use the "export" tool instead of increasing this limit.\
70
`),
6✔
71
};
6✔
72

73
export class AggregateTool extends MongoDBToolBase {
6✔
74
    public name = "aggregate";
196✔
75
    protected description = "Run an aggregation against a MongoDB collection";
196✔
76
    protected argsShape = {
196✔
77
        ...DbOperationArgs,
196✔
78
        ...AggregateArgs,
196✔
79
    };
196✔
80
    public operationType: OperationType = "read";
196✔
81

82
    protected async execute(
6✔
83
        { database, collection, pipeline, responseBytesLimit }: ToolArgs<typeof this.argsShape>,
42✔
84
        { signal }: ToolExecutionContext
42✔
85
    ): Promise<CallToolResult> {
42✔
86
        let aggregationCursor: AggregationCursor | undefined = undefined;
42✔
87
        try {
42✔
88
            const provider = await this.ensureConnected();
42✔
89
            await this.assertOnlyUsesPermittedStages(pipeline);
40✔
90

91
            // Check if aggregate operation uses an index if enabled
92
            if (this.config.indexCheck) {
42✔
93
                await checkIndexUsage(provider, database, collection, "aggregate", async () => {
4✔
94
                    return provider
4✔
95
                        .aggregate(database, collection, pipeline, {}, { writeConcern: undefined })
4✔
96
                        .explain("queryPlanner");
4✔
97
                });
4✔
98
            }
2✔
99

100
            pipeline = await this.replaceRawValuesWithEmbeddingsIfNecessary({
22✔
101
                database,
22✔
102
                collection,
22✔
103
                pipeline,
22✔
104
            });
22✔
105

106
            const cappedResultsPipeline = [...pipeline];
22✔
107
            if (this.config.maxDocumentsPerQuery > 0) {
40✔
108
                cappedResultsPipeline.push({ $limit: this.config.maxDocumentsPerQuery });
20✔
109
            }
20✔
110
            aggregationCursor = provider.aggregate(database, collection, cappedResultsPipeline);
22✔
111

112
            const [totalDocuments, cursorResults] = await Promise.all([
22✔
113
                this.countAggregationResultDocuments({ provider, database, collection, pipeline }),
22✔
114
                collectCursorUntilMaxBytesLimit({
22✔
115
                    cursor: aggregationCursor,
22✔
116
                    configuredMaxBytesPerQuery: this.config.maxBytesPerQuery,
22✔
117
                    toolResponseBytesLimit: responseBytesLimit,
22✔
118
                    abortSignal: signal,
22✔
119
                }),
22✔
120
            ]);
22✔
121

122
            // If the total number of documents that the aggregation would've
123
            // resulted in would be greater than the configured
124
            // maxDocumentsPerQuery then we know for sure that the results were
125
            // capped.
126
            const aggregationResultsCappedByMaxDocumentsLimit =
22✔
127
                this.config.maxDocumentsPerQuery > 0 &&
22✔
128
                !!totalDocuments &&
20✔
129
                totalDocuments > this.config.maxDocumentsPerQuery;
12✔
130

131
            return {
42✔
132
                content: formatUntrustedData(
42✔
133
                    this.generateMessage({
42✔
134
                        aggResultsCount: totalDocuments,
42✔
135
                        documents: cursorResults.documents,
42✔
136
                        appliedLimits: [
42✔
137
                            aggregationResultsCappedByMaxDocumentsLimit ? "config.maxDocumentsPerQuery" : undefined,
42✔
138
                            cursorResults.cappedBy,
42✔
139
                        ].filter((limit): limit is keyof typeof CURSOR_LIMITS_TO_LLM_TEXT => !!limit),
42✔
140
                    }),
42✔
141
                    ...(cursorResults.documents.length > 0 ? [EJSON.stringify(cursorResults.documents)] : [])
42✔
142
                ),
42✔
143
            };
42✔
144
        } finally {
42✔
145
            if (aggregationCursor) {
42✔
146
                void this.safeCloseCursor(aggregationCursor);
22✔
147
            }
22✔
148
        }
42✔
149
    }
42✔
150

151
    private async safeCloseCursor(cursor: AggregationCursor<unknown>): Promise<void> {
6✔
152
        try {
22✔
153
            await cursor.close();
22✔
154
        } catch (error) {
22!
155
            this.session.logger.warning({
×
156
                id: LogId.mongodbCursorCloseError,
×
157
                context: "aggregate tool",
×
158
                message: `Error when closing the cursor - ${error instanceof Error ? error.message : String(error)}`,
×
159
            });
×
160
        }
×
161
    }
22✔
162

163
    private async assertOnlyUsesPermittedStages(pipeline: Record<string, unknown>[]): Promise<void> {
6✔
164
        const writeOperations: OperationType[] = ["update", "create", "delete"];
40✔
165
        const isSearchSupported = await this.session.isSearchSupported();
40✔
166

167
        let writeStageForbiddenError = "";
40✔
168

169
        if (this.config.readOnly) {
40✔
170
            writeStageForbiddenError = "In readOnly mode you can not run pipelines with $out or $merge stages.";
4✔
171
        } else if (this.config.disabledTools.some((t) => writeOperations.includes(t as OperationType))) {
40✔
172
            writeStageForbiddenError =
12✔
173
                "When 'create', 'update', or 'delete' operations are disabled, you can not run pipelines with $out or $merge stages.";
12✔
174
        }
12✔
175

176
        for (const stage of pipeline) {
40✔
177
            if ((stage.$out || stage.$merge) && writeStageForbiddenError) {
56✔
178
                throw new MongoDBError(ErrorCodes.ForbiddenWriteOperation, writeStageForbiddenError);
16✔
179
            }
16✔
180

181
            if (stage.$vectorSearch && !isSearchSupported) {
56!
NEW
UNCOV
182
                throw new MongoDBError(
×
NEW
183
                    ErrorCodes.AtlasSearchNotSupported,
×
NEW
184
                    "Atlas Search is not supported in this cluster."
×
NEW
185
                );
×
NEW
186
            }
×
187
        }
56✔
188
    }
40✔
189

190
    private async countAggregationResultDocuments({
6✔
191
        provider,
22✔
192
        database,
22✔
193
        collection,
22✔
194
        pipeline,
22✔
195
    }: {
22✔
196
        provider: NodeDriverServiceProvider;
197
        database: string;
198
        collection: string;
199
        pipeline: Document[];
200
    }): Promise<number | undefined> {
22✔
201
        const resultsCountAggregation = [...pipeline, { $count: "totalDocuments" }];
22✔
202
        return await operationWithFallback(async (): Promise<number | undefined> => {
22✔
203
            const aggregationResults = await provider
22✔
204
                .aggregate(database, collection, resultsCountAggregation)
22✔
205
                .maxTimeMS(AGG_COUNT_MAX_TIME_MS_CAP)
22✔
206
                .toArray();
22✔
207

208
            const documentWithCount: unknown = aggregationResults.length === 1 ? aggregationResults[0] : undefined;
22✔
209
            const totalDocuments =
22✔
210
                documentWithCount &&
22✔
211
                typeof documentWithCount === "object" &&
14✔
212
                "totalDocuments" in documentWithCount &&
14✔
213
                typeof documentWithCount.totalDocuments === "number"
14✔
214
                    ? documentWithCount.totalDocuments
14✔
215
                    : 0;
6✔
216

217
            return totalDocuments;
22✔
218
        }, undefined);
22✔
219
    }
22✔
220

221
    private async replaceRawValuesWithEmbeddingsIfNecessary({
6✔
222
        database,
22✔
223
        collection,
22✔
224
        pipeline,
22✔
225
    }: {
22✔
226
        database: string;
227
        collection: string;
228
        pipeline: Document[];
229
    }): Promise<Document[]> {
22✔
230
        for (const stage of pipeline) {
22✔
231
            if ("$vectorSearch" in stage) {
36!
NEW
232
                const { $vectorSearch: vectorSearchStage } = stage as z.infer<typeof VectorSearchStage>;
×
233

NEW
234
                if (Array.isArray(vectorSearchStage.queryVector)) {
×
NEW
235
                    continue;
×
NEW
236
                }
×
237

NEW
238
                if (!vectorSearchStage.embeddingParameters) {
×
NEW
239
                    throw new MongoDBError(
×
NEW
240
                        ErrorCodes.AtlasVectorSearchInvalidQuery,
×
NEW
241
                        "embeddingModel is mandatory if queryVector is a raw string."
×
NEW
242
                    );
×
NEW
243
                }
×
244

NEW
245
                const embeddingParameters = vectorSearchStage.embeddingParameters;
×
NEW
246
                delete vectorSearchStage.embeddingParameters;
×
247

NEW
248
                const [embeddings] = await this.session.vectorSearchEmbeddingsManager.generateEmbeddings({
×
NEW
249
                    database,
×
NEW
250
                    collection,
×
NEW
251
                    path: vectorSearchStage.path,
×
NEW
252
                    rawValues: [vectorSearchStage.queryVector],
×
NEW
253
                    embeddingParameters,
×
NEW
254
                    inputType: "query",
×
NEW
255
                });
×
256

257
                // $vectorSearch.queryVector can be a BSON.Binary, which is neither a number nor an array.
258
                // It's not exactly valid from the LLM perspective (they can't provide binaries).
259
                // That's why we overwrite the stage in an untyped way, as what we expose and what LLMs can use are different.
NEW
260
                vectorSearchStage.queryVector = embeddings as number[];
×
NEW
261
            }
×
262
        }
36✔
263

264
        return pipeline;
22✔
265
    }
22✔
266

267
    private generateMessage({
6✔
268
        aggResultsCount,
22✔
269
        documents,
22✔
270
        appliedLimits,
22✔
271
    }: {
22✔
272
        aggResultsCount: number | undefined;
273
        documents: unknown[];
274
        appliedLimits: (keyof typeof CURSOR_LIMITS_TO_LLM_TEXT)[];
275
    }): string {
22✔
276
        const appliedLimitText = appliedLimits.length
22✔
277
            ? `\
6✔
278
while respecting the applied limits of ${appliedLimits.map((limit) => CURSOR_LIMITS_TO_LLM_TEXT[limit]).join(", ")}. \
6✔
279
Note to LLM: If the entire query result is required then use "export" tool to export the query results.\
280
`
281
            : "";
16✔
282

283
        return `\
22✔
284
The aggregation resulted in ${aggResultsCount === undefined ? "indeterminable number of" : aggResultsCount} documents. \
22✔
285
Returning ${documents.length} documents${appliedLimitText ? ` ${appliedLimitText}` : "."}\
22✔
286
`;
287
    }
22✔
288
}
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc