• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DemocracyClub / UK-Polling-Stations / 28b2cb5e-29d5-4b2a-bba5-65e9410e6204

08 Mar 2024 11:40AM UTC coverage: 71.569% (-0.4%) from 71.995%
28b2cb5e-29d5-4b2a-bba5-65e9410e6204

Pull #6452

circleci

awdem
Import script for City of Lincoln (2024-05-02) (closes #6451)
Pull Request #6452: Import script for City of Lincoln (2024-05-02) (closes #6451)

3411 of 4766 relevant lines covered (71.57%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.43
/polling_stations/apps/file_uploads/views.py
1
import json
1✔
2
import logging
1✔
3
from datetime import datetime
1✔
4

5
import boto3
1✔
6
from addressbase.models import Address, UprnToCouncil
1✔
7
from boto.pyami.config import Config
1✔
8
from botocore.exceptions import ClientError
1✔
9
from councils.models import Council
1✔
10
from data_finder.helpers import EveryElectionWrapper
1✔
11
from django.conf import settings
1✔
12
from django.contrib import messages
1✔
13
from django.contrib.auth import get_user_model, login
1✔
14
from django.contrib.auth.mixins import UserPassesTestMixin
1✔
15
from django.core.exceptions import ValidationError as DjangoValidationError
1✔
16
from django.db import DEFAULT_DB_ALIAS
1✔
17
from django.db.models import Max, Prefetch, Subquery
1✔
18
from django.http import HttpResponseRedirect, JsonResponse
1✔
19
from django.shortcuts import redirect
1✔
20
from django.template.loader import render_to_string
1✔
21
from django.urls import reverse, reverse_lazy
1✔
22
from django.utils.decorators import method_decorator
1✔
23
from django.views.decorators.csrf import csrf_protect, ensure_csrf_cookie
1✔
24
from django.views.generic import DetailView, FormView, ListView, TemplateView
1✔
25
from file_uploads.forms import CouncilLoginForm
1✔
26
from marshmallow import Schema, fields, validate
1✔
27
from marshmallow import ValidationError as MarshmallowValidationError
1✔
28
from pollingstations.models import VisibilityChoices
1✔
29
from sentry_sdk import capture_message
1✔
30
from sesame.utils import get_query_string, get_user
1✔
31

32
from .models import File, Upload
1✔
33
from .utils import assign_councils_to_user, get_domain
1✔
34

35
User = get_user_model()
1✔
36

37

38
class FileSchema(Schema):
1✔
39
    name = fields.String(
1✔
40
        required=True,
41
        validate=validate.Regexp(
42
            r"(?i)^.+(\.csv|\.tsv)$", error="Unexpected file type"
43
        ),
44
    )
45
    size = fields.Integer(
1✔
46
        required=True,
47
        validate=validate.Range(
48
            min=0, max=settings.MAX_FILE_SIZE, error="File too big"
49
        ),
50
    )
51
    type = fields.String(required=False)
1✔
52

53

54
class UploadRequestSchema(Schema):
1✔
55
    files = fields.List(
1✔
56
        fields.Nested(FileSchema()),
57
        required=True,
58
        validate=validate.Length(min=1, max=2),
59
    )
60
    election_date = fields.Date(required=True)
1✔
61

62

63
def get_s3_client():
1✔
64
    config = Config()
×
65
    access_key = config.get_value(settings.BOTO_SECTION, "aws_access_key_id")
×
66
    secret_key = config.get_value(settings.BOTO_SECTION, "aws_secret_access_key")
×
67
    return boto3.client(
×
68
        "s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key
69
    )
70

71

72
class CouncilFileUploadAllowedMixin(UserPassesTestMixin):
1✔
73
    def get_login_url(self):
1✔
74
        return reverse_lazy("file_uploads:council_login_view")
×
75

76
    def test_func(self):
1✔
77
        return self.request.user.is_active
1✔
78

79

80
logger = logging.getLogger(__name__)
1✔
81

82

83
class FileUploadView(CouncilFileUploadAllowedMixin, TemplateView):
1✔
84
    template_name = "file_uploads/upload.html"
1✔
85

86
    @method_decorator(ensure_csrf_cookie)
1✔
87
    def get(self, request, *args, **kwargs):
1✔
88
        return super().get(self, request, *args, **kwargs)
×
89

90
    def get_context_data(self, **context):
1✔
91
        context["council"] = (
×
92
            Council.objects.all()
93
            .exclude(council_id__startswith="N09")
94
            .get(council_id=self.kwargs["gss"])
95
        )
96
        upcoming_election_dates = EveryElectionWrapper(
×
97
            council_id=self.kwargs["gss"]
98
        ).get_future_election_dates()
99

100
        # If the list returns no items, flag that there are no upcoming elections
101
        # that we know about.
102
        context["NO_UPCOMING_ELECTIONS"] = bool(upcoming_election_dates)
×
103

104
        # Only show the date picker if there's more than one upcoming election date
105
        context["SHOW_DATE_PICKER"] = len(upcoming_election_dates) > 1
×
106

107
        context["UPCOMING_ELECTION_DATES"] = upcoming_election_dates
×
108

109
        return context
×
110

111
    def validate_body(self, body):
1✔
112
        """
113
        Do some basic picture checks on request body and files.
114
        Obviously these values can be spoofed, so checks on
115
        file extension and MIME type etc are just to "fail fast".
116
        We will check the files _properly_ in the lambda hook.
117
        """
118
        try:
×
119
            UploadRequestSchema().load(body)
×
120
        except MarshmallowValidationError as e:
×
121
            # for now, log the error and body to sentry
122
            # so we've got visibility on errors
123
            capture_message(f"{e}\n{body}", level="error")
×
124

125
            raise DjangoValidationError("Request body did not match schema")
×
126

127
        files = body["files"]
×
128
        if len(files) == 2 and files[0]["name"] == files[1]["name"]:
×
129
            raise DjangoValidationError("Files must have different names")
×
130

131
    @method_decorator(csrf_protect)
1✔
132
    def post(self, request, *args, **kwargs):
1✔
133
        """
134
        Expects a request matching the UploadRequestSchema
135
        Fetches, and returns, a presigned url for an s3 upload.
136
        This endpoint is called in templates/upload.html from the js,
137
        which then uses the presigned url to upload the users files.
138
        """
139
        body = json.loads(request.body)
×
140

141
        try:
×
142
            self.validate_body(body)
×
143
            try:
×
144
                council = Council.objects.get(pk=self.kwargs["gss"])
×
145
            except Council.DoesNotExist as e:
×
146
                raise DjangoValidationError(str(e))
×
147
        except DjangoValidationError as e:
×
148
            return JsonResponse({"error": e.message}, status=400)
×
149

150
        client = get_s3_client()
×
151
        now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")
×
152
        election_date = body["election_date"]
×
153

154
        resp = {"files": []}
×
155
        for f in body["files"]:
×
156
            bucket_name = settings.S3_UPLOADS_BUCKET
×
157
            object_name = f"{self.kwargs['gss']}/{election_date}/{now}/{f['name']}"
×
158
            fields = {"Content-Type": f["type"]}
×
159
            conditions = [
×
160
                {"Content-Type": f["type"]},
161
                ["content-length-range", 0, settings.MAX_FILE_SIZE],
162
            ]
163
            expiration = 600  # 10 mins
×
164
            try:
×
165
                resp["files"].append(
×
166
                    client.generate_presigned_post(
167
                        bucket_name,
168
                        object_name,
169
                        Fields=fields,
170
                        Conditions=conditions,
171
                        ExpiresIn=expiration,
172
                    )
173
                )
174
                Upload.objects.get_or_create(
×
175
                    gss=council,
176
                    election_date=election_date,
177
                    timestamp=now,
178
                    upload_user=request.user,
179
                )
180
            except ClientError:
×
181
                return JsonResponse(
×
182
                    {"error": "Could not authorize request"}, status=400
183
                )
184
        return JsonResponse(resp, status=200)
×
185

186

187
class CouncilView:
1✔
188
    def get_queryset(self):
1✔
189
        uploads = Upload.objects.all().order_by("-timestamp")
1✔
190
        qs = (
1✔
191
            Council.objects.all()
192
            .exclude(council_id__startswith="N09")
193
            .order_by("name")
194
            .prefetch_related(Prefetch("upload_set", uploads))
195
            .prefetch_related("upload_set__file_set")
196
        )
197
        if not self.request.user.is_staff:
1✔
198
            qs = qs.filter(usercouncils__user=self.request.user)
1✔
199
        return qs
1✔
200

201
    def get_context_data(self, **kwargs):
1✔
202
        context = super().get_context_data(**kwargs)
1✔
203
        context[
1✔
204
            "COUNCILS_WITH_STATIONS"
205
        ] = Council.objects.with_polling_stations_in_db()
206

207
        if self.kwargs.get("pk"):
1✔
208
            upcoming_election_dates = EveryElectionWrapper(
1✔
209
                council_id=self.kwargs["pk"]
210
            ).get_future_election_dates()
211
            context["HAS_UPCOMING_ELECTIONS"] = bool(upcoming_election_dates)
1✔
212
            context["NO_COUNCILS"] = False
1✔
213

214
        return context
1✔
215

216

217
class CouncilListView(CouncilFileUploadAllowedMixin, CouncilView, ListView):
1✔
218
    template_name = "file_uploads/council_list.html"
1✔
219

220
    def get(self, request, *args, **kwargs):
1✔
221
        user_councils = self.get_queryset()
1✔
222

223
        if user_councils.count() == 1:
1✔
224
            return redirect(
×
225
                reverse(
226
                    "file_uploads:councils_detail",
227
                    kwargs={"pk": user_councils.first().pk},
228
                )
229
            )
230
        return super().get(request, *args, **kwargs)
1✔
231

232

233
def get_station_to_example_uprn_map(council: Council) -> dict[str, dict[str, str]]:
1✔
234
    sample_uprn_per_station = (
1✔
235
        UprnToCouncil.objects.filter(lad=council.geography.gss)
236
        .exclude(polling_station_id="")
237
        .values("polling_station_id")
238
        .annotate(max_uprn=Max("uprn"))
239
    )
240
    address_list = Address.objects.filter(
1✔
241
        uprn__in=Subquery(sample_uprn_per_station.values("max_uprn"))
242
    ).values_list(
243
        "uprntocouncil__polling_station_id",
244
        "uprn",
245
        "postcode",
246
    )
247
    return {a[0]: {"uprn": a[1], "postcode": [a[2]]} for a in address_list}
1✔
248

249

250
class CouncilDetailView(CouncilFileUploadAllowedMixin, CouncilView, DetailView):
1✔
251
    template_name = "file_uploads/council_detail.html"
1✔
252

253
    def get_context_data(self, **kwargs):
1✔
254
        context = super().get_context_data(**kwargs)
1✔
255
        context["EC_COUNCIL_CONTACT_EMAIL"] = settings.EC_COUNCIL_CONTACT_EMAIL
1✔
256
        council = context["council"]
1✔
257
        council_from_default_db = Council.objects.using(DEFAULT_DB_ALIAS).get(
1✔
258
            council_id=council.council_id
259
        )
260
        context["STATIONS"] = []
1✔
261
        station_to_example_uprn_map = get_station_to_example_uprn_map(
1✔
262
            council_from_default_db
263
        )
264
        for station in council_from_default_db.pollingstation_set.all():
1✔
265
            context["STATIONS"].append(
×
266
                {
267
                    "address": station.address,
268
                    "postcode": station.postcode,
269
                    "location": "✔️" if station.location else "❌",
270
                    "example_uprn": station_to_example_uprn_map.get(
271
                        station.internal_council_id
272
                    )["uprn"],
273
                    "example_postcode": station_to_example_uprn_map.get(
274
                        station.internal_council_id
275
                    )["postcode"],
276
                    "visibility": VisibilityChoices[station.visibility].label,
277
                    "pk": station.id,
278
                }
279
            )
280
        context["STATIONS"].sort(key=lambda d: d["address"])
1✔
281
        context["live_upload"] = council.live_upload
1✔
282
        context["events"] = council.dataevent_set.all().order_by("-created")
1✔
283
        return context
1✔
284

285

286
class FileDetailView(CouncilFileUploadAllowedMixin, DetailView):
1✔
287
    template_name = "file_uploads/file_detail.html"
1✔
288
    model = File
1✔
289

290

291
class CouncilLoginView(FormView):
1✔
292
    form_class = CouncilLoginForm
1✔
293
    template_name = "file_uploads/council_login.html"
1✔
294

295
    def form_valid(self, form):
1✔
296
        """
297
        Create or retrieve a user trigger the send login email
298
        """
299
        user, created = User.objects.get_or_create(
×
300
            email=form.cleaned_data["email"],
301
            username=form.cleaned_data["email"],
302
        )
303
        if created:
×
304
            user.set_unusable_password()
×
305
            user.save()
×
306

307
        self.send_login_url(user=user)
×
308
        messages.success(
×
309
            self.request,
310
            "Thank you, please check your email for your magic link to log in to your account.",
311
            fail_silently=True,
312
        )
313
        return HttpResponseRedirect(self.get_success_url())
×
314

315
    def send_login_url(self, user):
1✔
316
        """
317
        Send an email to the user with a link to authenticate and log in
318
        """
319
        querystring = get_query_string(user=user)
×
320
        domain = get_domain(request=self.request)
×
321
        path = reverse("file_uploads:council_authenticate")
×
322
        url = f"{self.request.scheme}://{domain}{path}{querystring}"
×
323
        subject = "Your magic link to log in to the WhereDoIVote Uploader"
×
324
        txt = render_to_string(
×
325
            template_name="file_uploads/email/login_message.txt",
326
            context={
327
                "authenticate_url": url,
328
                "subject": subject,
329
            },
330
        )
331
        return user.email_user(subject=subject, message=txt)
×
332

333
    def get_success_url(self):
1✔
334
        """
335
        Redirect to same page where success message will be displayed
336
        """
337
        return reverse("file_uploads:council_login_view")
×
338

339

340
class AuthenticateView(TemplateView):
1✔
341
    template_name = "file_uploads/authenticate.html"
1✔
342

343
    def get(self, request, *args, **kwargs):
1✔
344
        """
345
        Attempts to get user from the request, log them in, and redirect them to
346
        their profile page. Renders an error message if django-sesame fails to
347
        get a user from the request.
348
        """
349
        token = self.request.GET.get("login_token")
×
350
        if not token:
×
351
            return redirect(reverse("file_uploads:council_login_view"))
×
352
        user = get_user(token)
×
353
        if not user:
×
354
            return super().get(request, *args, **kwargs)
×
355
        login(request, user, backend="sesame.backends.ModelBackend")
×
356
        assign_councils_to_user(user)
×
357
        return redirect("file_uploads:councils_list")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc