• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DemocracyClub / UK-Polling-Stations / 9196cfef-5c54-41a9-8fa7-2645b1a45fe6

pending completion
9196cfef-5c54-41a9-8fa7-2645b1a45fe6

push

circleci

symroe
Import script for Cambridge (2023-07-04) (closes #)

3190 of 4448 relevant lines covered (71.72%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/polling_stations/apps/addressbase/management/commands/split_and_load_csv_to_database.py
1
import multiprocessing
×
2
import os
×
3
import subprocess
×
4
import tempfile
×
5
import time
×
6
from pathlib import Path
×
7

8
import boto3
×
9
from django.core.management import BaseCommand
×
10
from django.db import connections, DEFAULT_DB_ALIAS
×
11
from django.conf import settings
×
12

13

# Approximate size in bytes of the full AddressBase CSV export. Used only to
# turn pg_stat_progress_copy's bytes_processed into a rough "% done" figure
# while the parallel COPY workers run (see Command.run_processes).
APPROX_ADDRESS_BASE_BYTES = 3_300_000_000
15

16

17
def import_single_file(file_name, table_name, database):
    """Stream one CSV file into ``table_name`` via psql's COPY FROM STDIN.

    Runs inside a multiprocessing worker, so it opens its own connection
    using the credentials from ``settings.DATABASES[database]``.

    :param file_name: path of the CSV part to load
    :param table_name: fully-qualified table to COPY into
    :param database: Django database alias to read connection settings from
    :returns: True on success
    :raises subprocess.CalledProcessError: if psql exits non-zero, so the
        parent's ``AsyncResult.successful()`` reflects real COPY failures
        instead of unconditionally reporting success.
    """
    file_sql_template = f"""
        BEGIN;
        SET LOCAL synchronous_commit TO OFF;
        COPY {table_name} FROM STDIN CSV;
        COMMIT;
    """
    db_settings = settings.DATABASES[database]
    host = db_settings["HOST"]
    password = db_settings["PASSWORD"]
    database_name = db_settings["NAME"]

    # Feed the file to psql on stdin directly (no shell, no `cat`), and pass
    # the password via PGPASSWORD so it never appears in the process list or
    # in shell history.
    with open(file_name, "rb") as csv_file:
        subprocess.run(
            [
                "psql",
                f"postgresql://postgres@{host}/{database_name}",
                "-c",
                file_sql_template,
            ],
            stdin=csv_file,
            stdout=subprocess.PIPE,
            env={**os.environ, "PGPASSWORD": password},
            check=True,  # surface COPY failures to the pool rather than silently returning True
        )
    return True
36

37

38
class Command(BaseCommand):
    """Download (or take locally) a bulk CSV, split it into N parts and
    COPY the parts into Postgres in parallel worker processes."""

    def add_arguments(self, parser):
        parser.add_argument(
            "--database",
            default=DEFAULT_DB_ALIAS,
            help=f"Nominates a database to import in to. Defaults to the '{DEFAULT_DB_ALIAS}' database.",
        )
        parser.add_argument(
            "--processes",
            default=10,
            type=int,
            help="The number of jobs to run in parallel",
        )
        parser.add_argument(
            "--import-type",
            choices=["addressbase", "uprntocouncil"],
            required=True,
            help="The type of data to import",
        )
        parser.add_argument(
            "--local-file-path",
            action="store",
            help="If provided, use a local file rather than downloading",
        )

    def handle(self, *args, **options):
        self.database = options["database"]
        self.processes = options["processes"]
        self.import_type = options["import_type"]
        _tmp_dir = tempfile.TemporaryDirectory()
        self.tmp_dir = Path(_tmp_dir.name)
        # The flag really is optional: Path(None) raises TypeError, so only
        # wrap it in a Path when a value was actually supplied.
        self.local_file_path = (
            Path(options["local_file_path"]) if options["local_file_path"] else None
        )
        if self.local_file_path:
            self.file_path = self.local_file_path
        self.s3_client = boto3.client(
            "s3", region_name=os.environ.get("AWS_REGION", "eu-west-2")
        )
        if self.import_type == "addressbase":
            self.table_name = "addressbase_address"
        else:
            self.table_name = "addressbase_uprntocouncil"

        if not self.local_file_path:
            # Download the file to the tempdir; sets self.file_path
            self.download_file()
        # Split the file and save the parts in a list
        self.split_files = self.split_file()

        # Pass that list to the import function. Autovacuum is disabled for
        # the duration of the bulk load and re-enabled afterwards.
        with connections[self.database].cursor() as cursor:
            self.cursor = cursor
            self.cursor.execute(
                f"ALTER TABLE {self.table_name} SET (autovacuum_enabled = false);"
            )
            self.stdout.write("clearing existing data..")
            cursor.execute(f"TRUNCATE TABLE {self.table_name} CASCADE;")
            self.run_processes()
            self.cursor.execute(
                f"ALTER TABLE {self.table_name} SET (autovacuum_enabled = true);"
            )

        _tmp_dir.cleanup()

    def download_file(self):
        """
        Find the latest file of the file type and download it to the temp_dir

        """
        files = self.s3_client.list_objects_v2(
            Bucket=settings.PRIVATE_DATA_BUCKET_NAME, Prefix=f"{self.import_type}/"
        )["Contents"]

        # Sort ascending by LastModified and take the final entry — the
        # previous code took index 0, i.e. the OLDEST object.
        latest_file_key = sorted(files, key=lambda f: f["LastModified"])[-1]["Key"]
        self.stdout.write(latest_file_key)
        # self.tmp_dir is already a Path; using `.name` here would produce a
        # relative path outside the temp directory.
        file = self.tmp_dir / self.import_type / "full.csv"
        file.parent.mkdir(exist_ok=True, parents=True)
        self.file_path = file
        with file.open("wb") as f:
            self.s3_client.download_fileobj(
                settings.PRIVATE_DATA_BUCKET_NAME, latest_file_key, f
            )

    def run_processes(self):
        """Fan the split files out to a process pool and poll until every
        worker has finished, reporting rough COPY progress as we go."""
        pool = multiprocessing.Pool(self.processes)
        results = []
        for file_name in self.split_files:
            result = pool.apply_async(
                import_single_file, (file_name, self.table_name, self.database)
            )
            results.append(result)
        pool.close()
        while True:
            time.sleep(1)
            # catch exception if results are not ready yet
            self.cursor.execute(
                f"select SUM(bytes_processed) / {APPROX_ADDRESS_BASE_BYTES} from pg_stat_progress_copy;"
            )
            self.stdout.write(f"Rough % done: {self.cursor.fetchone()}")
            ready = [result.ready() for result in results]
            successful = [result.successful() for result in results if result.ready()]
            self.stdout.write(f"{ready=}")
            self.stdout.write(f"{successful=}")

            # Compare against the number of submitted tasks, not
            # self.processes: split may not produce exactly that many files.
            # exit loop if all tasks returned success
            if len(successful) == len(results) and all(successful):
                break
            # raise exception reporting exceptions received from workers
            if len(successful) == len(results):
                raise Exception(
                    f"Workers raised following exceptions {[result._value for result in results if not result.successful()]}"
                )

    def split_file(self):
        """Split self.file_path into self.processes line-aligned parts with
        GNU split, returning the list of part paths."""
        self.split_dir = self.tmp_dir / "split"
        self.split_dir.mkdir(parents=True, exist_ok=True)
        self.stdout.write(
            f"Splitting {self.file_path} in to {self.processes} parts, saving to {self.split_dir}"
        )
        args = [
            "split",
            "-n",
            f"l/{self.processes}",  # split on line boundaries, never mid-row
            "--additional-suffix=.csv",
            # Split whichever file we're importing — the previous code used
            # self.local_file_path, which is None on the S3 download path.
            f"{self.file_path}",
            f"{self.split_dir}/{self.import_type}_split_",
        ]
        command = subprocess.Popen(args)
        command.communicate()
        return list(self.split_dir.glob("*"))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc