• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DemocracyClub / UK-Polling-Stations / 28b2cb5e-29d5-4b2a-bba5-65e9410e6204

08 Mar 2024 11:40AM UTC coverage: 71.569% (-0.4%) from 71.995%
28b2cb5e-29d5-4b2a-bba5-65e9410e6204

Pull #6452

circleci

awdem
Import script for City of Lincoln (2024-05-02) (closes #6451)
Pull Request #6452: Import script for City of Lincoln (2024-05-02) (closes #6451)

3411 of 4766 relevant lines covered (71.57%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.82
/polling_stations/apps/file_uploads/models.py
1
from datetime import timedelta
1✔
2

3
from commitment import GitHubClient, GitHubCredentials
1✔
4
from data_importers.import_script import ImportScript
1✔
5
from django.conf import settings
1✔
6
from django.contrib.auth.models import User
1✔
7
from django.contrib.gis.db import models
1✔
8
from django.core.mail import EmailMessage
1✔
9
from django.db import transaction
1✔
10
from django.template.loader import render_to_string
1✔
11
from django.utils.timezone import now
1✔
12
from requests import HTTPError
1✔
13
from sentry_sdk import capture_message
1✔
14

15
# Display strings for the status names returned by the ``status`` properties
# in this module. ``status_to_emoji`` looks statuses up here and passes
# through, unchanged, any status that has no entry.
status_map = {
    # Still in progress.
    "Pending": "⌛",
    "Waiting": "⌛ waiting for second file",
    # Failed.
    "Error": "❌",
    "Error One File": "❌ only one file uploaded",
    # Succeeded.
    "OK": "✔️",
}
22

23

24
def status_to_emoji(status):
    """Return the display string for *status*.

    Statuses present in ``status_map`` are translated to their mapped
    string; any other value is returned unchanged.
    """
    return status_map.get(status, status)
×
28

29

30
class UploadQuerySet(models.QuerySet):
    """Chainable query helpers for ``Upload`` rows."""

    def future(self):
        """Restrict to uploads whose ``election_date`` is not before now."""
        return self.filter(election_date__gte=now())

    def pending_upload_qs(self):
        """Restrict to uploads older than 20 minutes with no warning sent yet.

        The filter is applied to ``self`` rather than to a fresh
        ``Upload.objects`` queryset, so filters already applied by the
        caller (for example ``.future()``) are kept instead of being
        silently discarded. Called directly on the manager, the result is
        the same as before.
        """
        cutoff = now() - timedelta(minutes=20)
        return self.filter(
            timestamp__lte=cutoff, warning_about_pending_sent=False
        )
39

40

41
class Upload(models.Model):
    """One upload of data files for a council and election date.

    Related ``File`` rows (reverse accessor ``file_set``) hold the per-file
    validation results. This model derives an overall status from them,
    builds the import script, opens the GitHub pull request and sends the
    notification emails.
    """

    gss = models.ForeignKey(
        "councils.Council",
        null=True,
        db_constraint=False,
        on_delete=models.DO_NOTHING,
    )
    timestamp = models.DateTimeField()
    election_date = models.DateField(null=True)
    github_issue = models.CharField(blank=True, max_length=100)

    objects = UploadQuerySet.as_manager()
    warning_about_pending_sent = models.BooleanField(default=False)
    upload_user = models.ForeignKey(User, null=True, on_delete=models.SET_NULL)

    class Meta:
        get_latest_by = "timestamp"

    def __str__(self):
        return f"{self.timestamp}: {self.gss}"

    @staticmethod
    def _strip_first_segment(key):
        """Return *key* with its first ``/``-separated segment removed."""
        return "/".join(key.split("/")[1:])

    @property
    def status(self):
        """Return the overall status name for this upload.

        * ``"Pending"`` - no files are attached yet.
        * ``"Waiting"`` - a file reports "Expected 2 files, found 1" and
          less than 180 seconds have passed since ``timestamp``.
        * ``"Error One File"`` - the same error once 180 seconds have
          passed.
        * ``"Error"`` - a file is not ``csv_valid``.
        * ``"OK"`` - none of the above.

        Files are inspected in queryset order and the first match wins.
        """
        # Fetch once; the result is used for both the emptiness check and
        # the loop.
        files = list(self.file_set.all())
        if not files:
            return "Pending"

        for f in files:
            if "Expected 2 files, found 1" in f.errors:
                # A single if/else so that the exact 180-second boundary is
                # classified too (it previously matched neither the ">" nor
                # the "<" comparison and fell through).
                if self.timestamp + timedelta(seconds=180) > now():
                    return "Waiting"
                return "Error One File"

            if not f.csv_valid:
                return "Error"
        return "OK"

    @property
    def status_emoji(self):
        """Return the display string for ``status``."""
        return status_to_emoji(self.status)

    @property
    def import_script(self):
        """Return the generated import script, or ``None``.

        ``None`` is returned unless ``status`` is ``"OK"`` and exactly one
        or two files are attached. With one file it serves as both the
        addresses and the stations file. With two, the file with fewer
        ``csv_rows`` is taken as the stations file and the other as the
        addresses file.
        """
        if self.status != "OK":
            return None

        elections = [str(self.election_date)]
        council_id = self.gss.council_id
        files = list(self.file_set.all())

        if len(files) == 1:
            (only_file,) = files
            path = self._strip_first_segment(only_file.key)
            import_script = ImportScript(
                council_id=council_id,
                ems=only_file.ems,
                addresses_name=path,
                stations_name=path,
                encoding=only_file.csv_encoding,
                elections=elections,
            )
        elif len(files) == 2:
            stations_file, addresses_file = sorted(
                files, key=lambda f: f.csv_rows
            )
            import_script = ImportScript(
                council_id=council_id,
                ems=stations_file.ems,
                addresses_name=self._strip_first_segment(addresses_file.key),
                stations_name=self._strip_first_segment(stations_file.key),
                encoding=stations_file.csv_encoding,
                elections=elections,
            )
        else:
            return None

        return import_script.script

    @property
    def branch_name(self):
        """Return the git branch name: lower-cased, spaces replaced by ``-``."""
        return f"import-{self.gss.short_name}-{self.election_date}".lower().replace(
            " ", "-"
        )

    @property
    def gh_issue_number(self):
        """Return the last ``/``-separated segment of ``github_issue``."""
        return self.github_issue.split("/")[-1]

    @property
    def pr_title(self):
        """Return the pull request title, prefixed outside production.

        The prefix is ``TEST/`` for the staging, development and test
        environments and ``LOCALTEST/`` for anything else.
        """
        title = f"Import script for {self.gss.short_name} ({self.election_date}) (closes #{self.gh_issue_number})"
        server_env = getattr(settings, "SERVER_ENVIRONMENT", None)
        if server_env == "production":
            return title
        if server_env in ["staging", "development", "test"]:
            return f"TEST/{title}"
        return f"LOCALTEST/{title}"

    @property
    def pr_body(self):
        """Return the pull request body, with a notice outside production."""
        message = f"PR triggered by upload at {self.github_issue}"
        server_env = getattr(settings, "SERVER_ENVIRONMENT", "unknown_env")
        if server_env == "production":
            return message
        if server_env in ["staging", "development", "test"]:
            return f"**NB triggered from {server_env} instance**\n{message}"
        # This string used to be built and discarded, with None returned
        # instead, leaving locally triggered PRs without a body.
        return f"**NB triggered from local machine**\n{message}"

    def send_confirmation_email(self):
        """Send the "upload was successful" email.

        In production the email goes to the uploading user. Nothing is sent
        if that user has been deleted, and ``DEFAULT_FROM_EMAIL`` is used if
        the user has no email address. In every other environment the email
        goes to ``DEFAULT_FROM_EMAIL`` and the subject is prefixed with the
        environment name.
        """
        server_env = getattr(settings, "SERVER_ENVIRONMENT", "unknown_env")
        in_production = server_env == "production"

        # If we're in production, and the user has been deleted, return early.
        # We don't want to send an email to a non-existent user and we already
        # have github issues to track successful uploads
        if in_production and self.upload_user is None:
            return

        if in_production and self.upload_user.email:
            to = self.upload_user.email
        else:
            # Outside production (or with no user email) use the default
            # from-address as the recipient.
            to = settings.DEFAULT_FROM_EMAIL

        subject = f"Your file upload for {self.gss.short_name} ({self.election_date}) was successful"
        if not in_production:
            # Make it clear that this email comes from testing.
            subject = f"**NB triggered from {server_env} instance** {subject}"

        email = EmailMessage(
            subject,
            render_to_string(
                template_name="file_uploads/email/upload_confirmation.txt"
            ),
            settings.DEFAULT_FROM_EMAIL,
            [to],
            reply_to=[settings.DEFAULT_FROM_EMAIL],
            headers={"Message-ID": subject},
        )
        email.send()

    @transaction.atomic
    def send_error_email(self):
        """Email a failure notice to ``DEFAULT_FROM_EMAIL`` and record it.

        After sending, ``warning_about_pending_sent`` is set and saved; that
        flag is what ``UploadQuerySet.pending_upload_qs`` filters on.
        """
        subject = "File upload failed"
        message = f"File upload failure: {self}. Please investigate further."
        email = EmailMessage(
            subject,
            message,
            settings.DEFAULT_FROM_EMAIL,
            [settings.DEFAULT_FROM_EMAIL],
            reply_to=[settings.DEFAULT_FROM_EMAIL],
            headers={"Message-ID": subject},
        )

        email.send()
        self.warning_about_pending_sent = True
        self.save()

    def make_pull_request(self):
        """Push the import script to a branch and open a pull request.

        Does nothing when ``settings.RUNNING_TESTS`` is truthy. An already
        existing branch or pull request is reported via ``capture_message``
        and otherwise ignored; any other ``HTTPError`` is re-raised.
        """
        if getattr(settings, "RUNNING_TESTS", False):
            return
        creds = GitHubCredentials(
            repo=settings.GITHUB_REPO,
            name=settings.GITHUB_USERNAME,
            api_key=settings.GITHUB_API_KEY,
            email=settings.GITHUB_EMAIL,
        )
        client = GitHubClient(creds)
        try:
            client.create_branch(self.branch_name)
        except HTTPError as e:
            if e.response.json()["message"] == "Reference already exists":
                capture_message(
                    f"Branch {self.branch_name} already exists", level="warning"
                )
            else:
                raise

        client.push_file(
            content=self.import_script,
            filename=self.gss.import_script_path,
            message=self.pr_title,
            branch=self.branch_name,
        )

        try:
            client.open_pull_request(
                head_branch=self.branch_name,
                title=self.pr_title,
                body=self.pr_body,
            )
        except HTTPError as e:
            if (
                e.response.json()["errors"][0]["message"]
                == f"A pull request already exists for DemocracyClub:{self.branch_name}."
            ):
                capture_message(f"PR already exists for {self.branch_name}:\n{e}")
            else:
                raise
×
246

247

248
class File(models.Model):
    """A single file belonging to an ``Upload``, with its validation result."""

    upload = models.ForeignKey(Upload, on_delete=models.CASCADE)
    version = models.PositiveIntegerField(default=1)
    # Whether the file passed validation; drives ``status`` below.
    csv_valid = models.BooleanField()
    # Row count; ``Upload.import_script`` sorts on it to tell the stations
    # file from the addresses file.
    csv_rows = models.IntegerField(default=0)
    csv_encoding = models.CharField(max_length=20, blank=True)
    ems = models.CharField(max_length=40)
    # "/"-separated location of the file; see ``filename`` and ``path``.
    key = models.CharField(max_length=255)
    # Validation error text; ``Upload.status`` searches it for the
    # "Expected 2 files, found 1" message.
    errors = models.TextField(blank=True)

    def __str__(self):
        return self.key

    @property
    def filename(self):
        """Return the part of ``key`` after its last ``/``."""
        return self.key.rsplit("/", 1)[-1]

    @property
    def path(self):
        """Return ``key`` without its final segment, ending in ``/``."""
        directory, _, _ = self.key.rpartition("/")
        return directory + "/"

    @property
    def status(self):
        """Return ``"OK"`` when ``csv_valid`` is truthy, else ``"Error"``."""
        return "OK" if self.csv_valid else "Error"

    @property
    def status_emoji(self):
        """Return the display string for ``status``."""
        return status_to_emoji(self.status)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc