• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

charettes / django-polymodels / 14162717205

31 Mar 2025 02:19AM UTC coverage: 99.112%. Remained the same
14162717205

push

github

charettes
Drop support for Django < 4.2 and Python < 3.9.

128 of 137 branches covered (93.43%)

335 of 338 relevant lines covered (99.11%)

4.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.25
/polymodels/models.py
1
import threading
5✔
2
from collections import defaultdict, namedtuple
5✔
3
from operator import attrgetter
5✔
4

5
from django.contrib.contenttypes.models import ContentType
5✔
6
from django.core import checks
5✔
7
from django.core.exceptions import FieldDoesNotExist
5✔
8
from django.db import models, transaction
5✔
9
from django.db.models.constants import LOOKUP_SEP
5✔
10
from django.db.models.signals import class_prepared
5✔
11
from django.utils.functional import cached_property
5✔
12

13
from .managers import PolymorphicManager
5✔
14
from .utils import copy_fields, get_content_type, get_content_types
5✔
15

16

17
class SubclassAccessor(
5✔
18
    namedtuple("SubclassAccessor", ["attrs", "proxy", "related_lookup"])
19
):
20
    @staticmethod
5✔
21
    def _identity(obj):
5✔
22
        return obj
5✔
23

24
    @cached_property
5✔
25
    def attrgetter(self):
5✔
26
        if not self.attrs:
5✔
27
            return self._identity
5✔
28
        return attrgetter(".".join(self.attrs))
5✔
29

30
    def __call__(self, obj, with_prefetched_objects=False):
5✔
31
        # Cast to the right concrete model by going up in the
32
        # SingleRelatedObjectDescriptor chain
33
        casted = self.attrgetter(obj)
5✔
34
        # If it's a proxy model we make sure to type cast it
35
        proxy = self.proxy
5✔
36
        if proxy:
5✔
37
            casted = copy_fields(casted, proxy)
5✔
38
        if with_prefetched_objects:
5✔
39
            try:
5✔
40
                casted._prefetched_objects_cache.update(obj._prefetched_objects_cache)
5✔
41
            except AttributeError:
5✔
42
                casted._prefetched_objects_cache = obj._prefetched_objects_cache
5✔
43
        return casted
5✔
44

45

46
EMPTY_ACCESSOR = SubclassAccessor((), None, "")
5✔
47

48

49
class SubclassAccessors(defaultdict):
5✔
50
    def __init__(self):
5✔
51
        self.model = None
5✔
52
        self.apps = None
5✔
53

54
    def contribute_to_class(self, model, name, **kwargs):
5✔
55
        self.model = model
5✔
56
        self.apps = model._meta.apps
5✔
57
        self.lock = threading.RLock()
5✔
58
        setattr(model, name, self)
5✔
59
        # Ideally we would connect to the model.apps.clear_cache()
60
        class_prepared.connect(self.class_prepared_receiver, weak=False)
5✔
61

62
    def class_prepared_receiver(self, sender, **kwargs):
5✔
63
        if issubclass(sender, self.model):
5✔
64
            with self.lock:
5✔
65
                for parent in sender._meta.parents:
5✔
66
                    self.pop(self.get_model_key(parent._meta), None)
5✔
67

68
    def get_model_key(self, opts):
5✔
69
        return opts.app_label, opts.model_name
5✔
70

71
    def __get__(self, instance, owner):
5✔
72
        if owner is self.model:
5✔
73
            return self
5✔
74
        opts = owner._meta
5✔
75
        model_key = self.get_model_key(opts)
5✔
76
        return self[model_key]
5✔
77

78
    def __missing__(self, model_key):
5✔
79
        """
80
        Generate the accessors for this model by recursively generating its
81
        children accessors and prefixing them.
82
        """
83
        owner = self.apps.get_model(*model_key)
5✔
84
        if not issubclass(owner, self.model):
5✔
85
            raise KeyError
5✔
86
        accessors = {owner: EMPTY_ACCESSOR}
5✔
87
        with self.lock:
5✔
88
            for model in self.apps.get_models():
5✔
89
                opts = model._meta
5✔
90
                if (
5✔
91
                    opts.proxy
92
                    and issubclass(model, owner)
93
                    and (owner._meta.proxy or opts.concrete_model is owner)
94
                ):
95
                    accessors[model] = SubclassAccessor((), model, "")
5✔
96
                # Use .get() instead of `in` as proxy inheritance is also
97
                # stored in _meta.parents as None.
98
                elif opts.parents.get(owner):
5✔
99
                    part = opts.model_name
5✔
100
                    for child, (parts, proxy, _lookup) in self[
5✔
101
                        self.get_model_key(opts)
102
                    ].items():
103
                        accessors[child] = SubclassAccessor(
5✔
104
                            (part,) + parts, proxy, LOOKUP_SEP.join((part,) + parts)
105
                        )
106
        return accessors
5✔
107

108

109
class BasePolymorphicModel(models.Model):
5✔
110
    class Meta:
5✔
111
        abstract = True
5✔
112

113
    subclass_accessors = SubclassAccessors()
5✔
114

115
    def type_cast(self, to=None, with_prefetched_objects=False):
5✔
116
        if to is None:
5✔
117
            content_type_id = getattr(self, "%s_id" % self.CONTENT_TYPE_FIELD)
5✔
118
            to = ContentType.objects.get_for_id(content_type_id).model_class()
5✔
119
        accessor = self.subclass_accessors[to]
5✔
120
        return accessor(self, with_prefetched_objects)
5✔
121

122
    def save(self, *args, **kwargs):
5✔
123
        if self._state.adding and getattr(self, self.CONTENT_TYPE_FIELD, None) is None:
5✔
124
            content_type = get_content_type(self.__class__)
5✔
125
            setattr(self, self.CONTENT_TYPE_FIELD, content_type)
5✔
126
        return super().save(*args, **kwargs)
5✔
127

128
    def delete(self, using=None, keep_parents=False):
5✔
129
        kept_parent = None
5✔
130
        if keep_parents:
5!
131
            parent_ptr = next(
5✔
132
                iter(self._meta.concrete_model._meta.parents.values()), None
133
            )
134
            if parent_ptr:
5!
135
                kept_parent = getattr(self, parent_ptr.name)
5✔
136
        if kept_parent:
5!
137
            context_manager = transaction.atomic(using=using, savepoint=False)
5✔
138
        else:
139
            context_manager = transaction.mark_for_rollback_on_error(using=using)
×
140
        with context_manager:
5✔
141
            deletion = super().delete(using=using, keep_parents=keep_parents)
5✔
142
            if kept_parent:
5!
143
                parent_content_type = get_content_type(kept_parent)
5✔
144
                setattr(kept_parent, self.CONTENT_TYPE_FIELD, parent_content_type)
5✔
145
                kept_parent.save(update_fields=[self.CONTENT_TYPE_FIELD])
5✔
146
        return deletion
5✔
147

148
    @classmethod
5✔
149
    def content_type_lookup(cls, *models, **kwargs):
5✔
150
        query_name = kwargs.pop("query_name", None) or cls.CONTENT_TYPE_FIELD
5✔
151
        if models:
5✔
152
            query_name = "%s__in" % query_name
5✔
153
            value = set(ct.pk for ct in get_content_types(*models).values())
5✔
154
        else:
155
            value = get_content_type(cls).pk
5✔
156
        return {query_name: value}
5✔
157

158
    @classmethod
5✔
159
    def subclasses_lookup(cls, query_name=None):
5✔
160
        return cls.content_type_lookup(
5✔
161
            cls, *tuple(cls.subclass_accessors), query_name=query_name
162
        )
163

164
    @classmethod
5✔
165
    def check(cls, **kwargs):
5✔
166
        errors = super().check(**kwargs)
5✔
167
        try:
5✔
168
            content_type_field_name = getattr(cls, "CONTENT_TYPE_FIELD")
5✔
169
        except AttributeError:
5✔
170
            errors.append(
5✔
171
                checks.Error(
172
                    "`BasePolymorphicModel` subclasses must define a `CONTENT_TYPE_FIELD`.",
173
                    hint=None,
174
                    obj=cls,
175
                    id="polymodels.E001",
176
                )
177
            )
178
        else:
179
            try:
5✔
180
                content_type_field = cls._meta.get_field(content_type_field_name)
5✔
181
            except FieldDoesNotExist:
5✔
182
                errors.append(
5✔
183
                    checks.Error(
184
                        "`CONTENT_TYPE_FIELD` points to an inexistent field '%s'."
185
                        % content_type_field_name,
186
                        hint=None,
187
                        obj=cls,
188
                        id="polymodels.E002",
189
                    )
190
                )
191
            else:
192
                if (
5✔
193
                    not isinstance(content_type_field, models.ForeignKey)
194
                    or content_type_field.remote_field.model is not ContentType
195
                ):
196
                    errors.append(
5✔
197
                        checks.Error(
198
                            "`%s` must be a `ForeignKey` to `ContentType`."
199
                            % content_type_field_name,
200
                            hint=None,
201
                            obj=content_type_field,
202
                            id="polymodels.E003",
203
                        )
204
                    )
205
        return errors
5✔
206

207

208
class PolymorphicModel(BasePolymorphicModel):
5✔
209
    CONTENT_TYPE_FIELD = "content_type"
5✔
210
    content_type = models.ForeignKey(
5✔
211
        ContentType, on_delete=models.CASCADE, related_name="+"
212
    )
213

214
    objects = PolymorphicManager()
5✔
215

216
    class Meta:
5✔
217
        abstract = True
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc