• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DemocracyClub / yournextrepresentative / 06fffc96-31fa-407b-8443-1731e4db6cf1

04 Apr 2024 08:24AM CUT coverage: 68.368% (+1.1%) from 67.282%
06fffc96-31fa-407b-8443-1731e4db6cf1

Pull #2269

circleci

symroe
Clean up some testing code

Misc fixes to the test code
Pull Request #2269: Redesign SOPN models

1749 of 2922 branches covered (59.86%)

Branch coverage included in aggregate %.

415 of 488 new or added lines in 25 files covered. (85.04%)

18 existing lines in 4 files now uncovered.

7093 of 10011 relevant lines covered (70.85%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.43
/ynr/apps/sopn_parsing/helpers/parse_tables.py
1
import json
1✔
2
import re
1✔
3

4
from bulk_adding.models import RawPeople
1✔
5
from candidates.models import Ballot
1✔
6
from django.contrib.postgres.search import TrigramSimilarity
1✔
7
from django.core.files.base import ContentFile
1✔
8
from django.core.files.storage import DefaultStorage
1✔
9
from django.db.models import Value
1✔
10
from django.db.models.functions import Replace
1✔
11
from django.db.utils import DataError
1✔
12
from nameparser import HumanName
1✔
13
from pandas import DataFrame
1✔
14
from parties.models import Party, PartyDescription
1✔
15
from sopn_parsing.helpers.text_helpers import clean_text
1✔
16
from utils.db import Levenshtein
1✔
17

18
# Column headings are compared after the header row has been passed through
# clean_row(), so every entry below is written in that cleaned form.

# Headings listed as "first name" style columns.
FIRST_NAME_FIELDS = [
    "other name",
    "other names",
    "candidate forename",
    "candidates other names",
    "other names in full",
    "other names / enwau eraill",
]

# Headings listed as "last name" style columns; order_name_fields() moves a
# matching column to the end of the list of name columns.
LAST_NAME_FIELDS = [
    "surname",
    "candidate surname",
    "candidates surname",
    "last name",
    "surname / cyfenw",
]

# Welsh (or bilingual) headings for a candidate name column.
WELSH_NAME_FIELDS = [
    "enwr ymgeisydd",
    "enwr ymgeisydd candidate name",
    "enwr ymgeisydd name of candidate",
]

# Every heading that is accepted as (part of) a candidate's name.
NAME_FIELDS = [
    *FIRST_NAME_FIELDS,
    *LAST_NAME_FIELDS,
    "name of candidate",
    "names of candidate",
    "candidate name",
    "surname other names",
    "surname other names in full",
    *WELSH_NAME_FIELDS,
]

# Description values for which get_description() returns no description and
# get_party() returns the party with ec_id "ynmp-party:2".
INDEPENDENT_VALUES = ["Independent", "", "Annibynnol"]

# Welsh (or bilingual) headings for the description column.
WELSH_DESCRIPTION_VALUES = [
    "disgrifiad",
    "disgrifiad or ymgeisydd",
    "disgrifiad or ymgeisydd description of candidate",
]

# Every heading that is accepted as the description column.
DESCRIPTION_VALUES = [
    "description of candidate",
    "description",
    *WELSH_DESCRIPTION_VALUES,
]
62

63

64
def iter_rows(data):
    """
    Yield the rows of ``data`` one at a time, in positional order.

    Rows are read with ``data.iloc[n]`` for n = 0, 1, 2, ... and iteration
    stops the first time that lookup raises ``IndexError``.
    """
    position = 0
    while True:
        try:
            row = data.iloc[position]
        except IndexError:
            # ran off the end of the data
            return
        yield row
        position += 1
1✔
73

74

75
def merge_row_cells(row):
    """
    Return the truthy cells of ``row`` as a list, dropping empty ones.
    """
    return list(filter(None, row))
1✔
77

78

79
def clean_row(row):
    """
    Return a list with ``clean_text`` applied to every cell in ``row``.
    """
    return list(map(clean_text, row))
1✔
81

82

83
def contains_header_like_strings(row):
    """
    Return True if the cleaned text of ``row`` contains any of the headings
    in NAME_FIELDS as a substring, otherwise False.
    """
    flattened = clean_text(row.to_string())
    return any(heading in flattened for heading in NAME_FIELDS)
×
88

89

90
def looks_like_header(row, avg_row):
    """
    Decide whether ``row`` could be the header row of the table.

    A row qualifies when it has at least ``avg_row - 3`` non-empty cells and
    its text contains one of the known name headings.
    """
    minimum_cells = avg_row - 3
    if len(merge_row_cells(row)) < minimum_cells:
        return False
    return contains_header_like_strings(row)
×
97

98

99
def order_name_fields(name_fields):
    """
    Move the first field that appears in LAST_NAME_FIELDS to the end of
    ``name_fields``.

    The list is modified in place and also returned. If no field is in
    LAST_NAME_FIELDS the list is returned unchanged.
    """
    last_name_position = next(
        (
            position
            for position, field in enumerate(name_fields)
            if field in LAST_NAME_FIELDS
        ),
        None,
    )
    if last_name_position is not None:
        # this is the field we think holds the last name, so put it last
        name_fields.append(name_fields.pop(last_name_position))
    return name_fields
1✔
112

113

114
def get_name_fields(row):
    """
    Return the cells of ``row`` that are known name headings (NAME_FIELDS),
    in the order they appear. This may be one field or several.

    Raises ValueError if none of the cells is a known name heading.
    """
    matches = [heading for heading in row if heading in NAME_FIELDS]
    if matches:
        return matches
    raise ValueError("No name guess for {}".format(row))
1✔
123

124

125
def guess_description_field(row):
    """
    Return the first cell of ``row`` that is a known description heading
    (DESCRIPTION_VALUES).

    Raises ValueError if no cell matches.
    """
    match = next((cell for cell in row if cell in DESCRIPTION_VALUES), None)
    if match is None:
        raise ValueError("No description guess for {}".format(row))
    return match
×
130

131

132
def guess_previous_party_affiliations_field(data, sopn):
    """
    Return the heading of the previous party affiliations column, if any.

    The headings in ``data`` are cleaned first. None is returned when
    ``sopn.sopn.ballot.is_welsh_run`` is falsy, or when none of the cleaned
    headings is a known previous-affiliation heading.
    """
    headings = clean_row(data)
    if not sopn.sopn.ballot.is_welsh_run:
        return None

    # this could become more
    known_headings = ["statement of party membership"]
    return next(
        (heading for heading in headings if heading in known_headings), None
    )
1✔
145

146

147
def clean_name(name):
    """
    Tidy a raw candidate name and return it as "<other names> <last names>".

    - Newlines become spaces, backticks become apostrophes and en dashes
      become hyphens
    - Runs of whitespace are collapsed, and a space following a hyphen or
      an apostrophe is removed
    - If the name contains "commonly known as", only the text after the last
      occurrence is kept, with closing brackets removed
    - Words written entirely in capitals are treated as last names and
      re-cased by clean_last_names; all other words are kept as they are
    """
    for old, new in (("\n", " "), ("`", "'"), ("\u2013", "\u002d")):
        name = name.replace(old, new)
    # remove multiple whitespaces
    name = " ".join(name.split())
    # this can leave extra whitespace after special chars so remove these
    for old, new in (("- ", "-"), ("' ", "'")):
        name = name.replace(old, new)

    marker = "commonly known as"
    if marker in name:
        name = name.replace(")", "")
        name = name.split(marker)[-1].strip()

    words = [word for word in name.split(" ") if word]
    surname = clean_last_names(words)
    other_names = " ".join(word for word in words if not word.isupper())
    # strip in case either half is empty
    return f"{other_names} {surname}".strip()
1✔
172

173

174
## Handles Mc and Mac and other mixed titlecase names
175
def clean_last_names(names):
    """
    Join the all-capitals words in ``names`` with spaces and return them
    re-cased by ``HumanName.capitalize()``.
    """
    upper_case_words = [word for word in names if word.isupper()]
    parsed = HumanName(" ".join(upper_case_words))
    parsed.capitalize()
    return str(parsed)
1✔
180

181

182
def clean_description(description):
    """
    Normalise a parsed description so it can be compared with stored party
    names and descriptions.

    The value is converted to a string, newlines (real ones and literal
    backslash-n sequences) are removed, backticks become apostrophes, "&"
    becomes "and", en dashes become hyphens and runs of whitespace collapse
    to a single space.
    """
    text = str(description)
    substitutions = (
        ("\\n", ""),
        ("\n", ""),
        ("`", "'"),
        ("&", "and"),
        # change dash to hyphen to match how they are stored in our DB
        ("\u2013", "\u002d"),
    )
    for old, new in substitutions:
        text = text.replace(old, new)
    text = re.sub(r"\s+", " ", text)

    # handle edge cases for the green party to stop incorrectly matching
    # against Welsh descriptions
    green_party_variants = ["the green party", "the green party candidate"]
    return "Green Party" if text.lower() in green_party_variants else text
1✔
196

197

198
def get_description(description, sopn):
    """
    Find the PartyDescription that best matches a parsed description.

    The description is cleaned with clean_description and then matched, in
    order, against descriptions whose party is on the ballot's register:

    1. an exact match
    2. a description that starts with the parsed text (case-insensitive)
    3. the closest description within a Levenshtein distance of 5
    4. a description that ends with "| <parsed text>" (Welsh versions are
       stored at the end of the description)

    Returns None when the description is empty or an "independent" value,
    when it exactly matches the name of a current Party on the register (so
    that get_party picks up the plain Party instead), or when nothing
    matches.
    """
    description = clean_description(description)

    # INDEPENDENT_VALUES includes "", the explicit emptiness check is kept
    # for any other falsy value
    if not description or description in INDEPENDENT_VALUES:
        return None

    register = sopn.sopn.ballot.post.party_set.slug.upper()

    # First try to get Party object with an exact match between parsed
    # description and the Party name

    # annotate search_text field to both QuerySets which normalizes name field
    # by changing '&' to 'and' this is then used instead of the name field for
    # string matching
    party_qs = (
        Party.objects.register(register)
        .current()
        .annotate(search_text=Replace("name", Value("&"), Value("and")))
    )
    # If we find one, return None, so that the plain Party object
    # is parsed in get_party, and this will then be preselected
    # for the user on the form.
    if party_qs.filter(search_text=description).exists():
        return None

    party_description_qs = PartyDescription.objects.annotate(
        search_text=Replace("description", Value("&"), Value("and"))
    )
    try:
        return party_description_qs.get(
            search_text=description, party__register=register
        )
    except (
        PartyDescription.DoesNotExist,
        PartyDescription.MultipleObjectsReturned,
    ) as e:
        # no single exact match, fall through to the looser lookups
        print(e)

    # try to find any that start with parsed description
    description_obj = party_description_qs.filter(
        search_text__istartswith=description, party__register=register
    ).first()
    if description_obj:
        return description_obj

    # Levenshtein - restricted to the ballot's register like every other
    # lookup here, so a near match from another register is never returned
    try:
        qs = (
            party_description_qs.filter(party__register=register)
            .annotate(lev_dist=Levenshtein("search_text", Value(description)))
            .order_by("lev_dist")
        )
        description_obj = qs.filter(lev_dist__lte=5).first()
        if description_obj:
            print(
                f"{description} matched with {description_obj.description} with a distance of {description_obj.lev_dist}"
            )
            return description_obj
    except ValueError:
        print("Levenshtein failed")

    # final check - if this is a Welsh version of a description, it will be at
    # the end of the description
    description_obj = party_description_qs.filter(
        search_text__endswith=f"| {description}", party__register=register
    ).first()
    # .first() returns None rather than raising DoesNotExist, so report the
    # miss explicitly
    if description_obj is None:
        print(f"Couldn't find description for {description}")
    return description_obj
×
271

272

273
def get_party(description_model, description_str, sopn):
    """
    Work out which Party a parsed description refers to.

    If ``description_model`` is given, its party is returned directly.
    Otherwise ``description_str`` is cleaned with clean_description and
    matched against the names of parties on the ballot's register that are
    active on the election date, trying in order:

    1. an exact match
    2. the closest name within a Levenshtein distance of 5
    3. the most similar name with a trigram similarity of at least 0.5

    An empty or "independent" description returns the Party with ec_id
    "ynmp-party:2". Returns None if nothing matches.
    """
    if description_model:
        return description_model.party

    party_name = clean_description(description_str)
    register = sopn.sopn.ballot.post.party_set.slug.upper()

    # annotate search_text field which normalizes name field by changing '&' to 'and'
    # this is then used instead of the name field for string matching
    qs = (
        Party.objects.register(register)
        .active_for_date(date=sopn.sopn.ballot.election.election_date)
        .annotate(search_text=Replace("name", Value("&"), Value("and")))
    )
    if not party_name or party_name in INDEPENDENT_VALUES:
        # presumably the "Independent" party record - confirm against the data
        return Party.objects.get(ec_id="ynmp-party:2")

    try:
        return qs.get(search_text=party_name)
    except Party.DoesNotExist:
        # no exact match, fall through to the fuzzy lookups below
        pass

    qs = qs.annotate(
        lev_dist=Levenshtein("search_text", Value(party_name))
    ).order_by("lev_dist")
    party_obj = qs.filter(lev_dist__lte=5).first()
    if party_obj:
        print(
            f"{party_name} matched with {party_obj.name} with a distance of {party_obj.lev_dist}"
        )
        return party_obj

    # Last resort attempt - look for the most similar party object to help when
    # parsed name is missing a whitespace e.g. Barnsley IndependentGroup
    qs = qs.annotate(similarity=TrigramSimilarity("name", party_name)).order_by(
        "-similarity"
    )

    party_obj = qs.filter(similarity__gte=0.5).first()
    if not party_obj:
        print(f"Couldn't find party for {party_name}.")
        closest = qs.first()
        # the queryset can be empty (no active parties on this register), in
        # which case there is no closest party to report
        if closest is not None:
            print(
                f"Closest is {closest.name} with similarity {closest.similarity}"
            )

    return party_obj
×
318

319

320
def get_name(row, name_fields):
    """
    Join the values that ``row`` holds for each of ``name_fields`` with
    spaces, in the order given, and return the result of passing that
    through clean_name.
    """
    parts = (row[field] for field in name_fields)
    return clean_name(" ".join(parts))
1✔
327

328

329
def add_previous_party_affiliations(party_str, raw_data, sopn):
    """
    Look up the party named by ``party_str`` and, if one is found, store its
    ec_id in ``raw_data["previous_party_affiliations"]`` as a one-item list.

    ``raw_data`` is returned in every case; it is left unchanged when
    ``party_str`` is empty or no party can be found.
    """
    if party_str:
        party = get_party(
            description_model=None, description_str=party_str, sopn=sopn
        )
        if party:
            raw_data["previous_party_affiliations"] = [party.ec_id]
    return raw_data
1✔
346

347

348
def parse_table(sopn, data):
    """
    Turn a table whose columns are the header row into a list of candidates.

    The column headings of ``data`` are cleaned in place, then used to find
    the name column(s), the description column and (if there is one) the
    previous party affiliations column. Each row becomes a dict with "name"
    and "party_id", plus "description_id" and "previous_party_affiliations"
    when those can be worked out. Rows without a parsable name or without a
    matching party are skipped.

    Raises ValueError if no name column or no description column is found.
    """
    data.columns = clean_row(data.columns)

    name_fields = get_name_fields(data.columns)

    # if we have more than one name field try to order them
    if len(name_fields) > 1:
        name_fields = order_name_fields(name_fields)

    description_field = guess_description_field(data.columns)
    previous_party_affiliations_field = guess_previous_party_affiliations_field(
        data=data.columns, sopn=sopn
    )

    ballot_data = []
    for row in iter_rows(data):
        name = get_name(row, name_fields)
        # if we couldnt parse a candidate name skip this row
        if not name:
            continue

        raw_description = row[description_field]
        description_obj = get_description(
            description=raw_description, sopn=sopn
        )
        party_obj = get_party(
            description_model=description_obj,
            description_str=raw_description,
            sopn=sopn,
        )
        if not party_obj:
            continue

        # named ``person`` rather than ``data`` so the table being iterated
        # is not shadowed inside its own loop
        person = {"name": name, "party_id": party_obj.ec_id}
        if description_obj:
            person["description_id"] = description_obj.pk

        if previous_party_affiliations_field:
            person = add_previous_party_affiliations(
                party_str=row[previous_party_affiliations_field],
                raw_data=person,
                sopn=sopn,
            )

        ballot_data.append(person)
    return ballot_data
1✔
393

394

395
def parse_raw_data_for_ballot(ballot):
    """
    Parse the raw SOPN data for ``ballot``, refusing ballots that are locked
    or that have lock suggestions.

    :type ballot: candidates.models.Ballot
    :raises ValueError: if the ballot's candidates are locked, or if it has
        any suggested post locks
    """
    if ballot.candidates_locked:
        refusal = f"Can't parse a locked ballot {ballot.ballot_paper_id}"
    elif ballot.suggestedpostlock_set.exists():
        refusal = f"Can't parse a ballot with lock suggestions {ballot.ballot_paper_id}"
    else:
        refusal = None

    if refusal is not None:
        raise ValueError(refusal)

    # Open question: at this point there may be two sets of data that both
    # need to follow the same parsing process. Both need parsing, but should
    # only one be saved to the RawPeople model, or both? If both are saved
    # they need to be consistent with each other. If only one is saved, which
    # one: the one with the most data, or the one that best matches what is
    # already in RawPeople? The user should be shown both sets and allowed to
    # choose which one to save.
    parse_raw_data(ballot)
1✔
418

419

420
def parse_dataframe(ballot: Ballot, df: DataFrame):
    """
    Find the header row in ``df`` and parse the rows below it.

    Rows are scanned from the top. Every row before the header is dropped;
    the first row that looks like a header becomes the column names and is
    dropped too. What remains is handed to parse_table.

    Returns the list produced by parse_table, or None if no header row is
    found or parse_table raises ValueError.
    """
    cell_counts = [len(merge_row_cells(row)) for row in iter_rows(df)]
    # ``or 1`` avoids dividing by zero for an empty frame
    avg_row = sum(cell_counts) / float(len(cell_counts) or 1)

    header_found = False
    for row in iter_rows(df):
        if looks_like_header(row, avg_row):
            df.columns = row
            df = df.drop(row.name)
            header_found = True
            # nothing is done with rows below the header here, so stop
            break
        try:
            df = df.drop(row.name)
        except IndexError:
            break

    if not header_found:
        # Don't try to parse if we don't think we know the header
        print(f"We couldn't find a header for {ballot.ballot_paper_id}")
        return None

    # We're now in a position where we think we have the table we want
    # with the columns set and other header rows removed.
    # Time to parse it in to names and parties
    try:
        return parse_table(ballot, df)
    except ValueError as e:
        # Something went wrong. This will happen a lot. let's move on
        print(f"Error attempting to parse a table for {ballot.ballot_paper_id}")
        # a ValueError raised without arguments has an empty ``args``
        print(e.args[0] if e.args else e)
        return None
×
450

451

452
def parse_raw_data(ballot: Ballot, reparse=False):
    """
    Given a Ballot, go and get the Camelot and the AWS Textract dataframes
    and process them.

    Each source is parsed only if its model exists on ``ballot.sopn``, its
    raw data type is "pandas" and it either has no parsed data yet or
    ``reparse`` is true (the Textract source must also have raw data).

    If either source produced data:

    - a RawPeople row for the ballot is created or updated, unless a
      RawPeople row from a source other than a parsed PDF already exists
    - both results are written to storage as JSON under ``raw_people/``
    - the status of each existing parsed-SOPN model is set to "parsed"

    If saving RawPeople raises DataError, a message is printed and the
    function returns without doing the last two steps.
    """

    camelot_model = getattr(ballot.sopn, "camelotparsedsopn", None)
    camelot_data = {}
    textract_model = getattr(ballot.sopn, "awstextractparsedsopn", None)
    textract_data = {}
    if (
        camelot_model
        and camelot_model.raw_data_type == "pandas"
        and (reparse or not camelot_model.parsed_data)
    ):
        camelot_data = parse_dataframe(ballot, camelot_model.as_pandas)
        # parse_dataframe returns None on failure; keep the empty default so
        # that None is never stored or dumped as the result
        if camelot_data is None:
            camelot_data = {}
    if (
        textract_model
        and textract_model.raw_data
        and textract_model.raw_data_type == "pandas"
        and (reparse or not textract_model.parsed_data)
    ):
        textract_data = parse_dataframe(ballot, textract_model.as_pandas)
        if textract_data is None:
            textract_data = {}

    if camelot_data or textract_data:
        # Check there isn't a rawpeople object from another (better) source
        rawpeople_qs = RawPeople.objects.filter(ballot=ballot).exclude(
            source_type=RawPeople.SOURCE_PARSED_PDF
        )
        if not rawpeople_qs.exists():
            try:
                RawPeople.objects.update_or_create(
                    ballot=ballot,
                    defaults={
                        "data": camelot_data,
                        "textract_data": textract_data,
                        "source": "Parsed from {}".format(
                            ballot.sopn.source_url
                        ),
                        "source_type": RawPeople.SOURCE_PARSED_PDF,
                    },
                )
            except DataError:
                print(
                    f"DataError attempting to save RawPeople for {ballot.ballot_paper_id}"
                )
                return
        # We've done the parsing, so let's still save the result
        storage = DefaultStorage()
        storage.save(
            f"raw_people/camelot_{ballot.ballot_paper_id}.json",
            ContentFile(json.dumps(camelot_data, indent=4).encode("utf8")),
        )
        storage.save(
            f"raw_people/textract_{ballot.ballot_paper_id}.json",
            ContentFile(json.dumps(textract_data, indent=4).encode("utf8")),
        )
        if camelot_model:
            ballot.sopn.camelotparsedsopn.status = "parsed"
            ballot.sopn.camelotparsedsopn.save()
        if textract_model:
            ballot.sopn.awstextractparsedsopn.status = "parsed"
            ballot.sopn.awstextractparsedsopn.save()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc