• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DemocracyClub / WhoCanIVoteFor / e1a02576-fed3-4822-b3d7-83768ed61fb1

09 Oct 2025 04:45PM UTC coverage: 58.669% (+0.1%) from 58.523%
e1a02576-fed3-4822-b3d7-83768ed61fb1

Pull #2299

circleci

chris48s
register markdown_subset filter, use it for candidate statements
Pull Request #2299: WIP render good markdown, but not bad markdown

11 of 11 new or added lines in 1 file covered. (100.0%)

268 existing lines in 11 files now uncovered.

2927 of 4989 relevant lines covered (58.67%)

0.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.67
/wcivf/apps/people/management/commands/import_people.py
1
import json
1✔
2
import os
1✔
3
import shutil
1✔
4
import tempfile
1✔
5
from urllib.parse import urlencode
1✔
6

7
import requests
1✔
8
from core.helpers import show_data_on_error
1✔
9
from dateutil.parser import parse
1✔
10
from django.conf import settings
1✔
11
from django.core.management.base import BaseCommand
1✔
12
from django.db import transaction
1✔
13
from elections.import_helpers import YNRBallotImporter
1✔
14
from elections.models import PostElection
1✔
15
from parties.models import Party
1✔
16
from people.models import Person, PersonRedirect
1✔
17

18
from wcivf.apps.elections.import_helpers import time_function_length
1✔
19
from wcivf.apps.people.import_helpers import YNRPersonImporter
1✔
20

21

22
class Command(BaseCommand):
1✔
23
    def add_arguments(self, parser):
        """
        Register the command line options understood by this command.

        * ``--recently-updated``: boolean flag stored as ``recently_updated``
        * ``--since``: value converted with ``self.valid_date`` and stored
          as ``since``
        * ``--exclude-candidacies``: boolean flag stored as
          ``exclude_candidacies``
        """
        # Each entry is (flag, keyword arguments for parser.add_argument)
        option_specs = (
            (
                "--recently-updated",
                {
                    "action": "store_true",
                    "dest": "recently_updated",
                    "default": False,
                    "help": "Import changes in the last `n` minutes",
                },
            ),
            (
                "--since",
                {
                    "action": "store",
                    "dest": "since",
                    "type": self.valid_date,
                    "help": "Import changes since [datetime]",
                },
            ),
            (
                "--exclude-candidacies",
                {
                    "action": "store_true",
                    "dest": "exclude_candidacies",
                    "default": False,
                    "help": "Ignore candidacies when importing people",
                },
            ),
        )
        for flag, kwargs in option_specs:
            parser.add_argument(flag, **kwargs)
46

47
    def valid_date(self, value):
        """
        Argument ``type`` callback used by ``--since``: pass the raw command
        line string to ``dateutil.parser.parse`` and return its result.
        """
        parsed = parse(value)
        return parsed
×
49

50
    def handle(self, **options):
        """
        Entry point of the command.

        Works out the timestamp to import from, then either imports the
        recently updated people page by page (``--recently-updated``) or
        downloads all pages to a temporary directory and imports them from
        disk. Finally removes merged and orphaned people.

        The timestamp is, in order of preference: the ``--since`` option,
        the ``last_updated`` value of the latest ``Person``, or
        ``"1800-01-01"`` when there are no people yet.
        """
        self.options = options
        self.ballot_importer = YNRBallotImporter(stdout=self.stdout)

        # Fallback used on the first run, when there is no Person to read a
        # timestamp from. Binding it up front means the --recently-updated
        # branch below never sees an unbound name.
        last_updated = "1800-01-01"
        try:
            person = Person.objects.latest()
        except Person.DoesNotExist:
            pass
        else:
            self.stdout.write(
                f"Using timestamp from {person.name} (PK:{person.pk} TS:{person.last_updated})"
            )
            last_updated = person.last_updated

        # An explicit --since always wins over the stored timestamp
        self.past_time_str = str(self.options["since"] or last_updated)

        if options["recently_updated"]:
            # NOTE(review): as before, this import uses the stored timestamp
            # and not --since; confirm whether that is intended.
            importer = YNRPersonImporter(params={"last_updated": last_updated})
            for page in importer.people_to_import:
                self.add_people(results=page)

        else:
            self.dirpath = tempfile.mkdtemp()
            try:
                self.download_pages()
                self.add_to_db()
            finally:
                # Always remove the downloaded pages, even when the download
                # or the import fails part way through
                shutil.rmtree(self.dirpath)

        self.delete_merged_people()
        self.delete_orphaned_people()
×
82

83
    def add_to_db(self):
        """
        Import every ``*.json`` page found in ``self.dirpath``, in page
        number order, passing each parsed page to ``add_people``.

        When neither ``recently_updated`` nor ``since`` is set, people that
        existed before the import but were not recorded in
        ``self.seen_people`` are deleted afterwards.
        """
        self.existing_people = set(Person.objects.values_list("pk", flat=True))
        self.seen_people = set()

        def page_number(filename):
            # The number sits between the last "-" and the following "."
            stem = filename.split("-")[-1]
            return int(stem.split(".")[0])

        json_files = [
            name for name in os.listdir(self.dirpath) if name.endswith(".json")
        ]
        json_files.sort(key=page_number)

        for filename in json_files:
            self.stdout.write("Importing {}".format(filename))
            with open(os.path.join(self.dirpath, filename), "r") as handle:
                results = json.load(handle)
            self.add_people(results)

        recently_updated = self.options["recently_updated"]
        since = self.options["since"]
        if recently_updated or since:
            # Partial import: the pages do not describe everyone, so a
            # missing person does not mean they were deleted upstream
            return

        # NOTE(review): the sets hold ``pk`` values but the filter is on
        # ``ynr_id``; this assumes both are the same value - confirm against
        # the Person model.
        deleted_ids = self.existing_people - self.seen_people
        Person.objects.filter(ynr_id__in=deleted_ids).delete()
×
104

105
    def save_page(self, url, page):
        """
        Write the text of a downloaded page to a file in ``self.dirpath``.

        The file name is the last path segment for "cached-api" URLs,
        otherwise ``page-<n>.json`` where ``<n>`` is the value of the
        ``page=`` query parameter (1 when the URL has none).
        """
        if "cached-api" in url:
            filename = url.split("/")[-1]
        elif "page=" in url:
            page_number = url.split("page=")[1].split("&")[0]
            filename = "page-{}.json".format(page_number)
        else:
            # No page parameter means this is the first page
            filename = "page-{}.json".format(1)

        destination = os.path.join(self.dirpath, filename)
        with open(destination, "w") as out:
            out.write(page)
×
121

122
    def download_pages(self):
        """
        Download every page of people and store each one with ``save_page``.

        With ``recently_updated`` or ``since`` set, pages come from the
        people API filtered by ``self.past_time_str``; otherwise they come
        from the cached API files. Each response's ``next`` value is followed
        until it is empty.
        """
        incremental = self.options["recently_updated"] or self.options["since"]
        if incremental:
            query = {"page_size": "200", "last_updated": self.past_time_str}
            url = settings.YNR_BASE + "/api/next/people/?{}".format(
                urlencode(query)
            )
        else:
            url = (
                settings.YNR_BASE
                + "/media/cached-api/latest/people-000001.json"
            )

        while url:
            self.stdout.write("Downloading {}".format(url))
            response = requests.get(url)
            response.raise_for_status()
            body = response.text
            # Parse before saving so an invalid page is never written
            payload = response.json()
            self.save_page(url, body)
            url = payload.get("next")
×
144

145
    @time_function_length
1✔
146
    @transaction.atomic
1✔
147
    def add_people(self, results):
        """
        Create or update a ``Person`` for every entry in
        ``results["results"]``.

        In a ``recently_updated`` import the person's candidacies are also
        synced: stale ones are deleted and, unless ``exclude_candidacies``
        is set, the current ones are created or updated. Otherwise the
        person is recorded in ``self.seen_people`` when they have any
        candidacies.
        """
        self.stdout.write(f"Found {results['count']} people to import")
        for person_data in results["results"]:
            label = "Person {}".format(person_data["id"])
            with show_data_on_error(label, person_data):
                person_obj = Person.objects.update_or_create_from_ynr(
                    person_data
                )
                self.stdout.write(
                    f"Updated {person_obj.name} ({person_obj.pk})"
                )

                if not self.options["recently_updated"]:
                    # Full import: just remember who we saw with candidacies
                    if person_data["candidacies"]:
                        self.seen_people.add(person_obj.pk)
                    continue

                # Recent update: seen people are not tracked here
                self.delete_old_candidacies(
                    person_data=person_data,
                    person_obj=person_obj,
                )
                if not self.options["exclude_candidacies"]:
                    self.update_candidacies(
                        person_data=person_data, person_obj=person_obj
                    )
×
170

171
    def delete_old_candidacies(self, person_data, person_obj):
        """
        Remove the person's candidacies whose ballot paper ID is not listed
        in ``person_data["candidacies"]``, and report how many were deleted.
        """
        current_ballot_ids = []
        for candidacy in person_data["candidacies"]:
            current_ballot_ids.append(candidacy["ballot"]["ballot_paper_id"])

        stale_candidacies = person_obj.personpost_set.exclude(
            post_election__ballot_paper_id__in=current_ballot_ids
        )
        count, _ = stale_candidacies.delete()
        self.stdout.write(f"Deleted {count} candidacies for {person_obj.name}")
1✔
183

184
    def update_candidacies(self, person_data, person_obj):
        """
        Create or update one candidacy on ``person_obj`` for each entry in
        ``person_data["candidacies"]``, including its previous party
        affiliations, and report whether each was created or updated.
        """
        for candidacy in person_data["candidacies"]:
            ballot_paper_id = candidacy["ballot"]["ballot_paper_id"]
            try:
                ballot = PostElection.objects.get(
                    ballot_paper_id=ballot_paper_id
                )
            except PostElection.DoesNotExist:
                # Possibly import_ballots has not been run recently enough.
                # Import the ballots for this ballot's date (the last
                # dot-separated part of the ID) and look it up again.
                election_date = ballot_paper_id.split(".")[-1]
                self.import_ballots_for_date(date=election_date)
                ballot = PostElection.objects.get(
                    ballot_paper_id=ballot_paper_id
                )

            # TODO check if the post/election could have changed and should be
            # used in defaults dict
            defaults = {
                "party_id": candidacy["party"]["legacy_slug"],
                "list_position": candidacy["party_list_position"],
                "deselected": candidacy["deselected"],
                "deselected_source": candidacy["deselected_source"],
                "elected": candidacy["elected"],
                "party_name": candidacy["party_name"],
                "party_description_text": candidacy["party_description_text"],
            }
            # TODO add this to YNR CandidacyOnPersonSerializer
            result = candidacy.get("result")
            if result:
                defaults["votes_cast"] = result.get("num_ballots", None)

            personpost, created = person_obj.personpost_set.update_or_create(
                post_election=ballot,
                post=ballot.post,
                election=ballot.election,
                defaults=defaults,
            )

            affiliations = candidacy.get("previous_party_affiliations", [])
            for affiliation in affiliations:
                slug = affiliation["legacy_slug"]
                # An affiliation equal to the candidacy's own party is not
                # a *previous* affiliation, so skip it
                if slug == personpost.party_id:
                    continue

                try:
                    previous_party = Party.objects.get(party_id=slug)
                except Party.DoesNotExist:
                    continue
                personpost.previous_party_affiliations.add(previous_party)

            verb = "created" if created else "updated"
            self.stdout.write(msg=f"{personpost} was {verb}")
1✔
242

243
    def import_ballots_for_date(self, date):
        """
        Run the ballot importer restricted to the given election date.
        """
        import_params = {"election_date": date}
        self.ballot_importer.do_import(params=import_params)
×
245

246
    @time_function_length
1✔
247
    def delete_merged_people(self):
        """
        Fetch the person redirects updated since ``self.past_time_str``,
        store each one as a ``PersonRedirect`` and delete every ``Person``
        whose ``ynr_id`` is an old (merged away) person ID.

        Raises ``requests.HTTPError`` when a page request returns an error
        status.
        """
        query = {"page_size": 200, "updated_gte": self.past_time_str}
        if settings.YNR_API_KEY:
            query["auth_token"] = settings.YNR_API_KEY
        # Encode the query properly: the timestamp can contain spaces and a
        # "+" (timezone offset) which would otherwise be read as a space
        url = (
            f"{settings.YNR_BASE}/api/next/person_redirects/"
            f"?{urlencode(query)}"
        )

        merged_ids = []
        while url:
            req = requests.get(url)
            # Fail loudly instead of treating an error response as
            # "no redirects", consistent with download_pages
            req.raise_for_status()
            page = req.json()
            for result in page.get("results", []):
                merged_ids.append(result["old_person_id"])
                PersonRedirect.objects.get_or_create(
                    old_person_id=result["old_person_id"],
                    new_person_id=result["new_person_id"],
                )
            url = page.get("next")
        Person.objects.filter(ynr_id__in=merged_ids).delete()
×
263

264
    @time_function_length
1✔
265
    def delete_orphaned_people(self):
        """
        Delete every person that has no candidacies and report how many
        ``Person`` rows were removed.
        """
        orphans = Person.objects.filter(personpost__isnull=True)
        _, deleted_by_model = orphans.delete()
        count = deleted_by_model.get("people.Person", 0)
        self.stdout.write(f"Deleted {count} orphaned People objects")
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc