• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DemocracyClub / UK-Polling-Stations / f3a42ba2-52f0-40ce-9da6-3c7da8c69680

11 Dec 2024 12:11PM UTC coverage: 71.562% (-0.4%) from 71.931%
f3a42ba2-52f0-40ce-9da6-3c7da8c69680

Pull #8183

circleci

GeoWill
Use atomic transaction for import step of import_eoni
Pull Request #8183: Feature/import eoni from s3 management command

6 of 25 new or added lines in 1 file covered. (24.0%)

3 existing lines in 2 files now uncovered.

3991 of 5577 relevant lines covered (71.56%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.55
/polling_stations/apps/addressbase/management/commands/update_addressbase.py
1
import tempfile
1✔
2
from pathlib import Path
1✔
3

4
import boto3
1✔
5
from django.core.management.base import BaseCommand
1✔
6
from django.db import connections, transaction
1✔
7
from uk_geo_utils.base_importer import BaseImporter
1✔
8

9
from councils.models import Council
1✔
10
from data_importers.event_helpers import record_teardown_event
1✔
11
from data_importers.models import DataQuality
1✔
12
from polling_stations.db_routers import get_principal_db_name
1✔
13
from pollingstations.models import PollingDistrict, PollingStation, AdvanceVotingStation
1✔
14

15
# Database alias resolved once, at import time, via the project's DB router
# helper; used below as the default value of the command's --database option.
PRINCIPAL_DB_NAME = get_principal_db_name()
1✔
16

17

18
def get_file_from_s3(uri):
    """
    Download a CSV object from S3 into a local temporary file.

    Args:
        uri: Object location in the form ``s3://<bucket>/<key>.csv``.

    Returns:
        The path (str) of the downloaded temporary file. The file is created
        with ``delete=False``, so the caller is responsible for removing it.

    Raises:
        ValueError: if ``uri`` does not start with ``s3://``, does not end
            with ``.csv`` (case-insensitive), or is missing the bucket name
            or the object key.
    """
    if not uri.startswith("s3://"):
        raise ValueError("url must start with s3://")
    if not uri.lower().endswith(".csv"):
        raise ValueError("url must end with .csv")

    # "s3://bucket/some/key.csv" -> ("bucket", "some/key.csv")
    bucket_name, _, key = uri[len("s3://"):].partition("/")
    if not bucket_name or not key:
        raise ValueError("url must be of the form s3://<bucket>/<key>.csv")

    # Only the file *name* is needed: close the handle immediately so the
    # descriptor is not leaked while boto3 writes to the path.
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        local_path = tmp.name

    try:
        boto3.resource("s3").Bucket(bucket_name).download_file(key, local_path)
    except BaseException:
        # Don't leave an empty/partial temp file behind when the download
        # fails or is interrupted; the error itself is always re-raised.
        leftover = Path(local_path)
        if leftover.exists():
            leftover.unlink()
        raise
    return local_path
×
29

30

31
class AddressbaseUpdater(BaseImporter):
    """Importer for the ``addressbase_address`` table, fed from a CSV file."""

    def get_table_name(self):
        """Return the name of the table this importer works on."""
        return "addressbase_address"

    def import_data_to_temp_table(self):
        """
        Bulk-load the CSV at ``self.data_path`` into ``self.temp_table_name``.

        The statement is echoed to ``self.stdout`` and then executed with the
        cursor's ``copy_expert``, which streams the open file as STDIN.
        """
        copy_sql = f"""
        COPY {self.temp_table_name} (UPRN, address, postcode, location, addressbase_postal)
        FROM STDIN
        WITH (FORMAT CSV, DELIMITER ',', QUOTE '"');
        """
        self.stdout.write(f"Executing: {copy_sql}")

        with open(self.data_path) as csv_file:
            self.cursor.copy_expert(copy_sql, csv_file)
1✔
44

45

46
class UprnToCouncilUpdater(BaseImporter):
    """Importer for the ``addressbase_uprntocouncil`` table, fed from a CSV file."""

    def get_table_name(self):
        """Return the name of the table this importer works on."""
        return "addressbase_uprntocouncil"

    def import_data_to_temp_table(self):
        """
        Bulk-load the CSV at ``self.data_path`` into ``self.temp_table_name``.

        The COPY statement declares ``\\N`` as the NULL marker. The statement
        is echoed to ``self.stdout`` and then executed with the cursor's
        ``copy_expert``, which streams the open file as STDIN.
        """
        copy_sql = f"""
        COPY {self.temp_table_name} (uprn, lad, polling_station_id, advance_voting_station_id)
        FROM STDIN
        WITH (FORMAT CSV, DELIMITER ',', null '\\N');
        """
        self.stdout.write(f"Executing: {copy_sql}")

        with open(self.data_path) as csv_file:
            self.cursor.copy_expert(copy_sql, csv_file)
1✔
59

60

61
class Command(BaseCommand):
    """
    Replace the Addressbase and UPRN-to-Council tables from CSV files.

    Each input may be given either as a local path or as an S3 URI.

    Usage:

    ./manage.py update_addressbase \
        --addressbase-s3-uri='s3://bucket/addressbase/sample/addressbase_cleaned/addressbase_cleaned.csv' \
        --uprntocouncil-s3-uri='s3://bucket/addressbase/sample/uprn-to-council/uprn-to-councils.csv'
    or
     ./manage.py update_addressbase \
        --addressbase-path /path/to/addressbase_cleaned.csv \
        --uprntocouncil-path /path/to/uprn-to-councils.csv

    """

    help = (
        "Updates both Addressbase and UPRN to Council mapping tables from local files"
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Set to True only when the corresponding file was downloaded from S3
        # by this command, i.e. when it is ours to delete afterwards.
        self.clean_up_uprntocouncil_file = False
        self.clean_up_addressbase_file = False

    def add_arguments(self, parser):
        """Register CLI options: one required source per dataset, plus --database."""
        # Exactly one of path / S3 URI must be supplied for each dataset.
        addressbase_group = parser.add_mutually_exclusive_group(required=True)
        addressbase_group.add_argument(
            "--addressbase-path",
            help="Local path to the Addressbase data file",
        )
        addressbase_group.add_argument(
            "--addressbase-s3-uri",
            help="S3 URI for Addressbase data file",
        )
        uprntocouncil_group = parser.add_mutually_exclusive_group(required=True)
        uprntocouncil_group.add_argument(
            "--uprntocouncil-path",
            help="Local path to the UPRN to Council data file",
        )
        uprntocouncil_group.add_argument(
            "--uprntocouncil-s3-uri",
            help="S3 URI for UPRN to Council data file",
        )
        parser.add_argument(
            "--database",
            default=PRINCIPAL_DB_NAME,
            help="Database name. Defaults to PRINCIPAL_DB_NAME - i.e. RDS if you're on EC2",
        )

    def teardown(self):
        """
        Record a teardown event per council and wipe imported polling data.

        For every council returned by ``with_polling_stations_in_db()`` a
        teardown event is recorded; all polling districts, polling stations and
        advance voting stations are deleted and every DataQuality row is reset.
        """
        self.stdout.write(
            "New addresses imported. Deleting all Polling Stations, Advance Polling Stations and Polling Districts..."
        )
        for council in Council.objects.with_polling_stations_in_db():
            record_teardown_event(council.council_id)
            # NOTE(review): the statements below are not council-specific, so
            # they repeat identical work on every iteration (and never run when
            # the queryset is empty). Hoisting them out of the loop looks
            # right, but confirm record_teardown_event does not depend on the
            # current ordering before changing it.
            PollingDistrict.objects.all().delete()
            PollingStation.objects.all().delete()
            AdvanceVotingStation.objects.all().delete()
            DataQuality.objects.all().update(
                report="", num_addresses=0, num_districts=0, num_stations=0
            )

        self.stdout.write("..deleted.")

    def handle(self, *args, **options):
        """
        Resolve the two input files, run the import, then tidy up.

        Files fetched from S3 are temporary and are removed when the command
        finishes, whether or not the import succeeded. Local files passed via
        ``--*-path`` are never deleted.
        """
        addressbase_path = options.get("addressbase_path", None)
        uprntocouncil_path = options.get("uprntocouncil_path", None)
        database_name = options["database"]

        try:
            if options.get("addressbase_s3_uri"):
                addressbase_path = get_file_from_s3(options["addressbase_s3_uri"])
                self.clean_up_addressbase_file = True
            if options.get("uprntocouncil_s3_uri"):
                uprntocouncil_path = get_file_from_s3(options["uprntocouncil_s3_uri"])
                self.clean_up_uprntocouncil_file = True

            self.stdout.write(f"addressbase_path set to {addressbase_path}")
            self.stdout.write(f"uprntocouncil_path to {uprntocouncil_path}")

            self._run_import(addressbase_path, uprntocouncil_path, database_name)
        finally:
            # The downloads happen inside this try so that a failure at any
            # later point (second download, constraint discovery, the import
            # itself) still removes whatever was already fetched.
            if self.clean_up_addressbase_file:
                Path(addressbase_path).unlink()
            if self.clean_up_uprntocouncil_file:
                Path(uprntocouncil_path).unlink()

    def _run_import(self, addressbase_path, uprntocouncil_path, database_name):
        """Load both CSVs into temp tables and swap them in on ``database_name``."""
        # Both updaters share one cursor on the selected database connection.
        cursor = connections[database_name].cursor()

        # Create addressbase updater and set cursor
        addressbase_updater = AddressbaseUpdater()
        addressbase_updater.cursor = cursor

        # Create uprntocouncil updater and set cursor
        uprntocouncil_updater = UprnToCouncilUpdater()
        uprntocouncil_updater.cursor = cursor

        # Set the data_path on each updater instance
        addressbase_updater.data_path = addressbase_path
        uprntocouncil_updater.data_path = uprntocouncil_path

        # Get constraints and index information for both tables
        addressbase_updater.get_constraints_and_index_statements()
        uprntocouncil_updater.get_constraints_and_index_statements()

        try:
            # Create empty temp tables for both
            self.stdout.write("Creating temporary tables...")
            addressbase_updater.create_temp_table()
            uprntocouncil_updater.create_temp_table()

            # Import data into temp tables
            self.stdout.write("Importing data into temporary tables...")
            addressbase_updater.import_data_to_temp_table()
            uprntocouncil_updater.import_data_to_temp_table()

            # Add temp primary keys
            self.stdout.write("Adding primary keys to temporary tables...")
            addressbase_updater.add_temp_primary_key()
            uprntocouncil_updater.add_temp_primary_key()

            # Build temp indexes
            self.stdout.write("Building indexes on temporary tables...")
            addressbase_updater.build_temp_indexes()
            uprntocouncil_updater.build_temp_indexes()

            # Perform the table swaps in a single transaction. The transaction
            # must be opened on the same connection the cursor belongs to: a
            # bare transaction.atomic() wraps the "default" alias, which would
            # leave the swaps un-atomic whenever --database names another one.
            with transaction.atomic(using=database_name):
                self.stdout.write("Starting atomic transaction for table swaps...")

                # Drop all foreign keys first
                if addressbase_updater.foreign_key_constraints:
                    addressbase_updater.drop_foreign_keys()
                if uprntocouncil_updater.foreign_key_constraints:
                    uprntocouncil_updater.drop_foreign_keys()

                # Drop old tables
                addressbase_updater.drop_old_table()
                uprntocouncil_updater.drop_old_table()

                # Rename temp tables
                addressbase_updater.rename_temp_table()
                uprntocouncil_updater.rename_temp_table()

                # Add foreign keys back
                if addressbase_updater.foreign_key_constraints:
                    addressbase_updater.add_foreign_keys()
                if uprntocouncil_updater.foreign_key_constraints:
                    uprntocouncil_updater.add_foreign_keys()
                self.teardown()
            self.stdout.write(
                self.style.SUCCESS(
                    "Successfully updated both Addressbase and UPRN to Council tables"
                )
            )

        finally:
            # Clean up both updaters
            self.stdout.write("Cleaning up...")
            addressbase_updater.db_cleanup()
            uprntocouncil_updater.db_cleanup()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc