• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

netzkolchose / django-computedfields / 16457217276

22 Jul 2025 10:50PM UTC coverage: 95.327% (+0.1%) from 95.192%
16457217276

Pull #189

github

web-flow
Merge ae68d52da into f018eaa24
Pull Request #189: autorecover for not_computed context

533 of 574 branches covered (92.86%)

Branch coverage included in aggregate %.

134 of 136 new or added lines in 4 files covered. (98.53%)

5 existing lines in 1 file now uncovered.

1262 of 1309 relevant lines covered (96.41%)

11.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.57
/computedfields/graph.py
1
"""
2
Module containing the graph logic for the dependency resolver.
3

4
Upon application initialization a dependency graph of all project wide
5
computed fields is created. The graph does a basic cycle check and
6
removes redundant dependencies. Finally the dependencies are translated
7
to the resolver map to be used later by ``update_dependent`` and in
8
the signal handlers.
9
"""
10
from os import PathLike
12✔
11
from django.core.exceptions import FieldDoesNotExist
12✔
12
from django.db.models import ForeignKey, ManyToOneRel
12✔
13
from computedfields.helpers import pairwise, modelname, parent_to_inherited_path, skip_equal_segments
12✔
14
from collections import defaultdict
12✔
15

16

17
# typing imports
18
from typing import (Callable, Dict, FrozenSet, Generic, Hashable, Any, List, Optional, Sequence,
12✔
19
                    Set, Tuple, TypeVar, Type, Union, cast)
20
from typing_extensions import TypedDict
12✔
21
from django.db.models import Model, Field
12✔
22

23

24
_ST = TypeVar("_ST", contravariant=True)
12✔
25
_GT = TypeVar("_GT", covariant=True)
12✔
26
F = TypeVar('F', bound=Callable[..., Any])
12✔
27

28

29
# depends interface
30
IDepends = Sequence[Tuple[str, Sequence[str]]]
12✔
31
IDependsAppend = List[Tuple[str, Sequence[str]]]
12✔
32

33
# _computed container
34
class IComputedData(TypedDict):
12✔
35
    depends: IDepends
12✔
36
    func: Callable[[Model], Any]
12✔
37
    select_related: Sequence[str]
12✔
38
    prefetch_related: Sequence[Any]
12✔
39
    querysize: Optional[int]
12✔
40
    default_on_create: Optional[bool]
12✔
41

42

43
# django Field type extended by our _computed data attribute
44
class IComputedField(Field, Generic[_ST, _GT]):
12✔
45
    _computed: IComputedData
12✔
46
    creation_counter: int
12✔
47

48

49
class ICycleData(TypedDict):
12✔
50
    entries: Set['Edge']
12✔
51
    path: List['Edge']
12✔
52

53

54
class IDependsData(TypedDict):
12✔
55
    path: str
12✔
56
    depends: str
12✔
57

58

59
class ILocalMroData(TypedDict):
12✔
60
    base: List[str]
12✔
61
    fields: Dict[str, int]
12✔
62

63

64
class IM2mData(TypedDict):
12✔
65
    left: str
12✔
66
    right: str
12✔
67
IM2mMap = Dict[Type[Model], IM2mData]
12✔
68

69

70
class IChange(TypedDict):
12✔
71
    pks: Set[Any]
12✔
72
    fields: Optional[Set[str]]
12✔
73
IRecorded = Dict[Type[Model], IChange]
12✔
74

75
IModelUpdate = Dict[Type[Model], Tuple[Set[str], Set[str]]]
12✔
76
IModelUpdateCache = Dict[Type[Model], Dict[Optional[FrozenSet[str]], IModelUpdate]]
12✔
77

78

79
# global deps: {cfModel: {cfname: {srcModel: {'path': lookup_path, 'depends': src_fieldname}}}}
80
IGlobalDeps = Dict[Type[Model], Dict[str, Dict[Type[Model], List[IDependsData]]]]
12✔
81
# local deps: {Model: {'cfname': {'depends', 'on', 'these', 'local', 'fieldnames'}}}
82
ILocalDeps = Dict[Type[Model], Dict[str, Set[str]]]
12✔
83
# cleaned: {(cfmodel, cfname): {(srcmodel, fname), ...}}
84
IGlobalDepsCleaned = Dict[Tuple[str, str], Set[Tuple[str, str]]]
12✔
85
IInterimTable = Dict[Type[Model], Dict[str, Dict[Type[Model], Dict[str, List[IDependsData]]]]]
12✔
86

87
# maps exported to resolver
88
# LookupMap: {srcModel: {srcfield: {cfModel, ({querystrings}, {cfields})}}}
89
ILookupMap = Dict[Type[Model], Dict[str, Dict[Type[Model], Tuple[Set[str], Set[str]]]]]
12✔
90
# fk map: {Model: {fkname, ...}}
91
IFkMap = Dict[Type[Model], Set[str]]
12✔
92
# local MRO: {Model: {'base': [mro of all fields], 'fields': {fname: bitarray into base}}}
93
ILocalMroMap = Dict[Type[Model], ILocalMroData]
12✔
94

95

96
class IResolvedDeps(TypedDict):
12✔
97
    globalDeps: IGlobalDeps
12✔
98
    localDeps: ILocalDeps
12✔
99

100

101
class ComputedFieldsException(Exception):
12✔
102
    """
103
    Base exception raised from computed fields.
104
    """
105

106

107
class CycleException(ComputedFieldsException):
12✔
108
    """
109
    Exception raised during path linearization, if a cycle was found.
110
    Contains the found cycle either as list of edges or nodes in
111
    ``args``.
112
    """
113

114

115
class CycleEdgeException(CycleException):
12✔
116
    """
117
    Exception raised during path linearization, if a cycle was found.
118
    Contains the found cycle as list of edges in ``args``.
119
    """
120

121

122
class CycleNodeException(CycleException):
12✔
123
    """
124
    Exception raised during path linearization, if a cycle was found.
125
    Contains the found cycle as list of nodes in ``args``.
126
    """
127

128

129
class Edge:
12✔
130
    """
131
    Class for representing an edge in ``Graph``.
132
    The instances are created as singletons,
133
    calling ``Edge('A', 'B')`` multiple times
134
    will always point to the same object.
135
    """
136
    instances: Dict[Hashable, 'Edge'] = {}
12✔
137

138
    def __new__(cls, *args):
12✔
139
        key: Tuple['Node', 'Node'] = (args[0], args[1])
12✔
140
        if key in cls.instances:
12✔
141
            return cls.instances[key]
12✔
142
        instance = super(Edge, cls).__new__(cls)
12✔
143
        cls.instances[key] = instance
12✔
144
        return instance
12✔
145

146
    def __init__(self, left: 'Node', right: 'Node', data: Optional[Any] = None):
12✔
147
        self.left = left
12✔
148
        self.right = right
12✔
149
        self.data = data
12✔
150

151
    def __str__(self) -> str:
12✔
152
        return f'Edge {self.left}-{self.right}'
12✔
153

154
    def __repr__(self) -> str:
12✔
155
        return str(self)
12✔
156

157
    def __eq__(self, other) -> bool:
12✔
158
        return self is other
12✔
159

160
    def __ne__(self, other) -> bool:
12✔
161
        return self is not other
12✔
162

163
    def __hash__(self) -> int:
12✔
164
        return id(self)
12✔
165

166

167
class Node:
12✔
168
    """
169
    Class for representing a node in ``Graph``.
170
    The instances are created as singletons,
171
    calling ``Node('A')`` multiple times will
172
    always point to the same object.
173
    """
174
    instances: Dict[Hashable, 'Node'] = {}
12✔
175

176
    def __new__(cls, *args):
12✔
177
        if args[0] in cls.instances:
12✔
178
            return cls.instances[args[0]]
12✔
179
        instance = super(Node, cls).__new__(cls)
12✔
180
        cls.instances[args[0]] = instance
12✔
181
        return instance
12✔
182

183
    def __init__(self, data: Any):
12✔
184
        self.data = data
12✔
185

186
    def __str__(self) -> str:
12✔
187
        return self.data if isinstance(self.data, str) else '.'.join(self.data)
12✔
188

189
    def __repr__(self) -> str:
12✔
190
        return str(self)
12✔
191

192
    def __eq__(self, other) -> bool:
12✔
193
        return self is other
12✔
194

195
    def __ne__(self, other) -> bool:
12✔
UNCOV
196
        return self is not other
×
197

198
    def __hash__(self) -> int:
12✔
199
        return id(self)
12✔
200

201

202
class Graph:
12✔
203
    """
204
    .. _graph:
205

206
    Simple directed graph implementation.
207
    """
208

209
    def __init__(self):
12✔
210
        self.nodes: Set[Node] = set()
12✔
211
        self.edges: Set[Edge] = set()
12✔
212
        self._removed: Set[Edge] = set()
12✔
213

214
    def add_node(self, node: Node) -> None:
12✔
215
        """Add a node to the graph."""
UNCOV
216
        self.nodes.add(node)
×
217

218
    def remove_node(self, node: Node) -> None:
12✔
219
        """
220
        Remove a node from the graph.
221

222
        .. WARNING::
223
            Removing edges containing the removed node
224
            is not implemented.
225
        """
226
        self.nodes.remove(node)
12✔
227

228
    def add_edge(self, edge: Edge) -> None:
12✔
229
        """
230
        Add an edge to the graph.
231
        Automatically inserts the associated nodes.
232
        """
233
        self.edges.add(edge)
12✔
234
        self.nodes.add(edge.left)
12✔
235
        self.nodes.add(edge.right)
12✔
236

237
    def remove_edge(self, edge: Edge) -> None:
12✔
238
        """
239
        Removes an edge from the graph.
240

241
        .. WARNING::
242
            Does not remove leftover contained nodes.
243
        """
244
        self.edges.remove(edge)
12✔
245

246
    def get_dot(
12✔
247
            self,
248
            format: str = 'pdf',
249
            mark_edges: Optional[Dict[Edge, Dict[str, Any]]] = None,
250
            mark_nodes: Optional[Dict[Node, Dict[str, Any]]] = None
251
    ):
252
        """
253
        Returns the graphviz object of the graph
254
        (needs the :mod:`graphviz` package).
255
        """
256
        from graphviz import Digraph
12✔
257
        if not mark_edges:
12✔
258
            mark_edges = {}
12✔
259
        if not mark_nodes:
12!
260
            mark_nodes = {}
12✔
261
        dot = Digraph(format=format)
12✔
262
        for node in self.nodes:
12✔
263
            dot.node(str(node), str(node), **mark_nodes.get(node, {}))
12✔
264
        for edge in self.edges:
12✔
265
            dot.edge(str(edge.left), str(edge.right), **mark_edges.get(edge, {}))
12✔
266
        return dot
12✔
267

268
    def render(
12✔
269
            self,
270
            filename: Optional[Union[PathLike, str]] = None,
271
            format: str = 'pdf',
272
            mark_edges: Optional[Dict[Edge, Dict[str, Any]]] = None,
273
            mark_nodes: Optional[Dict[Node, Dict[str, Any]]] = None
274
    ) -> None:
275
        """
276
        Renders the graph to file (needs the :mod:`graphviz` package).
277
        """
278
        self.get_dot(format, mark_edges, mark_nodes).render(
12✔
279
            filename=filename, cleanup=True)
280

281
    def view(
12✔
282
            self,
283
            format: str = 'pdf',
284
            mark_edges: Optional[Dict[Edge, Dict[str, Any]]] = None,
285
            mark_nodes: Optional[Dict[Node, Dict[str, Any]]] = None
286
    ) -> None:
287
        """
288
        Directly opens the graph in the associated desktop viewer
289
        (needs the :mod:`graphviz` package).
290
        """
UNCOV
291
        self.get_dot(format, mark_edges, mark_nodes).view(cleanup=True)
×
292

293
    @staticmethod
12✔
294
    def edgepath_to_nodepath(path: Sequence[Edge]) -> List[Node]:
12✔
295
        """
296
        Converts a list of edges to a list of nodes.
297
        """
298
        return [edge.left for edge in path] + [path[-1].right]
12✔
299

300
    @staticmethod
12✔
301
    def nodepath_to_edgepath(path: Sequence[Node]) -> List[Edge]:
12✔
302
        """
303
        Converts a list of nodes to a list of edges.
304
        """
305
        return [Edge(*pair) for pair in pairwise(path)]
12✔
306

307
    def _get_edge_paths(
12✔
308
            self,
309
            edge: Edge,
310
            left_edges: Dict[Node, List[Edge]],
311
            paths: List[List[Edge]],
312
            seen: Optional[List[Edge]] = None
313
    ) -> None:
314
        """
315
        Walks the graph recursively to get all possible paths.
316
        Might raise a ``CycleEdgeException``.
317
        """
318
        if not seen:
12✔
319
            seen = []
12✔
320
        if edge in seen:
12✔
321
            raise CycleEdgeException(seen[seen.index(edge):])
12✔
322
        seen.append(edge)
12✔
323
        if edge.right in left_edges:
12✔
324
            for new_edge in left_edges[edge.right]:
12✔
325
                self._get_edge_paths(new_edge, left_edges, paths, seen[:])
12✔
326
        paths.append(seen)
12✔
327

328
    def get_edgepaths(self) -> List[List[Edge]]:
12✔
329
        """
330
        Returns a list of all edge paths.
331
        An edge path is represented as list of edges.
332

333
        Might raise a ``CycleEdgeException``. For in-depth cycle detection
334
        use ``edge_cycles``, `node_cycles`` or ``get_cycles()``.
335
        """
336
        left_edges: Dict[Node, List[Edge]] = defaultdict(list)
12✔
337
        paths: List[List[Edge]] = []
12✔
338
        for edge in self.edges:
12✔
339
            left_edges[edge.left].append(edge)
12✔
340
        for edge in self.edges:
12✔
341
            self._get_edge_paths(edge, left_edges, paths)
12✔
342
        return paths
12✔
343

344
    def get_nodepaths(self) -> List[List[Node]]:
12✔
345
        """
346
        Returns a list of all node paths.
347
        A node path is represented as list of nodes.
348

349
        Might raise a ``CycleNodeException``. For in-depth cycle detection
350
        use ``edge_cycles``, ``node_cycles`` or ``get_cycles()``.
351
        """
352
        try:
12✔
353
            paths: List[List[Edge]] = self.get_edgepaths()
12✔
354
        except CycleEdgeException as exc:
12✔
355
            raise CycleNodeException(self.edgepath_to_nodepath(exc.args[0]))
12✔
356
        node_paths: List[List[Node]] = []
12✔
357
        for path in paths:
12✔
358
            node_paths.append(self.edgepath_to_nodepath(path))
12✔
359
        return node_paths
12✔
360

361
    def _get_cycles(
12✔
362
            self,
363
            edge: Edge,
364
            left_edges: Dict[Node, List[Edge]],
365
            cycles: Dict[FrozenSet[Edge], ICycleData],
366
            seen: Optional[List[Edge]] = None
367
    ) -> None:
368
        """
369
        Walks the graph to collect all cycles.
370
        """
371
        if not seen:
12✔
372
            seen = []
12✔
373
        if edge in seen:
12✔
374
            cycle: FrozenSet[Edge] = frozenset(seen[seen.index(edge):])
12✔
375
            data: ICycleData = cycles[cycle]
12✔
376
            if seen:
12!
377
                data['entries'].add(seen[0])
12✔
378
            data['path'] = seen[seen.index(edge):]
12✔
379
            return
12✔
380
        seen.append(edge)
12✔
381
        if edge.right in left_edges:
12✔
382
            for new_edge in left_edges[edge.right]:
12✔
383
                self._get_cycles(new_edge, left_edges, cycles, seen[:])
12✔
384

385
    def get_cycles(self) -> Dict[FrozenSet[Edge], ICycleData]:
12✔
386
        """
387
        Gets all cycles in graph.
388

389
        This is not optimised by any means, it simply walks the whole graph
390
        recursively and aborts as soon a seen edge gets entered again.
391
        Therefore use this and all dependent properties
392
        (``edge_cycles`` and ``node_cycles``) for in-depth cycle inspection
393
        only.
394

395
        As a start node any node on the left side of an edge will be tested.
396

397
        Returns a mapping of
398

399
        .. code:: python
400

401
            {frozenset(<cycle edges>): {
402
                'entries': set(edges leading to the cycle),
403
                'path': list(cycle edges in last seen order)
404
            }}
405

406
        An edge in ``entries`` is not necessarily part of the cycle itself,
407
        but once entered it will lead to the cycle.
408
        """
409
        left_edges: Dict[Node, List[Edge]] = defaultdict(list)
12✔
410
        cycles: Dict[FrozenSet[Edge], ICycleData] = defaultdict(lambda: {'entries': set(), 'path': []})
12✔
411
        for edge in self.edges:
12✔
412
            left_edges[edge.left].append(edge)
12✔
413
        for edge in self.edges:
12✔
414
            self._get_cycles(edge, left_edges, cycles)
12✔
415
        return cycles
12✔
416

417
    @property
12✔
418
    def edge_cycles(self) -> List[List[Edge]]:
12✔
419
        """
420
        Returns all cycles as list of edge lists.
421
        Use this only for in-depth cycle inspection.
422
        """
423
        return [cycle['path'] for cycle in self.get_cycles().values()]
12✔
424

425
    @property
12✔
426
    def node_cycles(self) -> List[List[Node]]:
12✔
427
        """
428
        Returns all cycles as list of node lists.
429
        Use this only for in-depth cycle inspection.
430
        """
431
        return [self.edgepath_to_nodepath(cycle['path'])
12✔
432
                for cycle in self.get_cycles().values()]
433

434
    @property
12✔
435
    def is_cyclefree(self) -> bool:
12✔
436
        """
437
        True if the graph contains no cycles.
438

439
        For faster calculation this property relies on
440
        path linearization instead of the more expensive
441
        full cycle detection. For in-depth cycle inspection
442
        use ``edge_cycles`` or ``node_cycles`` instead.
443
        """
444
        try:
12✔
445
            self.get_edgepaths()
12✔
446
            return True
12✔
447
        except CycleEdgeException:
12✔
448
            return False
12✔
449

450

451
class ComputedModelsGraph(Graph):
12✔
452
    """
453
    Class to convert the computed fields dependency strings into
454
    a graph and generate the final resolver functions.
455

456
    Steps taken:
457

458
    - ``resolve_dependencies`` resolves the depends field strings
459
      to real model fields.
460
    - The dependencies are rearranged to adjacency lists for
461
      the underlying graph.
462
    - The graph does a cycle check and removes redundant edges
463
      to lower the database penalty.
464
    - In ``generate_lookup_map`` the path segments of remaining edges
465
      are collected into the final lookup map.
466
    """
467

468
    def __init__(self, computed_models: Dict[Type[Model], Dict[str, IComputedField]]):
12✔
469
        """
470
        ``computed_models`` is ``Resolver.computed_models``.
471
        """
472
        super(ComputedModelsGraph, self).__init__()
12✔
473
        self._m2m: IM2mMap = {}
12✔
474
        self._computed_models: Dict[Type[Model], Dict[str, IComputedField]] = computed_models
12✔
475
        self.models: Dict[str, Type[Model]] = {}
12✔
476
        self.resolved: IResolvedDeps = self.resolve_dependencies(computed_models)
12✔
477
        self.cleaned_data: IGlobalDepsCleaned = self._clean_data(self.resolved['globalDeps'])
12✔
478
        self._insert_data(self.cleaned_data)
12✔
479
        self.modelgraphs: Dict[Type[Model], ModelGraph] = {}
12✔
480
        self.union: Optional[Graph] = None
12✔
481

482
    def _right_constrain(self, model: Type[Model], fieldname: str) -> None:
12✔
483
        """
484
        Sanity check for right side field types.
485

486
        On the right side of a `depends` rule only real database fields should occur,
487
        that are fieldnames that can safely be used with `update_fields` in ``save``.
488
        This includes any concrete fields (also computed fields itself), but not m2m fields.
489
        """
490
        f = model._meta.get_field(fieldname)
12✔
491
        if not f.concrete or f.many_to_many:
12!
UNCOV
492
            raise ComputedFieldsException(f"{model} has no concrete field named '{fieldname}'")
×
493
    
494
    def _expand_m2m(self, model: Type[Model], path: str) -> str:
12✔
495
        """
496
        Expand M2M dependencies into through model.
497
        """
498
        cls: Type[Model] = model
12✔
499
        symbols: list[str] = []
12✔
500
        for symbol in path.split('.'):
12✔
501
            try:
12✔
502
                rel: Any = cls._meta.get_field(symbol)
12✔
503
            except FieldDoesNotExist:
12✔
504
                descriptor = getattr(cls, symbol)
12✔
505
                rel = getattr(descriptor, 'rel', None) or getattr(descriptor, 'related')
12✔
506
            if rel.many_to_many:
12✔
507
                if hasattr(rel, 'through'):
12✔
508
                    through = rel.through
12✔
509
                    m2m_field_name = rel.remote_field.m2m_field_name()
12✔
510
                    m2m_reverse_field_name = rel.remote_field.m2m_reverse_field_name()
12✔
511
                    symbols.append(through._meta.get_field(m2m_reverse_field_name).related_query_name())
12✔
512
                    symbols.append(m2m_field_name)
12✔
513
                else:
514
                    through = rel.remote_field.through
12✔
515
                    m2m_field_name = rel.m2m_field_name()
12✔
516
                    m2m_reverse_field_name = rel.m2m_reverse_field_name()
12✔
517
                    symbols.append(through._meta.get_field(m2m_field_name).related_query_name())
12✔
518
                    symbols.append(m2m_reverse_field_name)
12✔
519
                self._m2m[through] = {
12✔
520
                    'left': m2m_field_name,
521
                    'right': m2m_reverse_field_name
522
                }
523
            else:
524
                symbols.append(symbol)
12✔
525
            cls = rel.related_model
12✔
526
        return '.'.join(symbols)
12✔
527

528
    def resolve_dependencies(
12✔
529
        self,
530
        computed_models: Dict[Type[Model], Dict[str, IComputedField]]
531
    ) -> IResolvedDeps:
532
        """
533
        Converts `depends` rules into ORM lookups and checks the source fields' existance.
534

535
        Basic `depends` rules:
536
        - left side may contain any relation path accessible from an instance as ``'a.b.c'``
537
        - right side may contain real database source fields (never relations)
538

539
        Deeper nested relations get automatically added to the resolver map:
540

541
        - fk relations are added on the model holding the fk field
542
        - reverse fk relations are added on related model holding the fk field
543
        - m2m fields are expanded via their through model
544
        """
545
        global_deps: IGlobalDeps = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
12✔
546
        local_deps: ILocalDeps = defaultdict(lambda: defaultdict(set))
12✔
547
        for model, fields in computed_models.items():
12✔
548
            # skip proxy models for graph handling,
549
            # deps get patched at runtime from resolved real models
550
            if model._meta.proxy:
12✔
551
                continue
12✔
552
            local_deps[model]   # always add to local to get a result for MRO
12✔
553
            for field, real_field in fields.items():
12✔
554
                fieldentry = global_deps[model][field]
12✔
555
                local_deps[model][field]
12✔
556

557
                depends: IDepends = real_field._computed['depends']
12✔
558

559
                # fields contributed from multi table model inheritance need patched depends rules,
560
                # so the relation paths match the changed model entrypoint
561
                if real_field.model != model and not real_field.model._meta.abstract:
12✔
562
                    # path from original model to current inherited
563
                    # these path segments have to be removed from depends
564
                    remove_segments = parent_to_inherited_path(real_field.model, model)
12✔
565
                    if not remove_segments:
12!
UNCOV
566
                        raise ComputedFieldsException(f'field {real_field} cannot be mapped on model {model}')
×
567

568
                    # paths starting with these segments belong to other derived models
569
                    # and get skipped for the dep tree creation on the current model
570
                    # allows depending on fields of derived models in a computed field on the parent model
571
                    # ("up-pulling", make sure your method handles the attribute access correctly)
572
                    skip_paths: List[str] = []
12✔
573
                    for rel1 in real_field.model._meta.related_objects:
12✔
574
                        if rel1.name != remove_segments[0]:
12✔
575
                            skip_paths.append(rel1.name)
12✔
576

577
                    # do a full rewrite of depends entry
578
                    depends_overwrite: IDependsAppend = []
12✔
579
                    for path, fieldnames in depends:
12✔
580
                        ps = path.split('.')
12✔
581
                        if ps[0] in skip_paths:
12✔
582
                            continue
12✔
583
                        path = '.'.join(skip_equal_segments(ps, remove_segments)) or 'self'
12✔
584
                        depends_overwrite.append((path, fieldnames[:]))
12✔
585
                    depends = depends_overwrite
12✔
586

587
                for path, fieldnames in depends:
12✔
588
                    if path == 'self':
12✔
589
                        # skip selfdeps in global graph handling
590
                        # we handle it instead on model graph level
591
                        # do at least an existance check here to provide an early error
592
                        for fieldname in fieldnames:
12✔
593
                            self._right_constrain(model, fieldname)
12✔
594
                        local_deps[model][field].update(fieldnames)
12✔
595
                        continue
12✔
596

597
                    # expand m2m into through model
598
                    path = self._expand_m2m(model, path)
12✔
599

600
                    path_segments: List[str] = []
12✔
601
                    cls: Type[Model] = model
12✔
602
                    for symbol in path.split('.'):
12✔
603
                        try:
12✔
604
                            rel: Any = cls._meta.get_field(symbol)
12✔
605
                            if isinstance(rel, ManyToOneRel):
12✔
606
                                symbol = rel.field.related_query_name()
12✔
607
                                # add dependency to reverse relation field as well
608
                                # this needs to be added in opposite direction on related model
609
                                path_segments.append(symbol)
12✔
610
                                fieldentry[cast(Type[Model], rel.related_model)].append(
12✔
611
                                    {'path': '__'.join(path_segments), 'depends': rel.field.name})
612
                                path_segments.pop()
12✔
613
                        except FieldDoesNotExist:
12✔
614
                            # handle reverse relation (not a concrete field)
615
                            descriptor = getattr(cls, symbol)
12✔
616
                            rel = getattr(descriptor, 'rel', None) or getattr(descriptor, 'related')
12✔
617
                            symbol = rel.field.related_query_name()
12✔
618
                            # add dependency to reverse relation field as well
619
                            # this needs to be added in opposite direction on related model
620
                            path_segments.append(symbol)
12✔
621
                            fieldentry[rel.related_model].append(
12✔
622
                                {'path': '__'.join(path_segments), 'depends': rel.field.name})
623
                            path_segments.pop()
12✔
624
                        # add path segment to self deps if we have an fk field on a CFM
625
                        # this is needed to correctly propagate direct fk changes
626
                        # in local cf mro later on
627
                        if isinstance(rel, ForeignKey) and cls in computed_models:
12✔
628
                            self._right_constrain(cls, symbol)
12✔
629
                            local_deps[cls][field].add(symbol)
12✔
630
                        if path_segments:
12✔
631
                            # add segment to intermodel graph deps
632
                            fieldentry[cls].append(
12✔
633
                                {'path': '__'.join(path_segments), 'depends': symbol})
634
                        path_segments.append(symbol)
12✔
635
                        cls = cast(Type[Model], rel.related_model)
12✔
636
                    for target_field in fieldnames:
12✔
637
                        self._right_constrain(cls, target_field)
12✔
638
                        fieldentry[cls].append(
12✔
639
                            {'path': '__'.join(path_segments), 'depends': target_field})
640
        return {'globalDeps': global_deps, 'localDeps': local_deps}
12✔
641

642
    def _clean_data(self, data: IGlobalDeps) -> IGlobalDepsCleaned:
12✔
643
        """
644
        Converts the global dependency data into an adjacency list tree
645
        to be used with the underlying graph.
646
        """
647
        cleaned: IGlobalDepsCleaned = defaultdict(set)
12✔
648
        for model, fielddata in data.items():
12✔
649
            self.models[modelname(model)] = model
12✔
650
            for field, modeldata in fielddata.items():
12✔
651
                for depmodel, relations in modeldata.items():
12✔
652
                    self.models[modelname(depmodel)] = depmodel
12✔
653
                    for dep in relations:
12✔
654
                        key = (modelname(depmodel), dep['depends'])
12✔
655
                        value = (modelname(model), field)
12✔
656
                        cleaned[key].add(value)
12✔
657
        return cleaned
12✔
658

659
    def _insert_data(self, data: IGlobalDepsCleaned) -> None:
12✔
660
        """
661
        Adds edges in ``data`` to the graph.
662
        Data must be an adjacency mapping like
663
        ``{left: set(right neighbours)}``.
664
        """
665
        for left, value in data.items():
12✔
666
            for right in value:
12✔
667
                edge = Edge(Node(left), Node(right))
12✔
668
                self.add_edge(edge)
12✔
669

670
    def _get_fk_fields(self, model: Type[Model], paths: Set[str]) -> Set[str]:
12✔
671
        """
672
        Reduce field name dependencies in paths to reverse real local fk fields.
673
        """
674
        candidates = set(el.split('__')[-1] for el in paths)
12✔
675
        result: Set[str] = set()
12✔
676
        for field in model._meta.get_fields():
12✔
677
            if isinstance(field, ForeignKey) and field.related_query_name() in candidates:
12✔
678
                result.add(field.name)
12✔
679
        return result
12✔
680

681
    def _resolve(self, data: Dict[str, List[IDependsData]]) -> Tuple[Set[str], Set[str]]:
12✔
682
        """
683
        Helper to merge querystring paths for lookup map.
684
        """
685
        fields: Set[str] = set()
12✔
686
        strings: Set[str] = set()
12✔
687
        for field, dependencies in data.items():
12✔
688
            fields.add(field)
12✔
689
            for dep in dependencies:
12✔
690
                strings.add(dep['path'])
12✔
691
        return fields, strings
12✔
692

693
    def generate_maps(self) -> Tuple[ILookupMap, IFkMap]:
12✔
694
        """
695
        Generates the final lookup map and the fk map.
696

697
        Schematically the lookup map is a reversed adjacency list of every source model
698
        with its fields mapping to the target models with computed fields it would
699
        update through a certain filter string::
700

701
            src_model:[src_field, ...] --> target_model:[(cf_field, filter_string), ...]
702

703
        During runtime ``update_dependent`` will use the the information to create
704
        select querysets on the target_models (roughly):
705

706
        .. code:: python
707

708
            qs = target_model._base_manager.filter(filter_string=src_model.instance)
709
            qs |= target_model._base_manager.filter(filter_string2=src_model.instance)
710
            ...
711
            bulk_updater(qs, cf_fields)
712

713
        The fk map list all models with fk fieldnames, that contribute to computed fields.
714

715
        Returns a tuple of (lookup_map, fk_map).
716
        """
717
        # apply full node information to graph edges
718
        table: IInterimTable = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(list))))
12✔
719
        for edge in self.edges:
12✔
720
            lmodel, lfield = edge.left.data
12✔
721
            lmodel = self.models[lmodel]
12✔
722
            rmodel, rfield = edge.right.data
12✔
723
            rmodel = self.models[rmodel]
12✔
724
            table[lmodel][lfield][rmodel][rfield].extend(self.resolved['globalDeps'][rmodel][rfield][lmodel])
12✔
725

726
        # build lookup and path map
727
        lookup_map: ILookupMap = defaultdict(lambda: defaultdict(dict))
12✔
728
        path_map: Dict[Type[Model], Set[str]] = defaultdict(set)
12✔
729
        for lmodel, data in table.items():
12✔
730
            for lfield, ldata in data.items():
12✔
731
                for rmodel, rdata in ldata.items():
12✔
732
                    fields, strings = self._resolve(rdata)
12✔
733
                    lookup_map[lmodel][lfield][rmodel] = (fields, strings)
12✔
734
                    path_map[lmodel].update(strings)
12✔
735

736
        # translate paths to model local fields and filter for fk fields
737
        fk_map: IFkMap = {}
12✔
738
        for model, paths in path_map.items():
12✔
739
            value = self._get_fk_fields(model, paths)
12✔
740
            if value:
12✔
741
                fk_map[model] = value
12✔
742

743
        return lookup_map, fk_map
12✔
744

745
    def prepare_modelgraphs(self) -> None:
12✔
746
        """
747
        Helper to initialize model local subgraphs.
748
        """
749
        if self.modelgraphs:
12✔
750
            return
12✔
751
        data = self.resolved['localDeps']
12✔
752
        for model, local_deps in data.items():
12✔
753
            model_graph = ModelGraph(model, local_deps, self._computed_models[model])
12✔
754
            model_graph.transitive_reduction()  # modelgraph always must be cyclefree
12✔
755
            self.modelgraphs[model] = model_graph
12✔
756

757
    def generate_local_mro_map(self) -> ILocalMroMap:
12✔
758
        """
759
        Generate model local computed fields mro maps.
760
        Returns a mapping of models with local computed fields dependencies and their
761
        `mro`, example:
762

763
        .. code:: python
764

765
            {
766
                modelX: {
767
                    'base': ['c1', 'c2', 'c3'],
768
                    'fields': {
769
                        'name': ['c2', 'c3'],
770
                        'c2': ['c2', 'c3']
771
                    }
772
                }
773
            }
774

775
        In the example `modelX` would have 3 computed fields, where `c2` somehow depends on
776
        the field `name`. `c3` itself depends on changes to `c2`, thus a change to `name` should
777
        run `c2` and `c3` in that specific order.
778

779
        `base` lists all computed fields in topological execution order (mro).
780
        It is also used at runtime to cover a full update of an instance (``update_fields=None``).
781

782
        .. NOTE::
783
            Note that the actual values in `fields` are bitarrays to index positions of `base`,
784
            which allows quick field update merges at runtime by doing binary OR on the bitarrays.
785
        """
786
        self.prepare_modelgraphs()
12✔
787
        return dict(
12✔
788
            (model, g.generate_local_mapping(g.generate_field_paths(g.get_topological_paths())))
789
            for model, g in self.modelgraphs.items()
790
        )
791

792
    def get_uniongraph(self) -> Graph:
12✔
793
        """
794
        Build a union graph of intermodel dependencies and model local dependencies.
795
        This graph represents the final update cascades triggered by certain field updates.
796
        The union graph is needed to spot cycles introduced by model local dependencies,
797
        that otherwise might went unnoticed, example:
798

799
        - global dep graph (acyclic):  ``A.comp --> B.comp, B.comp2 --> A.comp``
800
        - modelgraph of B  (acyclic):  ``B.comp --> B.comp2``
801

802
        Here the resulting union graph is not a DAG anymore, since both subgraphs short-circuit
803
        to a cycle of ``A.comp --> B.comp --> B.comp2 --> A.comp``.
804
        """
805
        if not self.union:
12✔
806
            graph = Graph()
12✔
807
            # copy intermodel edges
808
            for edge in self.edges:
12✔
809
                graph.add_edge(edge)
12✔
810
            # copy modelgraph edges
811
            self.prepare_modelgraphs()
12✔
812
            for model, modelgraph in self.modelgraphs.items():
12✔
813
                name = modelname(model)
12✔
814
                for edge in modelgraph.edges:
12✔
815
                    graph.add_edge(Edge(
12✔
816
                        Node((name, edge.left.data)),
817
                        Node((name, edge.right.data))
818
                    ))
819
            self.union = graph
12✔
820
        return self.union
12✔
821

822

823
class ModelGraph(Graph):
12✔
824
    """
825
    Graph to resolve model local computed field dependencies in right calculation order.
826
    """
827

828
    def __init__(
12✔
829
            self,
830
            model: Type[Model],
831
            local_dependencies: Dict[str, Set[str]],
832
            computed_fields: Dict[str, IComputedField]
833
    ):
834
        super(ModelGraph, self).__init__()
12✔
835
        self.model = model
12✔
836

837
        # add all edges from extracted local deps
838
        for right, deps in local_dependencies.items():
12✔
839
            for left in deps:
12✔
840
                self.add_edge(Edge(Node(left), Node(right)))
12✔
841

842
        # add ## node as update_fields=None placeholder with edges to all computed fields
843
        # --> None explicitly updates all computed fields
844
        # Note: this has to be on all cfs to not skip a non local dependent one by accident
845
        left_all = Node('##')
12✔
846
        for computed in computed_fields:
12✔
847
            self.add_edge(Edge(left_all, Node(computed)))
12✔
848

849
    def transitive_reduction(self) -> None:
12✔
850
        """
851
        Remove redundant single edges. Also checks for cycles.
852
        *Note:* Other than intermodel dependencies local dependencies must always be cyclefree.
853
        """
854
        paths: List[List[Edge]] = self.get_edgepaths()
12✔
855
        remove: Set[Edge] = set()
12✔
856
        for path1 in paths:
12✔
857
            # we only cut single edge paths
858
            if len(path1) > 1:
12✔
859
                continue
12✔
860
            left: Node = path1[0].left
12✔
861
            right: Node = path1[-1].right
12✔
862
            for path2 in paths:
12✔
863
                if path2 == path1:
12✔
864
                    continue
12✔
865
                if right == path2[-1].right and left == path2[0].left:
12✔
866
                    remove.add(path1[0])
12✔
867
        for edge in remove:
12✔
868
            self.remove_edge(edge)
12✔
869

870
    def _tsort(
12✔
871
            self,
872
            graph: Dict[Node, List[Node]],
873
            start: Node,
874
            paths: Dict[Node, List[Node]],
875
            path: List[Node]
876
    ):
877
        """
878
        Recursive deep first search variant of topsort.
879
        Also accumulates any revealed subpaths.
880
        """
881
        for node in graph.get(start, []):
12✔
882
            if not node in paths:
12✔
883
                # accumulate revealed topological subpaths
884
                paths[node] = self._tsort(graph, node, paths, [])
12✔
885
            for snode in paths[node]:
12✔
886
                # append node if its not yet part of the path
887
                if not snode in path:
12✔
888
                    path += [snode]
12✔
889
        path += [start]
12✔
890
        return path
12✔
891

892
    def get_topological_paths(self) -> Dict[Node, List[Node]]:
12✔
893
        """
894
        Creates a map of all possible entry nodes and their topological update path
895
        (computed fields mro).
896
        """
897
        # create simplified parent-child relation graph
898
        graph: Dict[Node, List[Node]] = defaultdict(list)
12✔
899
        for edge in self.edges:
12✔
900
            graph[edge.left].append(edge.right)
12✔
901
        topological_paths: Dict[Node, List[Node]] = {}
12✔
902

903
        # '##' has connections to all cfs thus creates the basic deps order map containing all cfs
904
        # it also creates all tpaths between cfs itself
905
        topological_paths[Node('##')] = self._tsort(graph, Node('##'), topological_paths, [])[:-1]
12✔
906

907
        # we still need to reveal concrete field deps
908
        # other than for cfs we also strip last entry (the field itself)
909
        for node in graph:
12✔
910
            if node in topological_paths:
12✔
911
                continue
12✔
912
            topological_paths[node] = self._tsort(graph, node, topological_paths, [])[:-1]
12✔
913

914
        # reverse all tpaths
915
        for entry, path in topological_paths.items():
12✔
916
            topological_paths[entry] = path[::-1]
12✔
917

918
        return topological_paths
12✔
919

920
    def generate_field_paths(self, tpaths: Dict[Node, List[Node]]) -> Dict[str, List[str]]:
12✔
921
        """
922
        Convert topological path node mapping into a mapping containing the fieldnames.
923
        """
924
        field_paths: Dict[str, List[str]] = {}
12✔
925
        for node, path in tpaths.items():
12✔
926
            field_paths[node.data] = [el.data for el in path]
12✔
927
        return field_paths
12✔
928

929
    def generate_local_mapping(self, field_paths: Dict[str, List[str]]) -> ILocalMroData:
12✔
930
        """
931
        Generates the final model local update table to be used during ``ComputedFieldsModel.save``.
932
        Output is a mapping of local fields, that also update local computed fields, to a bitarray
933
        containing the computed fields mro, and the base topologcial order for a full update.
934
        """
935
        # transfer final data to bitarray
936
        # bitarray: bit index is realisation of one valid full topsort order held in '##'
937
        # since this order was build from full graph it always reflects the correct mro
938
        # even for a subset of fields in update_fields later on
939
        # properties:
940
        #   - length is number of computed fields on model
941
        #   - True indicates execution of associated function at position in topological order
942
        #   - bitarray is int, field update rules can be added by bitwise OR at save time
943
        # example:
944
        #   - edges: [name, c_a], [c_a, c_b], [c_c, c_b], [z, c_c]
945
        #   - topological order of cfs: {1: c_a, 2: c_c, 4: c_b}
946
        #   - field deps:   name  : c_a, c_b    --> 0b101
947
        #                   c_a   : c_a, c_b    --> 0b101
948
        #                   c_c   : c_c, c_b    --> 0b110
949
        #                   z     : c_c, c_b    --> 0b110
950
        #   - update mro:   [name, z]   --> 0b101 | 0b110 --> [c_a, c_c, c_b]
951
        #                   [c_a, c_b]  --> 0b101 | 0b110 --> [c_a, c_c, c_b]
952
        binary: Dict[str, int] = {}
12✔
953
        base = field_paths['##']
12✔
954
        for field, deps in field_paths.items():
12✔
955
            if field == '##':
12✔
956
                continue
12✔
957
            binary[field] = 0
12✔
958
            for pos, name in enumerate(base):
12✔
959
                binary[field] |= (1 if name in deps else 0) << pos
12✔
960
        return {'base': base, 'fields': binary}
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc