• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bagerard / mongoengine / 16950450531

13 Aug 2025 10:07PM UTC coverage: 94.028% (-0.5%) from 94.481%
16950450531

push

github

bagerard
add useful cr comment for .path in install_mongo

5322 of 5660 relevant lines covered (94.03%)

1.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.76
/mongoengine/queryset/transform.py
1
from collections import defaultdict
2✔
2

3
import pymongo
2✔
4
from bson import SON, ObjectId
2✔
5
from bson.dbref import DBRef
2✔
6

7
from mongoengine.base import UPDATE_OPERATORS
2✔
8
from mongoengine.common import _import_class
2✔
9
from mongoengine.errors import InvalidQueryError
2✔
10

11
__all__ = ("query", "update", "STRING_OPERATORS")
2✔
12

13
COMPARISON_OPERATORS = (
2✔
14
    "ne",
15
    "gt",
16
    "gte",
17
    "lt",
18
    "lte",
19
    "in",
20
    "nin",
21
    "mod",
22
    "all",
23
    "size",
24
    "exists",
25
    "not",
26
    "elemMatch",
27
    "type",
28
)
29
GEO_OPERATORS = (
2✔
30
    "within_distance",
31
    "within_spherical_distance",
32
    "within_box",
33
    "within_polygon",
34
    "near",
35
    "near_sphere",
36
    "max_distance",
37
    "min_distance",
38
    "geo_within",
39
    "geo_within_box",
40
    "geo_within_polygon",
41
    "geo_within_center",
42
    "geo_within_sphere",
43
    "geo_intersects",
44
)
45
STRING_OPERATORS = (
2✔
46
    "contains",
47
    "icontains",
48
    "startswith",
49
    "istartswith",
50
    "endswith",
51
    "iendswith",
52
    "exact",
53
    "iexact",
54
    "regex",
55
    "iregex",
56
    "wholeword",
57
    "iwholeword",
58
)
59
CUSTOM_OPERATORS = ("match",)
2✔
60
MATCH_OPERATORS = (
2✔
61
    COMPARISON_OPERATORS + GEO_OPERATORS + STRING_OPERATORS + CUSTOM_OPERATORS
62
)
63

64

65
def handle_raw_query(value, mongo_query):
2✔
66
    """Combine a raw query with an existing one"""
67
    for op, v in value.items():
2✔
68
        if op not in mongo_query:
2✔
69
            mongo_query[op] = v
2✔
70
        elif op in mongo_query and isinstance(mongo_query[op], dict):
2✔
71
            mongo_query[op].update(v)
2✔
72

73

74
# TODO make this less complex
75
def query(_doc_cls=None, **kwargs):
2✔
76
    """Transform a query from Django-style format to Mongo format."""
77
    mongo_query = {}
2✔
78
    merge_query = defaultdict(list)
2✔
79
    for key, value in sorted(kwargs.items()):
2✔
80
        if key == "__raw__":
2✔
81
            handle_raw_query(value, mongo_query)
2✔
82
            continue
2✔
83

84
        parts = key.rsplit("__")
2✔
85
        indices = [(i, p) for i, p in enumerate(parts) if p.isdigit()]
2✔
86
        parts = [part for part in parts if not part.isdigit()]
2✔
87
        # Check for an operator and transform to mongo-style if there is
88
        op = None
2✔
89
        if len(parts) > 1 and parts[-1] in MATCH_OPERATORS:
2✔
90
            op = parts.pop()
2✔
91

92
        # Allow to escape operator-like field name by __
93
        if len(parts) > 1 and parts[-1] == "":
2✔
94
            parts.pop()
2✔
95

96
        negate = False
2✔
97
        if len(parts) > 1 and parts[-1] == "not":
2✔
98
            parts.pop()
2✔
99
            negate = True
2✔
100

101
        if _doc_cls:
2✔
102
            # Switch field names to proper names [set in Field(name='foo')]
103
            try:
2✔
104
                fields = _doc_cls._lookup_field(parts)
2✔
105
            except Exception as e:
2✔
106
                raise InvalidQueryError(e)
2✔
107
            parts = []
2✔
108

109
            CachedReferenceField = _import_class("CachedReferenceField")
2✔
110
            GenericReferenceField = _import_class("GenericReferenceField")
2✔
111

112
            cleaned_fields = []
2✔
113
            for field in fields:
2✔
114
                append_field = True
2✔
115
                if isinstance(field, str):
2✔
116
                    parts.append(field)
2✔
117
                    append_field = False
2✔
118
                # is last and CachedReferenceField
119
                elif isinstance(field, CachedReferenceField) and fields[-1] == field:
2✔
120
                    parts.append("%s._id" % field.db_field)
2✔
121
                else:
122
                    parts.append(field.db_field)
2✔
123

124
                if append_field:
2✔
125
                    cleaned_fields.append(field)
2✔
126

127
            # Convert value to proper value
128
            field = cleaned_fields[-1]
2✔
129

130
            singular_ops = [None, "ne", "gt", "gte", "lt", "lte", "not"]
2✔
131
            singular_ops += STRING_OPERATORS
2✔
132
            if op in singular_ops:
2✔
133
                value = field.prepare_query_value(op, value)
2✔
134

135
                if isinstance(field, CachedReferenceField) and value:
2✔
136
                    value = value["_id"]
2✔
137

138
            elif op in ("in", "nin", "all", "near") and not isinstance(value, dict):
2✔
139
                # Raise an error if the in/nin/all/near param is not iterable.
140
                value = _prepare_query_for_iterable(field, op, value)
2✔
141

142
            # If we're querying a GenericReferenceField, we need to alter the
143
            # key depending on the value:
144
            # * If the value is a DBRef, the key should be "field_name._ref".
145
            # * If the value is an ObjectId, the key should be "field_name._ref.$id".
146
            if isinstance(field, GenericReferenceField):
2✔
147
                if isinstance(value, DBRef):
2✔
148
                    parts[-1] += "._ref"
2✔
149
                elif isinstance(value, ObjectId):
2✔
150
                    parts[-1] += "._ref.$id"
2✔
151

152
        # if op and op not in COMPARISON_OPERATORS:
153
        if op:
2✔
154
            if op in GEO_OPERATORS:
2✔
155
                value = _geo_operator(field, op, value)
2✔
156
            elif op in ("match", "elemMatch"):
2✔
157
                ListField = _import_class("ListField")
2✔
158
                EmbeddedDocumentField = _import_class("EmbeddedDocumentField")
2✔
159
                if (
2✔
160
                    isinstance(value, dict)
161
                    and isinstance(field, ListField)
162
                    and isinstance(field.field, EmbeddedDocumentField)
163
                ):
164
                    value = query(field.field.document_type, **value)
2✔
165
                else:
166
                    value = field.prepare_query_value(op, value)
2✔
167
                value = {"$elemMatch": value}
2✔
168
            elif op in CUSTOM_OPERATORS:
2✔
169
                NotImplementedError(
×
170
                    'Custom method "%s" has not ' "been implemented" % op
171
                )
172
            elif op not in STRING_OPERATORS:
2✔
173
                value = {"$" + op: value}
2✔
174

175
        if negate:
2✔
176
            value = {"$not": value}
2✔
177

178
        for i, part in indices:
2✔
179
            parts.insert(i, part)
2✔
180

181
        key = ".".join(parts)
2✔
182

183
        if key not in mongo_query:
2✔
184
            mongo_query[key] = value
2✔
185
        else:
186
            if isinstance(mongo_query[key], dict) and isinstance(value, dict):
2✔
187
                mongo_query[key].update(value)
2✔
188
                # $max/minDistance needs to come last - convert to SON
189
                value_dict = mongo_query[key]
2✔
190
                if ("$maxDistance" in value_dict or "$minDistance" in value_dict) and (
2✔
191
                    "$near" in value_dict or "$nearSphere" in value_dict
192
                ):
193
                    value_son = SON()
2✔
194
                    for k, v in value_dict.items():
2✔
195
                        if k == "$maxDistance" or k == "$minDistance":
2✔
196
                            continue
2✔
197
                        value_son[k] = v
2✔
198
                    # Required for MongoDB >= 2.6, may fail when combining
199
                    # PyMongo 3+ and MongoDB < 2.6
200
                    near_embedded = False
2✔
201
                    for near_op in ("$near", "$nearSphere"):
2✔
202
                        if isinstance(value_dict.get(near_op), dict):
2✔
203
                            value_son[near_op] = SON(value_son[near_op])
2✔
204
                            if "$maxDistance" in value_dict:
2✔
205
                                value_son[near_op]["$maxDistance"] = value_dict[
2✔
206
                                    "$maxDistance"
207
                                ]
208
                            if "$minDistance" in value_dict:
2✔
209
                                value_son[near_op]["$minDistance"] = value_dict[
2✔
210
                                    "$minDistance"
211
                                ]
212
                            near_embedded = True
2✔
213

214
                    if not near_embedded:
2✔
215
                        if "$maxDistance" in value_dict:
2✔
216
                            value_son["$maxDistance"] = value_dict["$maxDistance"]
2✔
217
                        if "$minDistance" in value_dict:
2✔
218
                            value_son["$minDistance"] = value_dict["$minDistance"]
2✔
219
                    mongo_query[key] = value_son
2✔
220
            else:
221
                # Store for manually merging later
222
                merge_query[key].append(value)
2✔
223

224
    # The queryset has been filter in such a way we must manually merge
225
    for k, v in merge_query.items():
2✔
226
        merge_query[k].append(mongo_query[k])
2✔
227
        del mongo_query[k]
2✔
228
        if isinstance(v, list):
2✔
229
            value = [{k: val} for val in v]
2✔
230
            if "$and" in mongo_query.keys():
2✔
231
                mongo_query["$and"].extend(value)
×
232
            else:
233
                mongo_query["$and"] = value
2✔
234

235
    return mongo_query
2✔
236

237

238
def update(_doc_cls=None, **update):
2✔
239
    """Transform an update spec from Django-style format to Mongo
240
    format.
241
    """
242
    mongo_update = {}
2✔
243

244
    for key, value in update.items():
2✔
245
        if key == "__raw__":
2✔
246
            handle_raw_query(value, mongo_update)
2✔
247
            continue
2✔
248

249
        parts = key.split("__")
2✔
250

251
        # if there is no operator, default to 'set'
252
        if len(parts) < 3 and parts[0] not in UPDATE_OPERATORS:
2✔
253
            parts.insert(0, "set")
2✔
254

255
        # Check for an operator and transform to mongo-style if there is
256
        op = None
2✔
257
        if parts[0] in UPDATE_OPERATORS:
2✔
258
            op = parts.pop(0)
2✔
259
            # Convert Pythonic names to Mongo equivalents
260
            operator_map = {
2✔
261
                "push_all": "pushAll",
262
                "pull_all": "pullAll",
263
                "dec": "inc",
264
                "add_to_set": "addToSet",
265
                "set_on_insert": "setOnInsert",
266
            }
267
            if op == "dec":
2✔
268
                # Support decrement by flipping a positive value's sign
269
                # and using 'inc'
270
                value = -value
2✔
271
            # If the operator doesn't found from operator map, the op value
272
            # will stay unchanged
273
            op = operator_map.get(op, op)
2✔
274

275
        match = None
2✔
276

277
        if len(parts) == 1:
2✔
278
            # typical update like set__field
279
            # but also allows to update a field named like a comparison operator
280
            # like set__type = "something" (without clashing with the 'type' operator)
281
            pass
2✔
282
        elif len(parts) > 1:
2✔
283
            # can be either an embedded field like set__foo__bar
284
            # or a comparison operator as in pull__foo__in
285
            if parts[-1] in COMPARISON_OPERATORS:
2✔
286
                match = parts.pop()  # e.g. pop 'in' from pull__foo__in
2✔
287

288
            # Allow to escape operator-like field name by __
289
            # e.g. in the case of an embedded foo.type field
290
            # Doc.objects().update(set__foo__type="bar")
291
            # see https://github.com/MongoEngine/mongoengine/pull/1351
292
            if parts[-1] == "":
2✔
293
                match = parts.pop()  # e.g. pop last '__' from set__foo__type__
2✔
294

295
        if _doc_cls:
2✔
296
            # Switch field names to proper names [set in Field(name='foo')]
297
            try:
2✔
298
                fields = _doc_cls._lookup_field(parts)
2✔
299
            except Exception as e:
2✔
300
                raise InvalidQueryError(e)
2✔
301
            parts = []
2✔
302

303
            cleaned_fields = []
2✔
304
            appended_sub_field = False
2✔
305
            for field in fields:
2✔
306
                append_field = True
2✔
307
                if isinstance(field, str):
2✔
308
                    # Convert the S operator to $
309
                    if field == "S":
2✔
310
                        field = "$"
2✔
311
                    parts.append(field)
2✔
312
                    append_field = False
2✔
313
                else:
314
                    parts.append(field.db_field)
2✔
315
                if append_field:
2✔
316
                    appended_sub_field = False
2✔
317
                    cleaned_fields.append(field)
2✔
318
                    if hasattr(field, "field"):
2✔
319
                        cleaned_fields.append(field.field)
2✔
320
                        appended_sub_field = True
2✔
321

322
            # Convert value to proper value
323
            if appended_sub_field:
2✔
324
                field = cleaned_fields[-2]
2✔
325
            else:
326
                field = cleaned_fields[-1]
2✔
327

328
            GeoJsonBaseField = _import_class("GeoJsonBaseField")
2✔
329
            if isinstance(field, GeoJsonBaseField):
2✔
330
                value = field.to_mongo(value)
2✔
331

332
            if op == "pull":
2✔
333
                if field.required or value is not None:
2✔
334
                    if match in ("in", "nin") and not isinstance(value, dict):
2✔
335
                        value = _prepare_query_for_iterable(field, op, value)
2✔
336
                    else:
337
                        value = field.prepare_query_value(op, value)
2✔
338
            elif op == "push" and isinstance(value, (list, tuple, set)):
2✔
339
                value = [field.prepare_query_value(op, v) for v in value]
2✔
340
            elif op in (None, "set", "push"):
2✔
341
                if field.required or value is not None:
2✔
342
                    value = field.prepare_query_value(op, value)
2✔
343
            elif op in ("pushAll", "pullAll"):
2✔
344
                value = [field.prepare_query_value(op, v) for v in value]
2✔
345
            elif op in ("addToSet", "setOnInsert"):
2✔
346
                if isinstance(value, (list, tuple, set)):
2✔
347
                    value = [field.prepare_query_value(op, v) for v in value]
2✔
348
                elif field.required or value is not None:
2✔
349
                    value = field.prepare_query_value(op, value)
2✔
350
            elif op == "unset":
2✔
351
                value = 1
2✔
352
            elif op == "inc":
2✔
353
                value = field.prepare_query_value(op, value)
2✔
354

355
        if match:
2✔
356
            match = "$" + match
2✔
357
            value = {match: value}
2✔
358

359
        key = ".".join(parts)
2✔
360

361
        if "pull" in op and "." in key:
2✔
362
            # Dot operators don't work on pull operations
363
            # unless they point to a list field
364
            # Otherwise it uses nested dict syntax
365
            if op == "pullAll":
2✔
366
                raise InvalidQueryError(
2✔
367
                    "pullAll operations only support a single field depth"
368
                )
369

370
            # Look for the last list field and use dot notation until there
371
            field_classes = [c.__class__ for c in cleaned_fields]
2✔
372
            field_classes.reverse()
2✔
373
            ListField = _import_class("ListField")
2✔
374
            EmbeddedDocumentListField = _import_class("EmbeddedDocumentListField")
2✔
375
            if ListField in field_classes or EmbeddedDocumentListField in field_classes:
2✔
376
                # Join all fields via dot notation to the last ListField or EmbeddedDocumentListField
377
                # Then process as normal
378
                if ListField in field_classes:
2✔
379
                    _check_field = ListField
2✔
380
                else:
381
                    _check_field = EmbeddedDocumentListField
2✔
382

383
                last_listField = len(cleaned_fields) - field_classes.index(_check_field)
2✔
384
                key = ".".join(parts[:last_listField])
2✔
385
                parts = parts[last_listField:]
2✔
386
                parts.insert(0, key)
2✔
387

388
            parts.reverse()
2✔
389
            for key in parts:
2✔
390
                value = {key: value}
2✔
391
        elif op == "addToSet" and isinstance(value, list):
2✔
392
            value = {key: {"$each": value}}
2✔
393
        elif op in ("push", "pushAll"):
2✔
394
            if parts[-1].isdigit():
2✔
395
                key = ".".join(parts[0:-1])
2✔
396
                position = int(parts[-1])
2✔
397
                # $position expects an iterable. If pushing a single value,
398
                # wrap it in a list.
399
                if not isinstance(value, (set, tuple, list)):
2✔
400
                    value = [value]
2✔
401
                value = {key: {"$each": value, "$position": position}}
2✔
402
            else:
403
                if op == "pushAll":
2✔
404
                    op = "push"  # convert to non-deprecated keyword
2✔
405
                    if not isinstance(value, (set, tuple, list)):
2✔
406
                        value = [value]
×
407
                    value = {key: {"$each": value}}
2✔
408
                else:
409
                    value = {key: value}
2✔
410
        else:
411
            value = {key: value}
2✔
412
        key = "$" + op
2✔
413
        if key not in mongo_update:
2✔
414
            mongo_update[key] = value
2✔
415
        elif key in mongo_update and isinstance(mongo_update[key], dict):
2✔
416
            mongo_update[key].update(value)
2✔
417

418
    return mongo_update
2✔
419

420

421
def _geo_operator(field, op, value):
2✔
422
    """Helper to return the query for a given geo query."""
423
    if op == "max_distance":
2✔
424
        value = {"$maxDistance": value}
2✔
425
    elif op == "min_distance":
2✔
426
        value = {"$minDistance": value}
2✔
427
    elif field._geo_index == pymongo.GEO2D:
2✔
428
        if op == "within_distance":
2✔
429
            value = {"$within": {"$center": value}}
2✔
430
        elif op == "within_spherical_distance":
2✔
431
            value = {"$within": {"$centerSphere": value}}
2✔
432
        elif op == "within_polygon":
2✔
433
            value = {"$within": {"$polygon": value}}
2✔
434
        elif op == "near":
2✔
435
            value = {"$near": value}
2✔
436
        elif op == "near_sphere":
2✔
437
            value = {"$nearSphere": value}
2✔
438
        elif op == "within_box":
2✔
439
            value = {"$within": {"$box": value}}
2✔
440
        else:
441
            raise NotImplementedError(
×
442
                'Geo method "%s" has not been ' "implemented for a GeoPointField" % op
443
            )
444
    else:
445
        if op == "geo_within":
2✔
446
            value = {"$geoWithin": _infer_geometry(value)}
2✔
447
        elif op == "geo_within_box":
2✔
448
            value = {"$geoWithin": {"$box": value}}
2✔
449
        elif op == "geo_within_polygon":
2✔
450
            value = {"$geoWithin": {"$polygon": value}}
2✔
451
        elif op == "geo_within_center":
2✔
452
            value = {"$geoWithin": {"$center": value}}
2✔
453
        elif op == "geo_within_sphere":
2✔
454
            value = {"$geoWithin": {"$centerSphere": value}}
×
455
        elif op == "geo_intersects":
2✔
456
            value = {"$geoIntersects": _infer_geometry(value)}
2✔
457
        elif op == "near":
2✔
458
            value = {"$near": _infer_geometry(value)}
2✔
459
        else:
460
            raise NotImplementedError(
×
461
                'Geo method "{}" has not been implemented for a {} '.format(
462
                    op, field._name
463
                )
464
            )
465
    return value
2✔
466

467

468
def _infer_geometry(value):
2✔
469
    """Helper method that tries to infer the $geometry shape for a
470
    given value.
471
    """
472
    if isinstance(value, dict):
2✔
473
        if "$geometry" in value:
2✔
474
            return value
2✔
475
        elif "coordinates" in value and "type" in value:
2✔
476
            return {"$geometry": value}
2✔
477
        raise InvalidQueryError(
×
478
            "Invalid $geometry dictionary should have type and coordinates keys"
479
        )
480
    elif isinstance(value, (list, set)):
2✔
481
        # TODO: shouldn't we test value[0][0][0][0] to see if it is MultiPolygon?
482

483
        try:
2✔
484
            value[0][0][0]
2✔
485
            return {"$geometry": {"type": "Polygon", "coordinates": value}}
2✔
486
        except (TypeError, IndexError):
2✔
487
            pass
2✔
488

489
        try:
2✔
490
            value[0][0]
2✔
491
            return {"$geometry": {"type": "LineString", "coordinates": value}}
2✔
492
        except (TypeError, IndexError):
2✔
493
            pass
2✔
494

495
        try:
2✔
496
            value[0]
2✔
497
            return {"$geometry": {"type": "Point", "coordinates": value}}
2✔
498
        except (TypeError, IndexError):
×
499
            pass
×
500

501
    raise InvalidQueryError(
×
502
        "Invalid $geometry data. Can be either a "
503
        "dictionary or (nested) lists of coordinate(s)"
504
    )
505

506

507
def _prepare_query_for_iterable(field, op, value):
2✔
508
    # We need a special check for BaseDocument, because - although it's iterable - using
509
    # it as such in the context of this method is most definitely a mistake.
510
    BaseDocument = _import_class("BaseDocument")
2✔
511

512
    if isinstance(value, BaseDocument):
2✔
513
        raise TypeError(
2✔
514
            "When using the `in`, `nin`, `all`, or "
515
            "`near`-operators you can't use a "
516
            "`Document`, you must wrap your object "
517
            "in a list (object -> [object])."
518
        )
519

520
    if not hasattr(value, "__iter__"):
2✔
521
        raise TypeError(
2✔
522
            "The `in`, `nin`, `all`, or "
523
            "`near`-operators must be applied to an "
524
            "iterable (e.g. a list)."
525
        )
526

527
    return [field.prepare_query_value(op, v) for v in value]
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc