• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DemocracyClub / yournextrepresentative / 52a0cf3d-b78e-4870-8d22-43f4c46e7039

pending completion
52a0cf3d-b78e-4870-8d22-43f4c46e7039

Pull #2107

circleci

VirginiaDooley
Handle newlines in the middle of a name
Pull Request #2107: Handle newlines in the middle of a name

1598 of 2729 branches covered (58.56%)

Branch coverage included in aggregate %.

8 of 8 new or added lines in 1 file covered. (100.0%)

6598 of 9425 relevant lines covered (70.01%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.22
/ynr/apps/sopn_parsing/helpers/parse_tables.py
1
import json
1✔
2
import re
1✔
3
from os.path import join
1✔
4

5
from django.db.models.functions import Replace
1✔
6
from django.db.models import Value
1✔
7
from django.core.files.base import ContentFile
1✔
8
from django.core.files.storage import DefaultStorage
1✔
9
from django.contrib.postgres.search import TrigramSimilarity
1✔
10
from nameparser import HumanName
1✔
11

12
from bulk_adding.models import RawPeople
1✔
13
from parties.models import Party, PartyDescription
1✔
14
from sopn_parsing.helpers.text_helpers import clean_text
1✔
15
from sopn_parsing.models import ParsedSOPN
1✔
16
from utils.db import Levenshtein
1✔
17

18

19
# Column headings that identify a candidate's forename(s). Values are
# lower-case with punctuation removed because they are compared against
# headings that have already been passed through ``clean_text``.
FIRST_NAME_FIELDS = [
    "other name",
    "other names",
    "candidate forename",
    "candidates other names",
    "other names in full",
    "other names / enwau eraill",
]

# Column headings that identify a candidate's surname. Used by
# ``order_name_fields`` to move the surname column to the end.
LAST_NAME_FIELDS = [
    "surname",
    "candidate surname",
    "candidates surname",
    "last name",
    "surname / cyfenw",
]

# Welsh (or bilingual) headings for a single combined name column.
WELSH_NAME_FIELDS = [
    "enwr ymgeisydd",
    "enwr ymgeisydd candidate name",
    "enwr ymgeisydd name of candidate",
]

# Every heading that may hold all or part of a candidate's name.
NAME_FIELDS = (
    FIRST_NAME_FIELDS
    + LAST_NAME_FIELDS
    + [
        "name of candidate",
        "names of candidate",
        "candidate name",
        "surname other names",
        "surname other names in full",
    ]
    + WELSH_NAME_FIELDS
)
51

52

53
# Cleaned description values that mean "no party description"; the empty
# string is included so that a blank cell is treated the same way.
INDEPENDENT_VALUES = ["Independent", "", "Annibynnol"]

# Welsh (or bilingual) headings for the description column.
WELSH_DESCRIPTION_VALUES = [
    "disgrifiad",
    "disgrifiad or ymgeisydd",
    "disgrifiad or ymgeisydd description of candidate",
]

# Every heading that may identify the description column.
DESCRIPTION_VALUES = [
    "description of candidate",
    "description",
] + WELSH_DESCRIPTION_VALUES
64

65

66
def iter_rows(data):
    """
    Yield ``data.iloc[0]``, ``data.iloc[1]``, ... until the positional lookup
    raises ``IndexError``.

    The ``try`` only wraps the lookup itself, so an ``IndexError`` thrown into
    the generator by a caller is not mistaken for the end of the data.
    """
    position = 0
    while True:
        try:
            row = data.iloc[position]
        except IndexError:
            return
        yield row
        position += 1
75

76

77
def merge_row_cells(row):
    """
    Return a list of the truthy cells in ``row``, dropping empty strings,
    ``None`` and any other falsy values while keeping the original order.
    """
    return [cell for cell in row if cell]
79

80

81
def clean_row(row):
    """
    Return a new list with ``clean_text`` applied to every cell of ``row``.
    """
    return [clean_text(cell) for cell in row]
83

84

85
def contains_header_like_strings(row):
    """
    Return True if the cleaned string form of ``row`` contains any of the
    known name headings in NAME_FIELDS as a substring.

    ``row`` must provide ``to_string()`` (it is called with the rows yielded
    by ``iter_rows``).
    """
    row_string = clean_text(row.to_string())
    return any(heading in row_string for heading in NAME_FIELDS)
90

91

92
def looks_like_header(row, avg_row):
    """
    Decide whether ``row`` is likely to be the table's header row.

    A row qualifies when it has at least ``avg_row - 3`` non-empty cells and
    its text contains one of the known name headings.

    :param row: a table row, as yielded by ``iter_rows``
    :param avg_row: average number of non-empty cells per row in the table
    """
    # Allow the header to have up to three fewer populated cells than average
    minimum_cells = avg_row - 3
    if len(merge_row_cells(row)) < minimum_cells:
        return False
    return contains_header_like_strings(row)
98

99

100
def order_name_fields(name_fields):
    """
    Takes a list of name fields and attempts to find a field within
    LAST_NAME_FIELDS, moving the first one found to the end of the list.

    The list is modified in place and also returned. If no field is in
    LAST_NAME_FIELDS the list is returned unchanged.
    """
    for index, field in enumerate(name_fields):
        if field in LAST_NAME_FIELDS:
            # found the fieldname we think is for the last name, so move that
            # to the end of our name fields. Stopping straight away keeps the
            # mutation safe while we are iterating over the same list.
            name_fields.append(name_fields.pop(index))
            break

    return name_fields
113

114

115
def get_name_fields(row):
    """
    Returns a list of name fields. This could be a single field or multiple
    fields.

    :param row: iterable of (already cleaned) column headings
    :raises ValueError: if none of the headings is in NAME_FIELDS
    """
    name_fields = [cell for cell in row if cell in NAME_FIELDS]
    if not name_fields:
        raise ValueError(f"No name guess for {row}")
    return name_fields
124

125

126
def guess_description_field(row):
    """
    Return the first heading in ``row`` that is a known description heading.

    :param row: iterable of (already cleaned) column headings
    :raises ValueError: if none of the headings is in DESCRIPTION_VALUES
    """
    for cell in row:
        if cell in DESCRIPTION_VALUES:
            return cell
    raise ValueError(f"No description guess for {row}")
131

132

133
def guess_previous_party_affiliations_field(data, sopn):
    """
    Return the heading of the "previous party affiliations" column, if any.

    Only ballots flagged ``is_welsh_run`` are considered; for any other
    ballot None is returned without inspecting the headings.

    :param data: iterable of column headings (cleaned here with ``clean_row``)
    :param sopn: object exposing ``sopn.ballot.is_welsh_run``
    :return: the matching cleaned heading, or None if there is no match
    """
    # Check the cheap flag first so headings are only cleaned when needed
    if not sopn.sopn.ballot.is_welsh_run:
        return None

    for cell in clean_row(data):
        if cell in ["statement of party membership"]:  # this could become more
            return cell

    return None
146

147

148
def clean_name(name):
    """
    Normalise a raw candidate name string.

    - Replaces newlines: with a space if the name contains a hyphen or has
      exactly one newline, otherwise (several newlines, no hyphen) they are
      removed outright
    - Replaces backticks with apostrophes and en dashes with hyphens
    - Collapses runs of whitespace and removes the space left after a hyphen
      or apostrophe
    - If the name contains "commonly known as", keeps only the text after it
    - Builds the last name from all words that are in all caps (see
      ``clean_last_names``) and the other names from all remaining words
    - Returns "<other names> <last names>" with surrounding whitespace
      stripped; the other names keep their original casing
    """
    if "\n" in name:
        # NOTE: the hyphen test is made before en dashes are converted below
        if "-" in name or name.count("\n") == 1:
            name = name.replace("\n", " ")
        else:
            name = name.replace("\n", "")

    name = name.replace("`", "'")
    name = name.replace("\u2013", "\u002d")
    # remove multiple whitespaces
    name = " ".join(name.split())
    # this can leave extra whitespace after special chars so remove these
    name = name.replace("- ", "-")
    name = name.replace("' ", "'")

    if "commonly known as" in name:
        name = name.split("commonly known as")[-1].replace(")", "").strip()

    # Only single spaces remain at this point, so a plain split() yields the
    # non-empty words
    names = name.split()
    last_names = clean_last_names(names)
    first_names = " ".join(word for word in names if not word.isupper())
    return f"{first_names} {last_names}".strip()
185

186

187
## Handles Mc and Mac and other mixed titlecase names
188
def clean_last_names(names):
    """
    Build the last-name string from a list of name words.

    Words that are entirely upper case are joined with spaces, passed through
    ``HumanName(...).capitalize()`` (intended to handle mixed-case surnames
    such as "Mc"/"Mac" prefixes) and returned as a string. Returns whatever
    ``str(HumanName(""))`` gives when no word is upper case.
    """
    upper_case_words = " ".join(word for word in names if word.isupper())
    parsed_name = HumanName(upper_case_words)
    parsed_name.capitalize()
    return str(parsed_name)
193

194

195
def clean_description(description):
    """
    Normalise a raw description/party cell so it can be matched against the
    names stored in the database.

    - Converts the value to ``str``
    - Removes literal ``\\n`` sequences and real newlines (without inserting a
      space)
    - Replaces backticks with apostrophes, "&" with "and" and en dashes with
      hyphens
    - Collapses runs of whitespace and strips leading/trailing whitespace so
      padded cells still match exactly
    - Maps "the green party" / "the green party candidate" (any casing) to
      "Green Party"
    """
    description = str(description)
    description = description.replace("\\n", "")
    description = description.replace("\n", "")
    description = description.replace("`", "'")
    description = description.replace("&", "and")
    # change dash to hyphen to match how they are stored in our DB
    description = description.replace("\u2013", "\u002d")
    description = re.sub(r"\s+", " ", description).strip()
    # handle edgecases for the green party to stop incorrectly matching against
    # Welsh descriptions
    if description.lower() in ["the green party", "the green party candidate"]:
        description = "Green Party"
    return description
209

210

211
def get_description(description, sopn):
    """
    Try to find the PartyDescription that matches a parsed description.

    Every description lookup is restricted to the register of the ballot's
    post (``sopn.sopn.ballot.post.party_set.slug`` upper-cased). Matching is
    attempted in this order: exact, case-insensitive prefix, Levenshtein
    distance of at most 5, and finally a description ending in
    "| <description>" (Welsh version).

    :param description: raw description cell value
    :param sopn: object exposing ``sopn.ballot.post.party_set.slug``
    :return: a PartyDescription, or None when the description is empty or
        independent, when it exactly matches a current Party name (so that
        ``get_party`` selects the plain Party), or when nothing matches
    """
    description = clean_description(description)

    # INDEPENDENT_VALUES contains "" so this also covers an empty description
    if not description or description in INDEPENDENT_VALUES:
        return None

    register = sopn.sopn.ballot.post.party_set.slug.upper()

    # First try to get Party object with an exact match between parsed
    # description and the Party name

    # annotate search_text field to both QuerySets which normalizes name field
    # by changing '&' to 'and' this is then used instead of the name field for
    # string matching
    party_qs = (
        Party.objects.register(register)
        .current()
        .annotate(search_text=Replace("name", Value("&"), Value("and")))
    )
    # If we find one, return None, so that the plain Party object
    # is parsed in get_party below, and this will then be preselected
    # for the user on the form.
    if party_qs.filter(search_text=description).exists():
        return None

    # Restrict to the register once so every lookup below, including the
    # Levenshtein one, only considers descriptions from the right register
    party_description_qs = PartyDescription.objects.annotate(
        search_text=Replace("description", Value("&"), Value("and"))
    ).filter(party__register=register)

    # Exact match; first() rather than get() so duplicate descriptions do not
    # raise MultipleObjectsReturned
    description_obj = party_description_qs.filter(
        search_text=description
    ).first()
    if description_obj:
        return description_obj

    # try to find any that start with parsed description
    description_obj = party_description_qs.filter(
        search_text__istartswith=description
    ).first()
    if description_obj:
        return description_obj

    # Levenshtein
    qs = party_description_qs.annotate(
        lev_dist=Levenshtein("search_text", Value(description))
    ).order_by("lev_dist")
    description_obj = qs.filter(lev_dist__lte=5).first()
    if description_obj:
        print(
            f"{description} matched with {description_obj.description} with a distance of {description_obj.lev_dist}"
        )
        return description_obj

    # final check - if this is a Welsh version of a description, it will be at
    # the end of the description
    return party_description_qs.filter(
        search_text__endswith=f"| {description}"
    ).first()
272

273

274
def get_party(description_model, description_str, sopn):
    """
    Work out which Party a parsed description refers to.

    :param description_model: a PartyDescription (or None); when truthy its
        ``party`` is returned straight away
    :param description_str: raw description cell value, used when there is no
        description model
    :param sopn: object exposing ``sopn.ballot.post.party_set.slug`` and
        ``sopn.ballot.election.election_date``
    :return: a Party, or None if nothing similar enough could be found.
        Empty or independent descriptions return the Party with
        ``ec_id="ynmp-party:2"``.
    """
    if description_model:
        return description_model.party

    party_name = clean_description(description_str)
    # No need to build the register queryset for independents
    if not party_name or party_name in INDEPENDENT_VALUES:
        return Party.objects.get(ec_id="ynmp-party:2")

    register = sopn.sopn.ballot.post.party_set.slug.upper()

    # annotate search_text field which normalizes name field by changing '&' to 'and'
    # this is then used instead of the name field for string matching
    qs = (
        Party.objects.register(register)
        .active_for_date(date=sopn.sopn.ballot.election.election_date)
        .annotate(search_text=Replace("name", Value("&"), Value("and")))
    )

    # Exact match; first() rather than get() so duplicate names do not raise
    # MultipleObjectsReturned
    party_obj = qs.filter(search_text=party_name).first()
    if party_obj:
        return party_obj

    qs = qs.annotate(
        lev_dist=Levenshtein("search_text", Value(party_name))
    ).order_by("lev_dist")
    party_obj = qs.filter(lev_dist__lte=5).first()
    if party_obj:
        print(
            f"{party_name} matched with {party_obj.name} with a distance of {party_obj.lev_dist}"
        )
        return party_obj

    # Last resort attempt - look for the most similar party object to help when
    # parsed name is missing a whitespace e.g. Barnsley IndependentGroup
    qs = qs.annotate(similarity=TrigramSimilarity("name", party_name)).order_by(
        "-similarity"
    )

    party_obj = qs.filter(similarity__gte=0.5).first()
    if not party_obj:
        closest = qs.first()
        print(f"Couldn't find party for {party_name}.")
        # The queryset can be empty, in which case there is no closest party
        if closest:
            print(
                f"Closest is {closest.name} with similarity {closest.similarity}"
            )

    return party_obj
319

320

321
def get_name(row, name_fields):
    """
    Takes a list of name fields and returns a string of the values of each of
    the name fields in the row, joined with spaces (in the order given) and
    normalised by ``clean_name``.
    """
    raw_name = " ".join(row[field] for field in name_fields)
    return clean_name(raw_name)
329

330

331
def add_previous_party_affiliations(party_str, raw_data, sopn):
    """
    Attempts to find previous party affiliations and add them to the data
    object. If no party can be found, returns the data unchanged.

    :param party_str: raw cell value naming the previous party
    :param raw_data: dict for the candidate; updated in place and returned
    :param sopn: passed through to ``get_party``
    :return: ``raw_data``, with ``previous_party_affiliations`` set to a
        one-item list holding the party's ``ec_id`` when a party was found
    """
    if not party_str:
        return raw_data

    party = get_party(
        description_model=None, description_str=party_str, sopn=sopn
    )
    if party:
        raw_data["previous_party_affiliations"] = [party.ec_id]

    return raw_data
348

349

350
def parse_table(sopn, data):
    """
    Turn a table whose header row has already been identified into a list of
    candidate dicts.

    ``data.columns`` is replaced with its cleaned version. Rows are skipped
    when no name can be parsed or no party can be found.

    :param sopn: passed through to the description and party lookups
    :param data: table exposing ``columns`` and ``iloc`` (a pandas DataFrame)
    :return: list of dicts with ``name`` and ``party_id`` keys, plus
        ``description_id`` and ``previous_party_affiliations`` when available
    :raises ValueError: if no name or description column can be identified
    """
    data.columns = clean_row(data.columns)

    name_fields = get_name_fields(data.columns)

    # if we have more than one name field try to order them
    if len(name_fields) > 1:
        name_fields = order_name_fields(name_fields)

    description_field = guess_description_field(data.columns)
    previous_party_affiliations_field = guess_previous_party_affiliations_field(
        data=data.columns, sopn=sopn
    )

    ballot_data = []
    for row in iter_rows(data):
        name = get_name(row, name_fields)
        # if we couldnt parse a candidate name skip this row
        if not name:
            continue

        description_obj = get_description(
            description=row[description_field], sopn=sopn
        )
        party_obj = get_party(
            description_model=description_obj,
            description_str=row[description_field],
            sopn=sopn,
        )
        if not party_obj:
            continue

        # Named separately from ``data`` so the table is not shadowed
        raw_person = {"name": name, "party_id": party_obj.ec_id}
        if description_obj:
            raw_person["description_id"] = description_obj.pk

        if previous_party_affiliations_field:
            raw_person = add_previous_party_affiliations(
                party_str=row[previous_party_affiliations_field],
                raw_data=raw_person,
                sopn=sopn,
            )

        ballot_data.append(raw_person)
    return ballot_data
395

396

397
def parse_raw_data_for_ballot(ballot):
    """
    Parse the ballot's ParsedSOPN table into candidate data and store it.

    Rows are discarded until one looks like a header; the rest of the table
    is then handed to ``parse_table``. When candidates are found, a RawPeople
    object is created or updated (unless one from a source other than the
    parsed PDF already exists), the data is saved as JSON under
    ``raw_people/<ballot_paper_id>.json`` and the ParsedSOPN status is set to
    "parsed". Nothing is stored when no header is found, the table is empty
    or ``parse_table`` raises ValueError.

    :type ballot: candidates.models.Ballot
    :return: None
    :raises ValueError: if the ballot is locked, has lock suggestions or has
        no ParsedSOPN
    """
    if ballot.candidates_locked:
        raise ValueError(
            f"Can't parse a locked ballot {ballot.ballot_paper_id}"
        )

    if ballot.suggestedpostlock_set.exists():
        raise ValueError(
            f"Can't parse a ballot with lock suggestions {ballot.ballot_paper_id}"
        )

    try:
        parsed_sopn_model = ballot.sopn.parsedsopn
    except ParsedSOPN.DoesNotExist as exc:
        raise ValueError(
            f"No ParsedSOPN for {ballot.ballot_paper_id}"
        ) from exc

    data = parsed_sopn_model.as_pandas
    cell_counts = [len(merge_row_cells(c)) for c in iter_rows(data)]

    header_found = False
    # An empty table has no average row length (and no header), so skip the
    # search rather than dividing by zero
    if cell_counts:
        avg_row = sum(cell_counts) / len(cell_counts)
        for row in iter_rows(data):
            if looks_like_header(row, avg_row):
                data.columns = row
                data = data.drop(row.name)
                header_found = True
                # Everything after the header is left for parse_table
                break
            # Not the header yet, so discard this row
            # NOTE(review): DataFrame.drop raises KeyError for a missing
            # label, so this IndexError guard may never fire - confirm
            try:
                data = data.drop(row.name)
            except IndexError:
                break

    if not header_found:
        # Don't try to parse if we don't think we know the header
        print(f"We couldnt find a header for {ballot.ballot_paper_id}")
        return None
    # We're now in a position where we think we have the table we want
    # with the columns set and other header rows removed.
    # Time to parse it in to names and parties
    try:
        ballot_data = parse_table(parsed_sopn_model, data)
    except ValueError as e:
        # Something went wrong. This will happen a lot. let's move on
        print(f"Error attempting to parse a table for {ballot.ballot_paper_id}")
        print(e.args[0])
        return None

    if ballot_data:
        # Check there isn't a rawpeople object from another (better) source
        rawpeople_qs = RawPeople.objects.filter(
            ballot=parsed_sopn_model.sopn.ballot
        ).exclude(source_type=RawPeople.SOURCE_PARSED_PDF)
        if not rawpeople_qs.exists():
            RawPeople.objects.update_or_create(
                ballot=parsed_sopn_model.sopn.ballot,
                defaults={
                    "data": ballot_data,
                    "source": "Parsed from {}".format(
                        parsed_sopn_model.sopn.source_url
                    ),
                    "source_type": RawPeople.SOURCE_PARSED_PDF,
                },
            )
        # We've done the parsing, so let's still save the result
        storage = DefaultStorage()
        desired_storage_path = join(
            "raw_people",
            "{}.json".format(parsed_sopn_model.sopn.ballot.ballot_paper_id),
        )
        storage.save(
            desired_storage_path,
            ContentFile(json.dumps(ballot_data, indent=4).encode("utf8")),
        )

        parsed_sopn_model.status = "parsed"
        parsed_sopn_model.save()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc