• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

SectorLabs / django-postgres-extra / afc25a2e-2e33-4b0c-8a3a-c2aa4b1bae92

08 Jun 2025 12:32PM UTC coverage: 91.017% (+0.05%) from 90.968%
afc25a2e-2e33-4b0c-8a3a-c2aa4b1bae92

push

circleci

Photonios
Use Postgres when testing Python >=3.10 and Django >=5.x

1834 of 2015 relevant lines covered (91.02%)

0.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.28
/psqlextra/sql.py
1
from collections import OrderedDict
1✔
2
from collections.abc import Iterable
1✔
3
from typing import Any, Dict, List, Optional, Tuple, Union
1✔
4

5
import django
1✔
6

7
from django.core.exceptions import SuspiciousOperation
1✔
8
from django.db import connections, models
1✔
9
from django.db.models import Expression, sql
1✔
10
from django.db.models.constants import LOOKUP_SEP
1✔
11
from django.db.models.expressions import Ref
1✔
12

13
from .compiler import PostgresInsertOnConflictCompiler
1✔
14
from .compiler import SQLUpdateCompiler as PostgresUpdateCompiler
1✔
15
from .expressions import HStoreColumn
1✔
16
from .fields import HStoreField
1✔
17
from .types import ConflictAction
1✔
18

19

20
class PostgresQuery(sql.Query):
1✔
21
    select: Tuple[Expression, ...]
1✔
22

23
    def chain(self, klass=None):
1✔
24
        """Chains this query to another.
25

26
        We override this so that we can make sure our subclassed query
27
        classes are used.
28
        """
29

30
        if klass == sql.UpdateQuery:
1✔
31
            return super().chain(PostgresUpdateQuery)
1✔
32

33
        if klass == sql.InsertQuery:
1✔
34
            return super().chain(PostgresInsertQuery)
×
35

36
        return super().chain(klass)
1✔
37

38
    def rename_annotations(self, annotations) -> None:
1✔
39
        """Renames the aliases for the specified annotations:
40

41
            .annotate(myfield=F('somestuf__myfield'))
42
            .rename_annotations(myfield='field')
43

44
        Arguments:
45
            annotations:
46
                The annotations to rename. Mapping the
47
                old name to the new name.
48
        """
49

50
        # safety check only, make sure there are no renames
51
        # left that cannot be mapped back to the original name
52
        for old_name, new_name in annotations.items():
1✔
53
            annotation = self.annotations.get(old_name)
1✔
54
            if not annotation:
1✔
55
                raise SuspiciousOperation(
×
56
                    (
57
                        'Cannot rename annotation "{old_name}" to "{new_name}", because there'
58
                        ' is no annotation named "{old_name}".'
59
                    ).format(old_name=old_name, new_name=new_name)
60
                )
61

62
        # rebuild the annotations according to the original order
63
        new_annotations = OrderedDict()
1✔
64
        for old_name, annotation in self.annotations.items():
1✔
65
            new_name = annotations.get(old_name)
1✔
66
            new_annotations[new_name or old_name] = annotation
1✔
67

68
            if new_name and self.annotation_select_mask:
1✔
69
                # It's a set in all versions prior to Django 5.x
70
                # and a list in Django 5.x and newer.
71
                # https://github.com/django/django/commit/d6b6e5d0fd4e6b6d0183b4cf6e4bd4f9afc7bf67
72
                if isinstance(self.annotation_select_mask, set):
1✔
73
                    updated_annotation_select_mask = set(
1✔
74
                        self.annotation_select_mask
75
                    )
76
                    updated_annotation_select_mask.discard(old_name)
1✔
77
                    updated_annotation_select_mask.add(new_name)
1✔
78
                    self.set_annotation_mask(updated_annotation_select_mask)
1✔
79
                elif isinstance(self.annotation_select_mask, list):
×
80
                    self.annotation_select_mask.remove(old_name)
×
81
                    self.annotation_select_mask.append(new_name)
×
82

83
        if isinstance(self.group_by, Iterable):
1✔
84
            for statement in self.group_by:
1✔
85
                if not isinstance(statement, Ref):
1✔
86
                    continue
1✔
87

88
                if statement.refs in annotations:  # type: ignore[attr-defined]
1✔
89
                    statement.refs = annotations[statement.refs]  # type: ignore[attr-defined]
1✔
90

91
        self.annotations.clear()
1✔
92
        self.annotations.update(new_annotations)
1✔
93

94
    def add_fields(self, field_names, *args, **kwargs) -> None:
1✔
95
        """Adds the given (model) fields to the select set.
96

97
        The field names are added in the order specified. This overrides
98
        the base class's add_fields method. This is called by the
99
        .values() or .values_list() method of the query set. It
100
        instructs the ORM to only select certain values. A lot of
101
        processing is neccesarry because it can be used to easily do
102
        joins. For example, `my_fk__name` pulls in the `name` field in
103
        foreign key `my_fk`. In our case, we want to be able to do
104
        `title__en`, where `title` is a HStoreField and `en` a key. This
105
        doesn't really involve a join. We iterate over the specified
106
        field names and filter out the ones that refer to HStoreField
107
        and compile it into an expression which is added to the list of
108
        to be selected fields using `self.add_select`.
109
        """
110

111
        # django knows how to do all of this natively from v2.1
112
        # see: https://github.com/django/django/commit/20bab2cf9d02a5c6477d8aac066a635986e0d3f3
113
        if django.VERSION >= (2, 1):
1✔
114
            return super().add_fields(field_names, *args, **kwargs)
1✔
115

116
        select = []
×
117
        field_names_without_hstore = []
×
118

119
        for name in field_names:
×
120
            parts = name.split(LOOKUP_SEP)
×
121

122
            # it cannot be a special hstore thing if there's no __ in it
123
            if len(parts) > 1:
×
124
                column_name, hstore_key = parts[:2]
×
125
                is_hstore, field = self._is_hstore_field(column_name)
×
126
                if self.model and is_hstore:
×
127
                    select.append(
×
128
                        HStoreColumn(
129
                            self.model._meta.db_table
130
                            or self.model.__class__.__name__,
131
                            field,
132
                            hstore_key,
133
                        )
134
                    )
135
                    continue
×
136

137
            field_names_without_hstore.append(name)
×
138

139
        super().add_fields(field_names_without_hstore, *args, **kwargs)
×
140

141
        if len(select) > 0:
×
142
            self.set_select(list(self.select + tuple(select)))
×
143

144
    def _is_hstore_field(
1✔
145
        self, field_name: str
146
    ) -> Tuple[bool, Optional[models.Field]]:
147
        """Gets whether the field with the specified name is a HStoreField.
148

149
        Returns     A tuple of a boolean indicating whether the field
150
        with the specified name is a HStoreField, and the     field
151
        instance.
152
        """
153

154
        if not self.model:
×
155
            return (False, None)
×
156

157
        field_instance = None
×
158
        for field in self.model._meta.local_concrete_fields:  # type: ignore[attr-defined]
×
159
            if field.name == field_name or field.column == field_name:
×
160
                field_instance = field
×
161
                break
×
162

163
        return isinstance(field_instance, HStoreField), field_instance
×
164

165

166
class PostgresInsertQuery(sql.InsertQuery):
1✔
167
    """Insert query using PostgreSQL."""
168

169
    def __init__(self, *args, **kwargs):
1✔
170
        """Initializes a new instance :see:PostgresInsertQuery."""
171

172
        super(PostgresInsertQuery, self).__init__(*args, **kwargs)
1✔
173

174
        self.conflict_target = []
1✔
175
        self.conflict_action = ConflictAction.UPDATE
1✔
176
        self.conflict_update_condition = None
1✔
177
        self.index_predicate = None
1✔
178
        self.update_values = {}
1✔
179

180
    def insert_on_conflict_values(
1✔
181
        self,
182
        objs: List,
183
        insert_fields: List,
184
        update_values: Dict[str, Union[Any, Expression]] = {},
185
    ):
186
        """Sets the values to be used in this query.
187

188
        Insert fields are fields that are definitely
189
        going to be inserted, and if an existing row
190
        is found, are going to be overwritten with the
191
        specified value.
192

193
        Update fields are fields that should be overwritten
194
        in case an update takes place rather than an insert.
195
        If we're dealing with a INSERT, these will not be used.
196

197
        Arguments:
198
            objs:
199
                The objects to apply this query to.
200

201
            insert_fields:
202
                The fields to use in the INSERT statement
203

204
            update_values:
205
                Expressions/values to use when a conflict
206
                occurs and an UPDATE is performed.
207
        """
208

209
        self.insert_values(insert_fields, objs, raw=False)
1✔
210
        self.update_values = update_values
1✔
211

212
    def get_compiler(self, using=None, connection=None):
1✔
213
        if using:
1✔
214
            connection = connections[using]
1✔
215
        return PostgresInsertOnConflictCompiler(self, connection, using)
1✔
216

217

218
class PostgresUpdateQuery(sql.UpdateQuery):
1✔
219
    """Update query using PostgreSQL."""
220

221
    def get_compiler(self, using=None, connection=None):
1✔
222
        if using:
1✔
223
            connection = connections[using]
1✔
224
        return PostgresUpdateCompiler(self, connection, using)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc