• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DemocracyClub / uk-geo-utils / c7bb37cf-7e7b-422f-a9da-cef3986fe33f

13 Nov 2024 10:40AM UTC coverage: 90.629% (-0.5%) from 91.161%
c7bb37cf-7e7b-422f-a9da-cef3986fe33f

Pull #33

circleci

GeoWill
Add a flag to specify target db for importers
Pull Request #33: Feature/onspd header checks and db flag

46 of 46 new or added lines in 4 files covered. (100.0%)

11 existing lines in 1 file now uncovered.

706 of 779 relevant lines covered (90.63%)

3.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.06
/uk_geo_utils/base_importer.py
1
import abc
4✔
2
import shutil
4✔
3
import tempfile
4✔
4
import urllib.request
4✔
5
import zipfile
4✔
6
from pathlib import Path
4✔
7

8
import psutil
4✔
9
from django.core.management.base import BaseCommand
4✔
10
from django.db import DEFAULT_DB_ALIAS, connections, transaction
4✔
11

12

13
def unzip(filepath):
    """Extract the zip archive at ``filepath`` into a new temporary directory.

    The directory is created with ``tempfile.mkdtemp`` and is NOT removed
    here; the caller owns it and is responsible for deleting it.

    :param filepath: path to a zip archive.
    :return: path (str) of the temporary directory holding the extracted files.
    """
    tmpdir = tempfile.mkdtemp()
    # Use a context manager so the archive's file handle is always closed,
    # even if extraction fails part-way through.
    with zipfile.ZipFile(filepath, "r") as zip_file:
        zip_file.extractall(tmpdir)
    return tmpdir
18

19

20
def check_memory(required_memory: int = 2):
    """Report whether this machine has at least ``required_memory`` GiB of RAM.

    Downloading, unzipping and working with the ONSPD needs a decent chunk
    of memory; running the import on a tiny instance such as a t2.micro can
    cause an Out Of Memory error, so importers check this before starting.

    :param required_memory: minimum total physical memory, in GiB
        (1 GiB = 1024 ** 3 bytes). Defaults to 2.
    :return: ``True`` if total memory is greater than or equal to
        ``required_memory`` GiB, otherwise ``False``.
    """
    bytes_per_gib = 1024 * 1024 * 1024
    total_gib = psutil.virtual_memory().total / bytes_per_gib
    return total_gib >= required_memory
30

31

32
class BaseImporter(BaseCommand):
    """Base management command that replaces a table's contents with fresh data.

    Subclasses implement :meth:`get_table_name` and
    :meth:`import_data_to_temp_table`. :meth:`handle` then:

    1. records the table's primary key, indexes and foreign keys, and aborts
       if any other kind of constraint exists;
    2. creates ``<table>_temp`` with the same columns (no rows) and lets the
       subclass load data into it;
    3. adds a temporary primary key and temporary indexes to the temp table;
    4. inside one transaction on the selected database, drops the foreign
       keys, drops the old table, renames the temp table (plus its primary
       key and indexes) to the original names and re-creates the foreign keys;
    5. always drops any leftover temp table and downloaded files.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Populated by get_constraints_and_index_statements().
        self.foreign_key_constraints = None
        self.indexes = None
        self.primary_key_constraint = None
        # Set by get_data_path(); tempdir is only set when data was downloaded.
        self.tempdir = None
        self.data_path = None
        # Database alias selected with --database; replaced in handle().
        self.database_alias = DEFAULT_DB_ALIAS
        # Raw cursor on the selected database; created in handle().
        self.cursor = None
        self.table_name = self.get_table_name()
        self.temp_table_name = self.table_name + "_temp"

    def add_arguments(self, parser):
        """Register exactly one of --url / --data-path, plus --database."""
        group = parser.add_mutually_exclusive_group(required=True)
        group.add_argument("--url", action="store")
        group.add_argument("--data-path", action="store")
        parser.add_argument("--database", default=DEFAULT_DB_ALIAS)

    @abc.abstractmethod
    def get_table_name(self) -> str:
        """Return the name of the table this importer replaces.

        BaseCommand does not use ABCMeta, so ``@abc.abstractmethod`` is not
        enforced at instantiation time; raise explicitly instead of silently
        returning ``None`` (which would fail later with a confusing TypeError
        in ``__init__``).
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement get_table_name()"
        )

    def get_data_path(self, options):
        """Resolve where the source data lives and store it in ``self.data_path``.

        With ``--data-path`` the given value is used as-is. With ``--url`` the
        file is downloaded to a temporary file, unzipped into a fresh
        temporary directory (kept in ``self.tempdir`` so file_cleanup() can
        remove it) and the ``Data`` sub-directory of the extracted archive is
        used.

        :param options: the command's parsed options.
        :return: the resolved data path (``self.data_path``).
        """
        if options.get("data_path"):
            self.data_path = options["data_path"]

        if url := options.get("url"):
            self.stdout.write(f"Downloading data from {url}")
            # The downloaded zip is only needed until it has been extracted,
            # so close (and delete) it deterministically.
            with tempfile.NamedTemporaryFile() as tmp:
                urllib.request.urlretrieve(url, tmp.name)
                self.tempdir = unzip(tmp.name)
            self.data_path = Path(self.tempdir) / "Data"

        return self.data_path

    @abc.abstractmethod
    def import_data_to_temp_table(self):
        """Load the new data into ``self.temp_table_name`` using ``self.cursor``."""
        raise NotImplementedError(
            f"{type(self).__name__} must implement import_data_to_temp_table()"
        )

    def get_index_statements(self):
        """Describe every index on ``self.table_name`` as reported by pg_indexes.

        :return: a list of dicts, one per index, with the original index name
            and CREATE statement, the ``_temp`` name, a CREATE statement that
            targets the temp table, and an ALTER INDEX statement that renames
            the temp index back to the original name.
        """
        self.cursor.execute(f"""
            SELECT tablename, indexname, indexdef
            FROM pg_indexes
            WHERE tablename='{self.table_name}'
        """)
        results = self.cursor.fetchall()

        indexes = []
        for _table, original_index_name, original_index_create_statement in results:
            temp_index_name = original_index_name + "_temp"
            temp_index_create_statement = self.make_temp_index_create_statement(
                original_index_create_statement,
                original_index_name,
                temp_index_name,
            )
            rename_temp_index_statement = (
                f"ALTER INDEX {temp_index_name} RENAME TO {original_index_name}"
            )
            indexes.append(
                {
                    "index_name": original_index_name,
                    "temp_index_name": temp_index_name,
                    "original_index_create_statement": original_index_create_statement,
                    "temp_index_create_statement": temp_index_create_statement,
                    "rename_temp_index_statement": rename_temp_index_statement,
                }
            )

        return indexes

    def make_temp_index_create_statement(
        self,
        original_index_create_statement,
        original_index_name,
        temp_index_name,
    ):
        """Rewrite an index definition so it creates the temp index on the temp table.

        We expect the statement to be of the form
        ``CREATE [UNIQUE] INDEX $index ON public.$table USING $fields``.
        The index name is swapped for ``temp_index_name`` (with IF NOT EXISTS)
        and ``public.<table>`` for ``public.<temp table>``.
        """
        temp_index_create_statement = original_index_create_statement.replace(
            f"INDEX {original_index_name}",
            f"INDEX IF NOT EXISTS {temp_index_name}",
        )
        return temp_index_create_statement.replace(
            f"ON public.{self.table_name}", f"ON public.{self.temp_table_name}"
        )

    def build_temp_indexes(self):
        """Execute each stored temp-index CREATE statement against the temp table."""
        self.stdout.write(f"Building indexes on {self.temp_table_name}...")
        for index in self.indexes:
            self.stdout.write(
                f"Executing: {index['temp_index_create_statement']}"
            )
            self.cursor.execute(index["temp_index_create_statement"])

    def get_primary_key_constraint(self):
        """Read the table's single primary key constraint from pg_constraint.

        :return: dict with the constraint name, its ``_temp`` name, its
            definition, and an ALTER TABLE statement adding it to the temp table.
        :raises ValueError: if the table does not have exactly one primary key.
        """
        pkey_sql = f"""
            SELECT conname, pg_get_constraintdef(oid)
            FROM pg_constraint
            WHERE conrelid = '{self.table_name}'::regclass AND contype = 'p';
        """
        self.cursor.execute(pkey_sql)
        results = self.cursor.fetchall()
        num_keys = len(results)
        if num_keys != 1:
            raise ValueError(
                f"Expected there to be 1 primary key. But {num_keys} found."
            )

        self.stdout.write("Found primary key constraint")
        constraint_name, constraintdef = results[0][0], results[0][1]
        temp_name = constraint_name + "_temp"
        return {
            "constraint_name": constraint_name,
            "temp_name": temp_name,
            "constraintdef": constraintdef,
            "temp_constraint_create_statement": f"ALTER TABLE {self.temp_table_name} ADD CONSTRAINT {temp_name} {constraintdef}",
        }

    def add_temp_primary_key(self):
        """Add the recorded primary key (under its ``_temp`` name) to the temp table."""
        self.stdout.write(f"Adding primary key to {self.temp_table_name}...")
        self.stdout.write(
            f"Executing: {self.primary_key_constraint['temp_constraint_create_statement']}"
        )
        self.cursor.execute(
            self.primary_key_constraint["temp_constraint_create_statement"]
        )

    def get_foreign_key_constraints(self):
        """Collect foreign keys where the table is either referencing or referenced.

        Rows are de-duplicated by constraint name.

        :return: list of dicts with the constraint name and the statements to
            re-create and drop it on its referencing table.
        """
        fkey_sql = f"""
            SELECT conname AS constraint_name, confrelid::regclass::text AS referenced_table, pg_get_constraintdef(oid), conrelid::regclass::text AS referencing_table
            FROM pg_constraint
            WHERE contype = 'f'
                AND (
                    conrelid = '{self.table_name}'::regclass
                    OR
                    confrelid = '{self.table_name}'::regclass
                )
        """
        self.cursor.execute(fkey_sql)
        results = self.cursor.fetchall()

        self.stdout.write(
            f"Found {len(results)} foreign key constraints, where {self.table_name} is the referencing or referenced table"
        )

        fk_constraints = []
        seen = set()

        for row in results:
            constraint_name = row[0]
            if constraint_name in seen:
                continue
            seen.add(constraint_name)
            constraintdef = row[2]
            referencing_table = row[3]

            fk_constraints.append(
                {
                    "constraint_name": constraint_name,
                    "create_statement": f"ALTER TABLE {referencing_table} ADD CONSTRAINT {constraint_name} {constraintdef}",
                    "delete_statement": f"ALTER TABLE {referencing_table} DROP CONSTRAINT IF EXISTS {constraint_name}",
                }
            )

        return fk_constraints

    def drop_foreign_keys(self):
        """Execute the stored DROP CONSTRAINT IF EXISTS statement for each foreign key."""
        self.stdout.write("Dropping foreign keys...")
        for constraint in self.foreign_key_constraints:
            self.stdout.write(f"Executing: {constraint['delete_statement']}")
            self.cursor.execute(constraint["delete_statement"])

    def check_foreign_key_exists(self, constraint_name):
        """Return True if a foreign key named ``constraint_name`` exists.

        This is especially useful when importing addressbase and uprntocouncil
        in WDIV: there is a fk from one to the other, so both importers pick up
        the same fk. A try/except around the CREATE is not an option, because
        after a failed statement PostgreSQL rejects every further command in
        the same transaction (SQLSTATE 25P02, in_failed_sql_transaction) until
        it is rolled back or a savepoint is used.
        https://philipmcclarence.com/how-to-diagnose-and-fix-the-25p02-in_failed_sql_transaction-error-code-in-postgres/
        """
        self.cursor.execute(f"""
            SELECT 1
            FROM pg_constraint
            WHERE contype = 'f'
                AND conname = '{constraint_name}'
        """)
        return bool(self.cursor.fetchone())

    def add_foreign_keys(self):
        """Re-create each stored foreign key, skipping any that already exist."""
        self.stdout.write("Creating foreign keys...")
        for constraint in self.foreign_key_constraints:
            if self.check_foreign_key_exists(constraint["constraint_name"]):
                self.stdout.write(
                    f"Foreign key {constraint['constraint_name']} already exists - skipping"
                )
                continue

            self.stdout.write(f"Executing: {constraint['create_statement']}")
            self.cursor.execute(constraint["create_statement"])

    def create_temp_table(self):
        """(Re)create an empty temp table with the original table's columns."""
        self.stdout.write(
            f"Creating temp table called {self.temp_table_name}..."
        )
        self.cursor.execute(f"DROP TABLE IF EXISTS {self.temp_table_name};")
        # LIMIT 0 copies the column layout only (no rows, constraints or indexes).
        self.cursor.execute(
            f"CREATE TABLE {self.temp_table_name} AS SELECT * FROM {self.table_name} LIMIT 0;"
        )

    def drop_old_table(self):
        """Drop the original table with CASCADE."""
        self.stdout.write("Dropping old table...")
        drop_table_statement = f"DROP TABLE {self.table_name} CASCADE "
        self.stdout.write(f"Executing: {drop_table_statement}")
        self.cursor.execute(drop_table_statement)

    def rename_temp_table(self):
        """Rename the temp table, its primary key and its indexes to the original names."""
        self.stdout.write("Renaming temp table...")
        rename_table_statement = (
            f"ALTER TABLE {self.temp_table_name} RENAME TO {self.table_name}"
        )
        self.stdout.write(f"Executing: {rename_table_statement}")
        self.cursor.execute(rename_table_statement)
        self.stdout.write("Renaming primary key...")
        primary_key_rename_statement = f"ALTER TABLE {self.table_name} RENAME CONSTRAINT {self.primary_key_constraint['temp_name']} TO {self.primary_key_constraint['constraint_name']}"
        self.stdout.write(f"Executing: {primary_key_rename_statement}")
        self.cursor.execute(primary_key_rename_statement)
        self.rename_temp_indexes()

    def rename_temp_indexes(self):
        """Rename each temp index back to its original name.

        The index backing the primary key shares the constraint's name and is
        renamed by the RENAME CONSTRAINT in rename_temp_table(), so it is skipped.
        """
        index_rename_statements = []
        for index in self.indexes:
            if (
                index["index_name"]
                == self.primary_key_constraint["constraint_name"]
            ):
                self.stdout.write(
                    f"Skipping rename of {index['index_name']} because renaming primary key constraint renamed it already"
                )
            else:
                index_rename_statements.append(
                    index["rename_temp_index_statement"]
                )

        for statement in index_rename_statements:
            self.stdout.write(f"Executing: {statement}")
            self.cursor.execute(statement)

    def check_for_other_constraints(self):
        """Abort if the table has constraints other than primary/foreign keys.

        Such constraints are not copied to the temp table, so swapping tables
        would silently lose them.

        :raises Exception: if any non-'p'/'f' constraint exists on the table.
        """
        self.stdout.write(
            "Checking for non primary key/foreign key constraints..."
        )
        self.cursor.execute(f"""
            SELECT 1
            FROM pg_constraint
            WHERE
                contype NOT IN ('p', 'f')
                AND
                conrelid = '{self.table_name}'::regclass;
        """)
        result = self.cursor.fetchone()
        if result:
            raise Exception(
                "Non primary key/foreign key constraints found. Aborting import, to avoid overwriting."
            )
        self.stdout.write("...none found. Continuing.")

    def get_constraints_and_index_statements(self):
        """Record the table's primary key, indexes and foreign keys; reject other constraints."""
        self.stdout.write(
            f"Getting constraints and indexes for {self.table_name}"
        )
        self.primary_key_constraint = self.get_primary_key_constraint()
        self.indexes = self.get_index_statements()
        self.foreign_key_constraints = self.get_foreign_key_constraints()
        self.check_for_other_constraints()

    def handle(self, *args, **options):
        """Run the full import against the database selected with --database.

        :raises Exception: if this machine has less memory than check_memory()
            requires.
        """
        if not check_memory():
            raise Exception(
                "This instance has less than the recommended memory. Try running the import from a larger instance."
            )

        self.database_alias = options.get("database", DEFAULT_DB_ALIAS)
        self.cursor = connections[self.database_alias].cursor()

        self.get_data_path(options)

        self.get_constraints_and_index_statements()

        try:
            # Create empty temp table
            self.create_temp_table()

            # import data into the temp table
            self.import_data_to_temp_table()

            # Add temp primary key
            self.add_temp_primary_key()

            # Add temp indexes
            self.build_temp_indexes()

            # The swap must be atomic on the SAME connection the cursor uses.
            # A bare transaction.atomic() targets the default database, which
            # would leave a non-default --database in autocommit and could
            # lose the table if a step below fails.
            with transaction.atomic(using=self.database_alias):
                # Drop Foreign keys
                if self.foreign_key_constraints:
                    self.drop_foreign_keys()

                # drop old table
                self.drop_old_table()

                # Rename temp table to original names, pkey and indexes
                self.rename_temp_table()

                # Add Foreign keys
                if self.foreign_key_constraints:
                    self.add_foreign_keys()

        finally:
            # Nested so downloaded files are removed even if db_cleanup fails.
            try:
                self.db_cleanup()
            finally:
                self.file_cleanup()

        self.stdout.write("...done")

    def db_cleanup(self):
        """Drop the temp table (if it still exists) with CASCADE."""
        self.stdout.write("Dropping temp table if exists...")
        self.cursor.execute(
            f"DROP TABLE IF EXISTS {self.temp_table_name} CASCADE;"
        )

    def file_cleanup(self):
        """Best-effort removal of the download temp directory, if one was created."""
        if self.tempdir:
            self.stdout.write(f"Cleaning up temp files in {self.tempdir}")
            try:
                shutil.rmtree(self.tempdir)
            except OSError:
                self.stdout.write("Failed to clean up temp files.")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc