• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bagerard / mongoengine / 16950450531

13 Aug 2025 10:07PM UTC coverage: 94.028% (-0.5%) from 94.481%
16950450531

push

github

bagerard
add useful cr comment for .path in install_mongo

5322 of 5660 relevant lines covered (94.03%)

1.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.67
/mongoengine/context_managers.py
1
import contextlib
2✔
2
import logging
2✔
3
import threading
2✔
4
from contextlib import contextmanager
2✔
5

6
from pymongo.errors import ConnectionFailure, OperationFailure
2✔
7
from pymongo.read_concern import ReadConcern
2✔
8
from pymongo.write_concern import WriteConcern
2✔
9

10
from mongoengine.base.fields import _no_dereference_for_fields
2✔
11
from mongoengine.common import _import_class
2✔
12
from mongoengine.connection import (
2✔
13
    DEFAULT_CONNECTION_NAME,
14
    _clear_session,
15
    _get_session,
16
    _set_session,
17
    get_connection,
18
    get_db,
19
)
20
from mongoengine.pymongo_support import count_documents
2✔
21

22
__all__ = (
2✔
23
    "switch_db",
24
    "switch_collection",
25
    "no_dereference",
26
    "no_sub_classes",
27
    "query_counter",
28
    "set_write_concern",
29
    "set_read_write_concern",
30
    "no_dereferencing_active_for_class",
31
    "run_in_transaction",
32
)
33

34

35
class MyThreadLocals(threading.local):
    """Thread-local state shared by the dereferencing helpers of this module."""

    def __init__(self):
        # Maps a document class to the number of no_dereference contexts
        # currently active for it: {DocCls: count}.
        self.no_dereferencing_class = dict()
39

40

41
# Module-wide MyThreadLocals instance; the no_dereference bookkeeping helpers
# read and update its `no_dereferencing_class` mapping.
thread_locals = MyThreadLocals()
2✔
42

43

44
def no_dereferencing_active_for_class(cls):
    """Tell whether ``cls`` currently has a registered no_dereference context.

    :param cls: the document class to look up
    :return: True when ``cls`` is a key of the thread-local registry
    """
    active_classes = thread_locals.no_dereferencing_class
    return cls in active_classes
46

47

48
def _register_no_dereferencing_for_class(cls):
    """Increment the no_dereference counter of ``cls`` in the thread-local registry.

    A class that is not registered yet starts from zero, so its counter
    becomes 1 after the first call.
    """
    counters = thread_locals.no_dereferencing_class
    counters[cls] = counters.get(cls, 0) + 1
51

52

53
def _unregister_no_dereferencing_for_class(cls):
    """Decrement the no_dereference counter of ``cls``, dropping it at zero.

    Raises ``KeyError`` when ``cls`` has no entry in the registry.
    """
    counters = thread_locals.no_dereferencing_class
    remaining = counters[cls] - 1
    if remaining == 0:
        # Last active context for this class: remove the entry entirely.
        del counters[cls]
    else:
        counters[cls] = remaining
57

58

59
class switch_db:
    """Context manager that temporarily binds a document class to another db alias.

    Example ::

        # Register connections
        register_connection('default', 'mongoenginetest')
        register_connection('testdb-1', 'mongoenginetest2')

        class Group(Document):
            name = StringField()

        Group(name='test').save()  # Saves in the default db

        with switch_db(Group, 'testdb-1') as Group:
            Group(name='hello testdb!').save()  # Saves in testdb-1
    """

    def __init__(self, cls, db_alias):
        """Remember the class, its current collection and its current alias.

        :param cls: the class whose registered db alias is switched
        :param db_alias: the alias of the database to use inside the context
        """
        self.cls = cls
        # Kept so that __exit__ can put the cached collection back.
        self.collection = cls._get_collection()
        self.db_alias = db_alias
        # Falls back to the default connection name when the class meta
        # does not define a "db_alias".
        self.ori_db_alias = cls._meta.get("db_alias", DEFAULT_CONNECTION_NAME)

    def __enter__(self):
        """Install the new db alias, reset the cached collection, return the class."""
        document_cls = self.cls
        document_cls._meta["db_alias"] = self.db_alias
        document_cls._collection = None
        return document_cls

    def __exit__(self, t, value, traceback):
        """Put back the original db alias and the original cached collection."""
        document_cls = self.cls
        document_cls._meta["db_alias"] = self.ori_db_alias
        document_cls._collection = self.collection
98

99

100
class switch_collection:
    """Context manager that temporarily renames the collection of a document class.

    Example ::

        class Group(Document):
            name = StringField()

        Group(name='test').save()  # Saves in the default db

        with switch_collection(Group, 'group1') as Group:
            Group(name='hello testdb!').save()  # Saves in group1 collection
    """

    def __init__(self, cls, collection_name):
        """Remember the class and what is needed to restore it afterwards.

        :param cls: the class whose collection is switched
        :param collection_name: the name of the collection to use
        """
        self.cls = cls
        # Both values are captured up front so __exit__ can restore them.
        self.ori_collection = cls._get_collection()
        self.ori_get_collection_name = cls._get_collection_name
        self.collection_name = collection_name

    def __enter__(self):
        """Override ``_get_collection_name``, reset the cached collection, return the class."""

        def _get_collection_name(_document_cls):
            # Looked up on the context manager at call time, not at __enter__ time.
            return self.collection_name

        document_cls = self.cls
        document_cls._get_collection_name = classmethod(_get_collection_name)
        document_cls._collection = None
        return document_cls

    def __exit__(self, t, value, traceback):
        """Put back the original cached collection and name getter."""
        document_cls = self.cls
        document_cls._collection = self.ori_collection
        document_cls._get_collection_name = self.ori_get_collection_name
140

141

142
@contextlib.contextmanager
def no_dereference(cls):
    """no_dereference context manager.

    Turns off all dereferencing in Documents for the duration of the context
    manager::

        with no_dereference(Group):
            Group.objects()

    The class is registered as "no dereferencing active" while the context
    is open, and every field of ``cls`` that is a ReferenceField, a
    GenericReferenceField or a ComplexBaseField is passed to
    ``_no_dereference_for_fields``.

    :param cls: the document class whose reference-like fields are affected
    """
    ReferenceField = _import_class("ReferenceField")
    GenericReferenceField = _import_class("GenericReferenceField")
    ComplexBaseField = _import_class("ComplexBaseField")
    deref_field_types = (ReferenceField, GenericReferenceField, ComplexBaseField)

    deref_fields = [
        field
        for field in cls._fields.values()
        if isinstance(field, deref_field_types)
    ]

    # Register *before* entering the try block: if anything above raises, the
    # class was never registered, so the finally clause must not unregister it
    # (that would raise KeyError or steal a count owned by an outer context).
    _register_no_dereferencing_for_class(cls)
    try:
        with _no_dereference_for_fields(*deref_fields):
            yield None
    finally:
        _unregister_no_dereferencing_for_class(cls)
173

174

175
class no_sub_classes:
    """no_sub_classes context manager.

    Only returns instances of this class and no sub (inherited) classes::

        with no_sub_classes(Group) as Group:
            Group.objects.find()
    """

    def __init__(self, cls):
        """Remember the class whose subclass list is narrowed.

        :param cls: the class for which querying subclasses is turned off
        """
        self.cls = cls
        # Filled in by __enter__ with the value to restore in __exit__.
        self.cls_initial_subclasses = None

    def __enter__(self):
        """Save ``_subclasses`` and restrict it to the class itself."""
        document_cls = self.cls
        self.cls_initial_subclasses = document_cls._subclasses
        document_cls._subclasses = (document_cls._class_name,)
        return document_cls

    def __exit__(self, t, value, traceback):
        """Restore the ``_subclasses`` value saved by ``__enter__``."""
        self.cls._subclasses = self.cls_initial_subclasses
201

202

203
class query_counter:
    """Query_counter context manager to get the number of queries.

    It sets the database `profiling_level` so that all queries get logged,
    drops the db.system.profile collection when the context is entered and
    counts the entries added afterwards.

    This was designed for debugging purposes. It is a global counter, so
    queries issued by other threads/processes can interfere with it.

    Usage:

    .. code-block:: python

        class User(Document):
            name = StringField()

        with query_counter() as q:
            user = User(name='Bob')
            assert q == 0       # no query fired yet
            user.save()
            assert q == 1       # 1 query was fired, an 'insert'
            user_bis = User.objects().first()
            assert q == 2       # a 2nd query was fired, a 'find_one'

    Be aware that:

    - Iterating over a large number of documents (>101) makes pymongo issue `getmore` queries to fetch the next batch of documents (https://www.mongodb.com/docs/manual/tutorial/iterate-a-cursor/#cursor-batches)
    - Some queries are ignored by default by the counter (killcursors, db.system.indexes)
    """

    def __init__(self, alias=DEFAULT_CONNECTION_NAME):
        """Bind the counter to the database registered under ``alias``."""
        self.db = get_db(alias=alias)
        self.initial_profiling_level = None
        # Number of profile lookups issued by this context itself.
        self._ctx_query_counter = 0

        indexes_namespace = "%s.system.indexes" % self.db.name
        # Filter applied to db.system.profile when counting entries.
        self._ignored_query = {
            "ns": {"$ne": indexes_namespace},
            "op": {"$ne": "killcursors"},  # MONGODB < 3.2
            "command.killCursors": {"$exists": False},  # MONGODB >= 3.2
        }

    def _turn_on_profiling(self):
        """Record the previous profiling level, reset the profile collection, use level 2."""
        previous_state = self.db.command({"profile": 0}, session=_get_session())
        self.initial_profiling_level = previous_state["was"]

        self.db.system.profile.drop()
        self.db.command({"profile": 2}, session=_get_session())

    def _resets_profiling(self):
        """Set the profiling level back to the one recorded on entry."""
        self.db.command({"profile": self.initial_profiling_level})

    def __enter__(self):
        self._turn_on_profiling()
        return self

    def __exit__(self, t, value, traceback):
        self._resets_profiling()

    # Every comparison below triggers a fresh count of the profile entries.

    def __eq__(self, value):
        current = self._get_count()
        return value == current

    def __ne__(self, value):
        return not self.__eq__(value)

    def __lt__(self, value):
        current = self._get_count()
        return current < value

    def __le__(self, value):
        current = self._get_count()
        return current <= value

    def __gt__(self, value):
        current = self._get_count()
        return current > value

    def __ge__(self, value):
        current = self._get_count()
        return current >= value

    def __int__(self):
        return self._get_count()

    def __repr__(self):
        """repr query_counter as the number of queries."""
        current = self._get_count()
        return "%s" % current

    def _get_count(self):
        """Return the number of counted entries in db.system.profile.

        The entries matching the ignore filter are counted, then the lookups
        issued by this context are subtracted. Every call issues one such
        lookup itself, so the internal counter is bumped to balance it.
        """
        profiled_entries = count_documents(self.db.system.profile, self._ignored_query)
        count = profiled_entries - self._ctx_query_counter
        # Account for the query we just issued to gather the information.
        self._ctx_query_counter += 1
        return count
299

300

301
@contextmanager
def set_write_concern(collection, write_concerns):
    """Yield ``collection`` reconfigured with an updated write concern.

    The options of the collection's current write concern are used as a base
    and overridden by the entries of ``write_concerns``.

    :param collection: the collection to derive the new collection from
    :param write_concerns: mapping of write concern options to apply on top
    """
    merged_options = {
        option: setting
        for option, setting in collection.write_concern.document.items()
    }
    merged_options.update(write_concerns)
    write_concern = WriteConcern(**merged_options)
    yield collection.with_options(write_concern=write_concern)
306

307

308
@contextmanager
def set_read_write_concern(collection, write_concerns, read_concerns):
    """Yield ``collection`` reconfigured with updated read and write concerns.

    The options of the collection's current concerns are used as a base;
    each of ``write_concerns`` / ``read_concerns`` overrides them unless it
    is None.

    :param collection: the collection to derive the new collection from
    :param write_concerns: mapping of write concern options, or None
    :param read_concerns: mapping of read concern options, or None
    """
    write_options = {
        option: setting
        for option, setting in collection.write_concern.document.items()
    }
    if write_concerns is not None:
        write_options.update(write_concerns)

    read_options = {
        option: setting
        for option, setting in collection.read_concern.document.items()
    }
    if read_concerns is not None:
        read_options.update(read_concerns)

    yield collection.with_options(
        write_concern=WriteConcern(**write_options),
        read_concern=ReadConcern(**read_options),
    )
324

325

326
def _commit_with_retry(session):
2✔
327
    while True:
328
        try:
2✔
329
            # Commit uses write concern set at transaction start.
330
            session.commit_transaction()
2✔
331
            break
2✔
332
        except (ConnectionFailure, OperationFailure) as exc:
×
333
            # Can retry commit
334
            if exc.has_error_label("UnknownTransactionCommitResult"):
×
335
                logging.warning(
×
336
                    "UnknownTransactionCommitResult, retrying commit operation ..."
337
                )
338
                continue
×
339
            else:
340
                # Error during commit
341
                raise
×
342

343

344
@contextmanager
def run_in_transaction(
    alias=DEFAULT_CONNECTION_NAME, session_kwargs=None, transaction_kwargs=None
):
    """run_in_transaction context manager
    Execute queries within the context in a database transaction.

    Usage:

    .. code-block:: python

        class A(Document):
            name = StringField()

        with run_in_transaction():
            a_doc = A.objects.create(name="a")
            a_doc.update(name="b")

    Be aware that:
    - Mongo transactions run inside a session which is bound to a connection. If you attempt to
      execute a transaction across a different connection alias, pymongo will raise an exception. In
      other words: you cannot create a transaction that crosses different database connections. That
      said, multiple transactions can be nested within the same session for a particular connection.

    For more information regarding pymongo transactions: https://pymongo.readthedocs.io/en/stable/api/pymongo/client_session.html#transactions

    :param alias: the alias of the connection on which the session is started
    :param session_kwargs: keyword arguments forwarded to ``start_session`` (optional)
    :param transaction_kwargs: keyword arguments forwarded to ``start_transaction`` (optional)
    """
    connection = get_connection(alias)
    session_options = session_kwargs if session_kwargs else {}
    transaction_options = transaction_kwargs if transaction_kwargs else {}

    with connection.start_session(**session_options) as session:
        with session.start_transaction(**transaction_options):
            try:
                _set_session(session)
                yield
                # Only reached when the body of the context did not raise.
                _commit_with_retry(session)
            finally:
                _clear_session()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc