• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DemocracyClub / yournextrepresentative / 74df76c8-4768-48d5-bb7f-5ee50aa05217

06 Nov 2023 01:38PM UTC coverage: 67.523% (-0.3%) from 67.801%
74df76c8-4768-48d5-bb7f-5ee50aa05217

Pull #2177

circleci

VirginiaDooley
Create TextractResults model
Pull Request #2177: Spike: AWS Textract

1640 of 2760 branches covered (0.0%)

Branch coverage included in aggregate %.

12 of 62 new or added lines in 3 files covered. (19.35%)

110 existing lines in 10 files now uncovered.

6662 of 9535 relevant lines covered (69.87%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.37
/ynr/apps/sopn_parsing/helpers/parse_tables.py
1
import json
1✔
2
import re
1✔
3
from os.path import join
1✔
4

5
from bulk_adding.models import RawPeople
1✔
6
from django.contrib.postgres.search import TrigramSimilarity
1✔
7
from django.core.files.base import ContentFile
1✔
8
from django.core.files.storage import DefaultStorage
1✔
9
from django.db.models import Value
1✔
10
from django.db.models.functions import Replace
1✔
11
from nameparser import HumanName
1✔
12
from parties.models import Party, PartyDescription
1✔
13
from sopn_parsing.helpers.text_helpers import clean_text
1✔
14
from sopn_parsing.models import ParsedSOPN
1✔
15
from utils.db import Levenshtein
1✔
16

17
# Column headings that hold a candidate's forename(s). Headings are passed
# through clean_text() before being compared with these values.
FIRST_NAME_FIELDS = [
    "other name",
    "other names",
    "candidate forename",
    "candidates other names",
    "other names in full",
    "other names / enwau eraill",
]
# Column headings that hold a candidate's surname. order_name_fields() uses
# this list to move the surname column to the end of the name columns.
LAST_NAME_FIELDS = [
    "surname",
    "candidate surname",
    "candidates surname",
    "last name",
    "surname / cyfenw",
]
# Welsh-language (or bilingual) headings for a candidate name column.
WELSH_NAME_FIELDS = [
    "enwr ymgeisydd",
    "enwr ymgeisydd candidate name",
    "enwr ymgeisydd name of candidate",
]
# Every heading treated as a name column: the forename and surname headings
# above, headings for a combined name column, and the Welsh variants.
NAME_FIELDS = (
    FIRST_NAME_FIELDS
    + LAST_NAME_FIELDS
    + [
        "name of candidate",
        "names of candidate",
        "candidate name",
        "surname other names",
        "surname other names in full",
    ]
    + WELSH_NAME_FIELDS
)


# Description values for which get_description() returns no PartyDescription
# and get_party() returns the party with ec_id "ynmp-party:2".
INDEPENDENT_VALUES = ["Independent", "", "Annibynnol"]

# Welsh-language (or bilingual) headings for the description column.
WELSH_DESCRIPTION_VALUES = [
    "disgrifiad",
    "disgrifiad or ymgeisydd",
    "disgrifiad or ymgeisydd description of candidate",
]
# Every heading treated as the description column; guess_description_field()
# returns the first heading that exactly matches one of these.
DESCRIPTION_VALUES = [
    "description of candidate",
    "description",
] + WELSH_DESCRIPTION_VALUES
62

63

64
def iter_rows(data):
    """
    Yield every row of ``data`` in positional order.

    :param data: a table supporting ``len()`` and positional row access via
        ``.iloc[index]`` (presumably a pandas DataFrame; confirm with callers)
    :return: generator producing ``data.iloc[0]`` .. ``data.iloc[len(data) - 1]``
    """
    # Use a bounded loop instead of indexing until IndexError is raised, so
    # an unrelated IndexError from row access is never mistaken for the end
    # of the table.
    for position in range(len(data)):
        yield data.iloc[position]
1✔
73

74

75
def merge_row_cells(row):
    """
    Return the truthy cells of ``row`` as a list, keeping their order.
    """
    return list(filter(None, row))
1✔
77

78

79
def clean_row(row):
    """
    Run ``clean_text`` over every cell of ``row`` and return the results as a
    list, in the same order.
    """
    return list(map(clean_text, row))
1✔
81

82

83
def contains_header_like_strings(row):
    """
    Report whether any known name-column heading appears in the row's text.

    The row is rendered with ``to_string()``, passed through ``clean_text``
    and then searched for each entry of ``NAME_FIELDS`` as a substring.
    """
    flattened = clean_text(row.to_string())
    return any(heading in flattened for heading in NAME_FIELDS)
×
88

89

90
def looks_like_header(row, avg_row):
    """
    Decide whether ``row`` is likely to be the table's header row.

    A row qualifies when it has at least ``avg_row - 3`` non-empty cells and
    its text contains one of the known name-column headings.
    """
    minimum_cells = avg_row - 3
    populated_cells = len(merge_row_cells(row))
    # The cell-count test stays first so the text search is skipped for
    # sparse rows.
    return bool(
        populated_cells >= minimum_cells and contains_header_like_strings(row)
    )
×
97

98

99
def order_name_fields(name_fields):
    """
    Move the first field found in LAST_NAME_FIELDS to the end of
    ``name_fields``.

    The list is modified in place and also returned. If no field is in
    LAST_NAME_FIELDS the list is returned untouched.
    """
    surname_position = next(
        (
            position
            for position, field in enumerate(name_fields)
            if field in LAST_NAME_FIELDS
        ),
        None,
    )
    if surname_position is not None:
        # this is the field we think holds the last name, so it goes last
        name_fields.append(name_fields.pop(surname_position))
    return name_fields
1✔
112

113

114
def get_name_fields(row):
    """
    Collect the cells of ``row`` that are known name-column headings.

    Returns a list with one or more entries, in row order. Raises ValueError
    when no cell of the row is in NAME_FIELDS.
    """
    matched_headings = [heading for heading in row if heading in NAME_FIELDS]
    if matched_headings:
        return matched_headings
    raise ValueError("No name guess for {}".format(row))
1✔
123

124

125
def guess_description_field(row):
    """
    Return the first cell of ``row`` that is a known description heading.

    Raises ValueError when no cell of the row is in DESCRIPTION_VALUES.
    """
    not_found = object()
    heading = next(
        (cell for cell in row if cell in DESCRIPTION_VALUES), not_found
    )
    if heading is not_found:
        raise ValueError("No description guess for {}".format(row))
    return heading
×
130

131

132
def guess_previous_party_affiliations_field(data, sopn):
    """
    Find the heading of the previous party affiliations column, if any.

    Only ballots flagged ``is_welsh_run`` are searched; for every other
    ballot the result is None. Headings are cleaned with ``clean_row``
    before being compared, and the first match is returned.
    """
    cleaned_headings = clean_row(data)
    if not sopn.sopn.ballot.is_welsh_run:
        return None

    known_headings = ["statement of party membership"]  # this could become more
    for heading in cleaned_headings:
        if heading in known_headings:
            return heading
    return None
1✔
145

146

147
def clean_name(name):
    """
    Normalise a raw candidate name into "<other names> <last names>".

    - Replaces newlines with spaces, backticks with apostrophes and en dashes
      with hyphens, then collapses runs of whitespace
    - Removes the space that can be left after a hyphen or an apostrophe
    - If the name contains "commonly known as", keeps only the text after the
      last occurrence, with closing brackets removed
    - Words in all caps are treated as last names and passed to
      clean_last_names; all other words are kept as-is as the other names
    - Returns the other names followed by the last names, stripped of
      surrounding whitespace
    """
    for old, new in (("\n", " "), ("`", "'"), ("\u2013", "\u002d")):
        name = name.replace(old, new)
    # remove multiple whitespaces
    name = " ".join(name.split())
    # this can leave extra whitespace after special chars so remove these
    for old, new in (("- ", "-"), ("' ", "'")):
        name = name.replace(old, new)

    marker = "commonly known as"
    if marker in name:
        # brackets are removed before splitting, so none are left afterwards
        name = name.replace(")", "").split(marker)[-1].strip()

    words = [word for word in name.split(" ") if word]
    surname = clean_last_names(words)
    other_names = " ".join(word for word in words if not word.isupper())
    return f"{other_names} {surname}".strip()
1✔
172

173

174
def clean_last_names(names):
    """
    Build the last name from the all-caps words in ``names``.

    The all-caps words are joined with spaces and capitalised by
    ``HumanName.capitalize()``, which is relied on here to handle Mc and Mac
    and other mixed titlecase names. Returns the result as a string.
    """
    upper_case_words = " ".join(word for word in names if word.isupper())
    parsed_name = HumanName(upper_case_words)
    parsed_name.capitalize()
    return str(parsed_name)
1✔
180

181

182
def clean_description(description):
    """
    Normalise a parsed party description so it can be matched against the
    names and descriptions stored in the database.

    The value is converted with ``str()``; literal "\\n" sequences and real
    newlines are removed, backticks become apostrophes, "&" becomes "and",
    en dashes become hyphens and runs of whitespace collapse to one space.
    """
    text = str(description)
    substitutions = (
        ("\\n", ""),
        ("\n", ""),
        ("`", "'"),
        ("&", "and"),
        # change dash to hyphen to match how they are stored in our DB
        ("\u2013", "\u002d"),
    )
    for old, new in substitutions:
        text = text.replace(old, new)
    text = re.sub(r"\s+", " ", text)
    # handle edgecases for the green party to stop incorrectly matching against
    # Welsh descriptions
    if text.lower() in ("the green party", "the green party candidate"):
        return "Green Party"
    return text
1✔
196

197

198
def get_description(description, sopn):
    """
    Find the PartyDescription that best matches a parsed description.

    The description is cleaned with ``clean_description`` and then looked up,
    always within the register of the ballot's post, in this order:

    1. exact match on the description text
    2. a stored description that starts with the parsed text
       (case-insensitive)
    3. the closest stored description within a Levenshtein distance of 5
    4. a stored description ending with "| <parsed text>" (Welsh version)

    :param description: raw value of the description cell
    :param sopn: object exposing ``sopn.ballot.post.party_set.slug``
    :return: a PartyDescription, or None when the description is empty, is
        one of INDEPENDENT_VALUES, exactly matches a Party name, or nothing
        matched
    """
    description = clean_description(description)

    if not description:
        return None
    if description in INDEPENDENT_VALUES:
        return None

    register = sopn.sopn.ballot.post.party_set.slug.upper()

    # First try to get Party object with an exact match between parsed
    # description and the Party name

    # annotate search_text field to both QuerySets which normalizes name field
    # by changing '&' to 'and' this is then used instead of the name field for
    # string matching
    party_qs = (
        Party.objects.register(register)
        .current()
        .annotate(search_text=Replace("name", Value("&"), Value("and")))
    )
    party = party_qs.filter(search_text=description)
    # If we find one, return None, so that the plain Party object
    # is parsed in get_party below, and this will then be preselected
    # for the user on the form.
    if party.exists():
        return None

    party_description_qs = PartyDescription.objects.annotate(
        search_text=Replace("description", Value("&"), Value("and"))
    )
    try:
        return party_description_qs.get(
            search_text=description, party__register=register
        )
    except PartyDescription.DoesNotExist:
        # no exact match, fall through to the fuzzier lookups below
        pass

    # try to find any that start with parsed description
    description_obj = party_description_qs.filter(
        search_text__istartswith=description, party__register=register
    ).first()
    if description_obj:
        return description_obj

    # Levenshtein - restricted to the same register as every other lookup so
    # that a description from a different register can never be matched
    qs = (
        party_description_qs.filter(party__register=register)
        .annotate(lev_dist=Levenshtein("search_text", Value(description)))
        .order_by("lev_dist")
    )
    description_obj = qs.filter(lev_dist__lte=5).first()
    if description_obj:
        print(
            f"{description} matched with {description_obj.description} with a distance of {description_obj.lev_dist}"
        )
        return description_obj

    # final check - if this is a Welsh version of a description, it will be at
    # the end of the description
    return party_description_qs.filter(
        search_text__endswith=f"| {description}", party__register=register
    ).first()
259

260

261
def get_party(description_model, description_str, sopn):
    """
    Find the Party for a parsed description.

    If a PartyDescription was already matched its party is returned.
    Otherwise the description is cleaned with ``clean_description`` and
    matched against parties in the register of the ballot's post that are
    active on the election date, in this order:

    1. an empty or INDEPENDENT_VALUES description returns the party with
       ec_id "ynmp-party:2"
    2. exact match on the party name ('&' normalised to 'and')
    3. the closest party name within a Levenshtein distance of 5
    4. the most similar party name with a trigram similarity of at least 0.5

    :param description_model: a matched PartyDescription, or a falsy value
    :param description_str: raw value of the description cell
    :param sopn: object exposing ``sopn.ballot.post.party_set.slug`` and
        ``sopn.ballot.election.election_date``
    :return: a Party, or None when nothing matched
    """
    if description_model:
        return description_model.party

    party_name = clean_description(description_str)
    register = sopn.sopn.ballot.post.party_set.slug.upper()

    # annotate search_text field which normalizes name field by changing '&' to 'and'
    # this is then used instead of the name field for string matching
    qs = (
        Party.objects.register(register)
        .active_for_date(date=sopn.sopn.ballot.election.election_date)
        .annotate(search_text=Replace("name", Value("&"), Value("and")))
    )
    if not party_name or party_name in INDEPENDENT_VALUES:
        return Party.objects.get(ec_id="ynmp-party:2")

    try:
        return qs.get(search_text=party_name)
    except Party.DoesNotExist:
        # no exact match, fall through to the fuzzier lookups below
        pass

    qs = qs.annotate(
        lev_dist=Levenshtein("search_text", Value(party_name))
    ).order_by("lev_dist")
    party_obj = qs.filter(lev_dist__lte=5).first()
    if party_obj:
        print(
            f"{party_name} matched with {party_obj.name} with a distance of {party_obj.lev_dist}"
        )
        return party_obj

    # Last resort attempt - look for the most similar party object to help when
    # parsed name is missing a whitespace e.g. Barnsley IndependentGroup
    qs = qs.annotate(similarity=TrigramSimilarity("name", party_name)).order_by(
        "-similarity"
    )

    party_obj = qs.filter(similarity__gte=0.5).first()
    if not party_obj:
        closest = qs.first()
        print(f"Couldn't find party for {party_name}.")
        # the queryset can be empty, in which case there is no closest party
        # to report
        if closest:
            print(
                f"Closest is {closest.name} with similarity {closest.similarity}"
            )

    return party_obj
×
306

307

308
def get_name(row, name_fields):
    """
    Join the row's values for each of ``name_fields``, in the given order and
    separated by spaces, and return the result cleaned by ``clean_name``.
    """
    name_parts = (row[field] for field in name_fields)
    return clean_name(" ".join(name_parts))
1✔
315

316

317
def add_previous_party_affiliations(party_str, raw_data, sopn):
    """
    Record a previous party affiliation on ``raw_data`` when one can be found.

    ``party_str`` is resolved with ``get_party``; when a party is found its
    ec_id is stored as a one-item list under "previous_party_affiliations".
    ``raw_data`` is returned in every case, unchanged if ``party_str`` is
    empty or no party matched.
    """
    if party_str:
        party = get_party(
            description_model=None, description_str=party_str, sopn=sopn
        )
        if party:
            raw_data["previous_party_affiliations"] = [party.ec_id]
    return raw_data
1✔
334

335

336
def parse_table(sopn, data):
    """
    Turn a table whose columns are the SOPN headings into candidate dicts.

    The column headings are cleaned in place, the name, description and
    (optionally) previous party affiliation columns are identified, and one
    dict is built per row with "name" and "party_id", plus "description_id"
    and "previous_party_affiliations" when available. Rows without a
    parseable name or a matching party are skipped.

    Raises ValueError (from the heading helpers) when no name or description
    column can be identified.
    """
    data.columns = clean_row(data.columns)

    name_fields = get_name_fields(data.columns)
    # if we have more than one name field try to order them
    if len(name_fields) > 1:
        name_fields = order_name_fields(name_fields)

    description_field = guess_description_field(data.columns)
    previous_party_affiliations_field = guess_previous_party_affiliations_field(
        data=data.columns, sopn=sopn
    )

    ballot_data = []
    for row in iter_rows(data):
        name = get_name(row, name_fields)
        # if we couldnt parse a candidate name skip this row
        if not name:
            continue

        raw_description = row[description_field]
        description_obj = get_description(description=raw_description, sopn=sopn)
        party_obj = get_party(
            description_model=description_obj,
            description_str=raw_description,
            sopn=sopn,
        )
        if not party_obj:
            continue

        candidate = {"name": name, "party_id": party_obj.ec_id}
        if description_obj:
            candidate["description_id"] = description_obj.pk

        if previous_party_affiliations_field:
            candidate = add_previous_party_affiliations(
                party_str=row[previous_party_affiliations_field],
                raw_data=candidate,
                sopn=sopn,
            )

        ballot_data.append(candidate)
    return ballot_data
1✔
381

382

383
def parse_raw_data_for_ballot(ballot):
    """
    Parse the extracted SOPN table for ``ballot`` into raw candidate data.

    Rows above the header row are discarded, the header row becomes the
    column headings and the remaining table is handed to ``parse_table``.
    When candidates are found:

    - a RawPeople object is created or updated for the ballot, unless one
      from a source other than the parsed PDF already exists
    - the data is saved as JSON to the default storage under
      "raw_people/<ballot_paper_id>.json"
    - the ParsedSOPN status is set to "parsed"

    Nothing is saved when the table is empty, no header row is found or
    ``parse_table`` raises ValueError; a message is printed instead.

    :type ballot: candidates.models.Ballot
    :raises ValueError: if the ballot is locked, has lock suggestions or has
        no ParsedSOPN
    :return: None
    """
    if ballot.candidates_locked:
        raise ValueError(
            f"Can't parse a locked ballot {ballot.ballot_paper_id}"
        )

    if ballot.suggestedpostlock_set.exists():
        raise ValueError(
            f"Can't parse a ballot with lock suggestions {ballot.ballot_paper_id}"
        )

    try:
        parsed_sopn_model = ballot.sopn.parsedsopn
    except ParsedSOPN.DoesNotExist as err:
        raise ValueError(f"No ParsedSOPN for {ballot.ballot_paper_id}") from err

    data = parsed_sopn_model.as_pandas
    cell_counts = [len(merge_row_cells(c)) for c in iter_rows(data)]

    header_found = False
    # An empty table has no rows to average over and cannot contain a header,
    # so skip the search rather than dividing by zero.
    if cell_counts:
        avg_row = sum(cell_counts) / len(cell_counts)
        for row in iter_rows(data):
            if looks_like_header(row, avg_row):
                data.columns = row
                data = data.drop(row.name)
                header_found = True
                # rows after the header are data rows, nothing left to do
                break
            # not the header yet, so discard this row
            try:
                data = data.drop(row.name)
            except IndexError:
                break
    if not header_found:
        # Don't try to parse if we don't think we know the header
        print(f"We couldnt find a header for {ballot.ballot_paper_id}")
        return
    # We're now in a position where we think we have the table we want
    # with the columns set and other header rows removed.
    # Time to parse it in to names and parties
    try:
        ballot_data = parse_table(parsed_sopn_model, data)
    except ValueError as e:
        # Something went wrong. This will happen a lot. let's move on
        print(f"Error attempting to parse a table for {ballot.ballot_paper_id}")
        print(e.args[0])
        return

    if ballot_data:
        # Check there isn't a rawpeople object from another (better) source
        rawpeople_qs = RawPeople.objects.filter(
            ballot=parsed_sopn_model.sopn.ballot
        ).exclude(source_type=RawPeople.SOURCE_PARSED_PDF)
        if not rawpeople_qs.exists():
            RawPeople.objects.update_or_create(
                ballot=parsed_sopn_model.sopn.ballot,
                defaults={
                    "data": ballot_data,
                    "source": "Parsed from {}".format(
                        parsed_sopn_model.sopn.source_url
                    ),
                    "source_type": RawPeople.SOURCE_PARSED_PDF,
                },
            )
        # We've done the parsing, so let's still save the result
        storage = DefaultStorage()
        desired_storage_path = join(
            "raw_people",
            "{}.json".format(parsed_sopn_model.sopn.ballot.ballot_paper_id),
        )
        storage.save(
            desired_storage_path,
            ContentFile(json.dumps(ballot_data, indent=4).encode("utf8")),
        )

        parsed_sopn_model.status = "parsed"
        parsed_sopn_model.save()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc