• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DemocracyClub / UK-Polling-Stations / 28b2cb5e-29d5-4b2a-bba5-65e9410e6204

08 Mar 2024 11:40AM UTC coverage: 71.569% (-0.4%) from 71.995%
28b2cb5e-29d5-4b2a-bba5-65e9410e6204

Pull #6452

circleci

awdem
Import script for City of Lincoln (2024-05-02) (closes #6451)
Pull Request #6452: Import script for City of Lincoln (2024-05-02) (closes #6451)

3411 of 4766 relevant lines covered (71.57%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.27
/polling_stations/apps/councils/models.py
1
import re
1✔
2
from pathlib import Path
1✔
3
from typing import Optional
1✔
4

5
from data_importers.event_types import DataEventType
1✔
6
from data_importers.models import DataEvent, DataQuality
1✔
7
from django.apps import apps
1✔
8
from django.conf import settings
1✔
9
from django.contrib.gis.db import models
1✔
10
from django.contrib.postgres.fields import ArrayField
1✔
11
from django.db import DEFAULT_DB_ALIAS, transaction
1✔
12
from django.db.models import Count, JSONField, OuterRef, Q, Subquery
1✔
13
from django.utils.translation import get_language
1✔
14
from file_uploads.models import File, Upload
1✔
15
from pollingstations.models import PollingStation
1✔
16

17
from polling_stations.i18n.cy import WelshNameMutationMixin
1✔
18

19

20
class CouncilQueryset(models.QuerySet):
    """QuerySet helpers for ``Council``; each helper queries via ``DEFAULT_DB_ALIAS``."""

    def with_polling_stations_in_db(self):
        """
        Return councils that have at least one related polling station.

        Each returned council is annotated with ``ps_count``, the count of
        its related ``pollingstation`` rows.
        """
        councils = self.using(DEFAULT_DB_ALIAS)
        counted = councils.annotate(ps_count=Count("pollingstation"))
        return counted.exclude(pollingstation=None)

    def with_ems_from_uploads(self):
        """
        Annotate each council with ``latest_ems`` and ``latest_upload_id``.

        Both values come from the first ``Upload`` (ordered by ``-timestamp``)
        whose ``gss`` equals the council's ``council_id``.
        """
        # Distinct "ems" values of the files attached to a given upload.
        # NOTE(review): this subquery is not sliced, so it presumably relies on
        # all files of one upload sharing a single ems value -- confirm.
        ems_for_upload = (
            File.objects.filter(upload=OuterRef("pk")).values("ems").distinct("ems")
        )
        uploads_newest_first = (
            Upload.objects.filter(gss=OuterRef("council_id"))
            .annotate(ems=ems_for_upload)
            .order_by("-timestamp")
        )
        newest_ems = uploads_newest_first.values("ems")[:1]
        newest_upload_id = uploads_newest_first.values("id")[:1]

        return self.using(DEFAULT_DB_ALIAS).annotate(
            latest_ems=Subquery(newest_ems),
            latest_upload_id=newest_upload_id,
        )
42

43

44
class Council(WelshNameMutationMixin, models.Model):
    """
    A council, keyed by ``council_id``, with contact details for its
    electoral services and registration offices.
    """

    council_id = models.CharField(primary_key=True, max_length=100)
    name = models.CharField(blank=True, max_length=255)
    # Mapping of language code -> translated name (see ``__str__``).
    name_translated = JSONField(default=dict)
    # Alternative identifiers; GSS-shaped entries are used by ``nation``.
    identifiers = ArrayField(models.CharField(max_length=100), default=list)

    electoral_services_email = models.EmailField(blank=True)
    electoral_services_phone_numbers = ArrayField(
        models.CharField(max_length=100), default=list
    )
    electoral_services_website = models.URLField(blank=True)
    electoral_services_postcode = models.CharField(
        blank=True, null=True, max_length=100
    )
    electoral_services_address = models.TextField(blank=True, null=True)

    registration_email = models.EmailField(blank=True)
    registration_phone_numbers = ArrayField(
        models.CharField(blank=True, max_length=100), default=list
    )
    registration_website = models.URLField(blank=True)
    registration_postcode = models.CharField(blank=True, null=True, max_length=100)
    registration_address = models.TextField(blank=True, null=True)

    users = models.ManyToManyField(through="UserCouncils", to=settings.AUTH_USER_MODEL)

    objects = CouncilQueryset.as_manager()

    def __str__(self):
        """Return the name translated for the active language, else ``name``."""
        try:
            return self.name_translated[get_language()]
        except KeyError:
            # No translation stored for the active language.
            return self.name

    class Meta:
        ordering = ("name",)

    def save(
        self, force_insert=False, force_update=False, using=None, update_fields=None
    ):
        """
        Save the council and, on first creation, ensure a ``DataQuality``
        row exists for it. Both steps run inside one transaction.
        """
        with transaction.atomic():
            # Capture this before saving: super().save() is expected to clear
            # the flag once the row has been written.
            new = self._state.adding
            super().save(
                force_insert=force_insert,
                force_update=force_update,
                using=using,
                update_fields=update_fields,
            )
            if new:
                # model has just been created, so create corresponding DataQuality object.
                DataQuality.objects.get_or_create(council_id=self.council_id)

    @property
    def nation(self):
        """
        Return the nation this council belongs to, derived from its identifiers.

        Returns one of "England", "Wales", "Scotland" or "Northern Ireland"
        when the GSS-shaped identifiers agree on exactly one nation; returns
        "" when there are none or they disagree.
        """
        nations_lookup = {
            "E": "England",
            "W": "Wales",
            "S": "Scotland",
            "N": "Northern Ireland",
        }
        # A GSS code is:
        #   'ANN' + 'NNNNNN' where 'A' is one of 'ENSW' and 'N' is 0-9.
        #   'ANN' section is the Entity Code.
        # We want to look for identifiers that look like GSS codes
        # for the types of organisations that manage elections.

        # Ref: https://geoportal.statistics.gov.uk/search?collection=Dataset&sort=-created&tags=all(PRD_RGC)
        gss_pattern = re.compile(
            """
            ^        # Start of string
            (        # Entity Codes:
                E06   # Unitary Authorities (England)
              | E07   # Non-metropolitan Districts (England)
              | E08   # Metropolitan Districts (England)
              | E09   # London Boroughs (England)
              | N09   # Local Government Districts (Northern Ireland)
              | S12   # Council Areas (Scotland)
              | W06   # Unitary Authorities (Wales)
            )
            [0-9]{6} # id
            $        # End of string
            """,
            re.VERBOSE,
        )
        # A matching identifier is never empty, so its first character can be
        # looked up directly; the pattern restricts it to E, N, S or W.
        identifier_nations = {
            nations_lookup[identifier[0]]
            for identifier in self.identifiers
            if gss_pattern.match(identifier)
        }
        if len(identifier_nations) == 1:
            return identifier_nations.pop()
        return ""

    @property
    def short_name(self):
        """
        Return ``name`` with common council prefixes and suffixes removed.

        The fragments are removed in list order, wherever they occur in the
        name.
        """
        short_name = self.name
        extras = [
            "London Borough of ",
            "Royal Borough of ",
            # These two entries were previously fused by a missing comma into
            # the single literal "Council of the City of ", so neither prefix
            # was stripped on its own.
            "Council of the ",
            "City of ",
            "City & County of ",
            " City & District Council",
            " City Council",
            " District Council",
            " Metropolitan",
            " Borough",
            " County",
            " Council",
        ]
        for extra in extras:
            short_name = short_name.replace(extra, "")
        return short_name

    @property
    def import_script_path(self):
        """
        Return the path (as a string) of this council's import script.

        Every ``import_*.py`` file in the data_importers commands directory is
        read and searched for a ``council_id = "<id>"`` line. When none
        matches, a conventional path built from ``short_name`` is returned
        instead; that file may not exist. The directory is given relative to
        the current working directory.
        """
        commands_dir = Path(
            "./polling_stations/apps/data_importers/management/commands/"
        )
        marker = f'council_id = "{self.council_id}"'

        import_script_path = None
        for script in commands_dir.glob("import_*.py"):
            if marker in script.read_text():
                # No break: if several scripts match, the last one globbed wins.
                import_script_path = script

        if not import_script_path:
            slug = self.short_name.lower().replace(" ", "_")
            import_script_path = Path(
                f"./polling_stations/apps/data_importers/management/commands/import_{slug}.py"
            )

        return str(import_script_path)

    @property
    def has_polling_stations_in_db(self):
        """Return True when at least one polling station is related to this council."""
        # exists() asks only whether a row is present instead of counting them all.
        return self.pollingstation_set.exists()

    def latest_data_event(self, event_type: DataEventType):
        """
        Return this council's latest ``DataEvent`` of ``event_type``.

        Returns None when the council has no event of that type.
        """
        data_event_model = apps.get_model("data_importers", "DataEvent")
        try:
            return self.dataevent_set.filter(event_type=event_type).latest()
        except data_event_model.DoesNotExist:
            return None

    @property
    def live_upload(self):
        """
        Return the upload behind the currently imported data, or None.

        An upload is returned only when the council has polling stations, its
        latest IMPORT event has an upload, and that import is not superseded
        by a TEARDOWN event created at the same time or later.
        """
        if not self.has_polling_stations_in_db:
            return None

        latest_import = self.latest_data_event(DataEventType.IMPORT)
        latest_teardown = self.latest_data_event(DataEventType.TEARDOWN)

        if not latest_import:
            return None
        if latest_teardown and not (latest_import.created > latest_teardown.created):
            # The data was torn down after (or at the same time as) the import.
            return None
        return latest_import.upload or None

    def update_all_station_visibilities_from_events(
        self, election_dates: Optional[list] = None
    ):
        """
        Apply ``update_station_visibility_from_events`` to every polling
        station related to this council, passing ``election_dates`` through.
        """
        for station in self.pollingstation_set.all():
            self.update_station_visibility_from_events(station, election_dates)

    def update_station_visibility_from_events(
        self, station: PollingStation, election_dates: Optional[list] = None
    ):
        """
        Set ``station.visibility`` from its latest SET_STATION_VISIBILITY event.

        Events are matched on this council and on the station's
        ``internal_council_id``. When ``election_dates`` is non-empty, only
        events whose ``election_dates`` contain at least one of the given
        dates are considered. The station is saved when an event is found and
        left untouched otherwise.
        """
        query = Q(
            council=self,
            event_type=DataEventType.SET_STATION_VISIBILITY,
            payload__internal_council_id=station.internal_council_id,
        )
        if election_dates:
            # OR together one "contains" clause per requested date.
            election_query = Q(election_dates__contains=[election_dates[0]])
            for date in election_dates[1:]:
                election_query |= Q(election_dates__contains=[date])
            query &= election_query

        # Only the lookup is guarded, so errors raised while reading the
        # payload or saving the station are not swallowed.
        try:
            latest_visibility_event_for_station = DataEvent.objects.filter(
                query
            ).latest()
        except DataEvent.DoesNotExist:
            return

        station.visibility = latest_visibility_event_for_station.payload["visibility"]
        station.save()
1✔
237

238

239
class CouncilGeography(models.Model):
    """Boundary geometry attached one-to-one to a ``Council``."""

    # Reachable from the council as ``council.geography``; deleting the
    # council cascades to this row.
    council = models.OneToOneField(
        "Council",
        on_delete=models.CASCADE,
        related_name="geography",
    )
    gss = models.CharField(max_length=20, blank=True)
    geography = models.MultiPolygonField(null=True)
1✔
245

246

247
class UserCouncils(models.Model):
    """Join row linking one user to one council (the ``through`` model of ``Council.users``)."""

    # Either side being deleted removes the link row as well.
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
    )
    council = models.ForeignKey(
        Council,
        on_delete=models.CASCADE,
    )

    class Meta:
        verbose_name_plural = "User Councils"
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc