DemocracyClub / UK-Polling-Stations

24 May 2024 12:50PM UTC coverage: 72.172%. Remained the same.

Build: 8d33cdd0-1f72-42d1-94ef-0c20058ec776 (push, via circleci)
Committer: awdem
Commit message: Import script for Mid Suffolk (2024-07-04) (closes #7060)

3675 of 5092 relevant lines covered (72.17%)
0.72 hits per line

Source file (0.0% of its lines covered):
/polling_stations/apps/addressbase/management/commands/split_and_load_csv_to_database.py
import multiprocessing
import os
import subprocess
import tempfile
import time
from pathlib import Path

import boto3
from django.conf import settings
from django.core.management import BaseCommand
from django.db import DEFAULT_DB_ALIAS, connections

# Approximate size of a full AddressBase CSV, used to estimate COPY progress.
APPROX_ADDRESS_BASE_BYTES = 3_300_000_000


def import_single_file(file_name, table_name, database):
    # Bulk-load one CSV part with COPY. synchronous_commit is switched off
    # for the transaction to reduce WAL flush overhead during the load.
    file_sql_template = f"""
        BEGIN;
        SET LOCAL synchronous_commit TO OFF;
        COPY {table_name} FROM STDIN CSV;
        COMMIT;
    """
    host = settings.DATABASES[database]["HOST"]
    password = settings.DATABASES[database]["PASSWORD"]
    database_name = settings.DATABASES[database]["NAME"]

    command = f"""cat {file_name} | psql postgresql://postgres:{password}@{host}/{database_name} -c "{file_sql_template}" """
    process = subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
    )
    process.communicate()
    return True
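
# NOTE: piping the file through psql puts the database password on the shell
# command line, where other local processes can read it via `ps`. A minimal
# sketch of an alternative (an assumption, not part of this module), using
# psycopg2's copy_expert, which Django's cursor wrapper passes through:
#
#     def import_single_file(file_name, table_name, database):
#         connections.close_all()  # forked workers must open fresh connections
#         with open(file_name) as f, connections[database].cursor() as cursor:
#             cursor.copy_expert(f"COPY {table_name} FROM STDIN CSV", f)
#         return True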


class Command(BaseCommand):
    def add_arguments(self, parser):
        parser.add_argument(
            "--database",
            default=DEFAULT_DB_ALIAS,
            help=f"Nominates a database to import into. Defaults to the '{DEFAULT_DB_ALIAS}' database.",
        )
        parser.add_argument(
            "--processes",
            default=10,
            type=int,
            help="The number of jobs to run in parallel",
        )
        parser.add_argument(
            "--import-type",
            choices=["addressbase", "uprntocouncil"],
            required=True,
            help="The type of data to import",
        )
        parser.add_argument(
            "--local-file-path",
            action="store",
            help="If provided, use a local file rather than downloading",
        )

    def handle(self, *args, **options):
        self.database = options["database"]
        self.processes = options["processes"]
        self.import_type = options["import_type"]
        _tmp_dir = tempfile.TemporaryDirectory()
        self.tmp_dir = Path(_tmp_dir.name)
        # --local-file-path defaults to None, and Path(None) raises TypeError,
        # so only wrap the option when it was actually given.
        self.local_file_path = (
            Path(options["local_file_path"]) if options["local_file_path"] else None
        )
        if self.local_file_path:
            self.file_path = self.local_file_path
        self.s3_client = boto3.client(
            "s3", region_name=os.environ.get("AWS_REGION", "eu-west-2")
        )
        if self.import_type == "addressbase":
            self.table_name = "addressbase_address"
        else:
            self.table_name = "addressbase_uprntocouncil"

        if not self.local_file_path:
            # Download the file to the tempdir
            self.download_file()
        # Split the file and save the parts in a list
        self.split_files = self.split_file()

        # Pass that list to the import function. Autovacuum is paused on the
        # target table during the bulk load and restored afterwards.
        with connections[self.database].cursor() as cursor:
            self.cursor = cursor
            self.cursor.execute(
                f"ALTER TABLE {self.table_name} SET (autovacuum_enabled = false);"
            )
            self.stdout.write("clearing existing data..")
            cursor.execute(f"TRUNCATE TABLE {self.table_name} CASCADE;")
            self.run_processes()
            self.cursor.execute(
                f"ALTER TABLE {self.table_name} SET (autovacuum_enabled = true);"
            )

        _tmp_dir.cleanup()

    def download_file(self):
        """
        Find the latest file of the file type and download it to the temp dir
        """
        files = self.s3_client.list_objects_v2(
            Bucket=settings.PRIVATE_DATA_BUCKET_NAME, Prefix=f"{self.import_type}/"
        )["Contents"]

        # Sorting ascending by LastModified puts the newest object last, so
        # take the final entry rather than the first (the oldest).
        latest_file_key = sorted(files, key=lambda f: f["LastModified"])[-1]["Key"]
        self.stdout.write(latest_file_key)
        # self.tmp_dir is already a Path, so join onto it directly; wrapping
        # its .name in Path() would resolve relative to the working directory.
        file = self.tmp_dir / self.import_type / "full.csv"
        file.parent.mkdir(exist_ok=True, parents=True)
        self.file_path = file
        with file.open("wb") as f:
            self.s3_client.download_fileobj(
                settings.PRIVATE_DATA_BUCKET_NAME, latest_file_key, f
            )

    def run_processes(self):
×
120
        pool = multiprocessing.Pool(self.processes)
×
121
        results = []
×
122
        for file_name in self.split_files:
×
123
            result = pool.apply_async(
×
124
                import_single_file, (file_name, self.table_name, self.database)
125
            )
126
            results.append(result)
×
127
        pool.close()
×
128
        while True:
×
129
            time.sleep(1)
×
130
            # catch exception if results are not ready yet
131
            self.cursor.execute(
×
132
                f"select SUM(bytes_processed) / {APPROX_ADDRESS_BASE_BYTES} from pg_stat_progress_copy;"
133
            )
134
            self.stdout.write(f"Rough % done: {self.cursor.fetchone()}")
×
135
            ready = [result.ready() for result in results]
×
136
            successful = [result.successful() for result in results if result.ready()]
×
137
            self.stdout.write(f"{ready=}")
×
138
            self.stdout.write(f"{successful=}")
×
139

140
            # exit loop if all tasks returned success
141
            if len(successful) == self.processes and all(successful):
×
142
                break
×
143
            # raise exception reporting exceptions received from workers
144
            if len(successful) == self.processes:
×
145
                raise Exception(
×
146
                    f"Workers raised following exceptions {[result._value for result in results if not result.successful()]}"
147
                )
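
    # NOTE: AsyncResult._value is a private multiprocessing attribute. A more
    # portable sketch for surfacing worker failures is AsyncResult.get(),
    # which returns the worker's result or re-raises its exception in the
    # parent process:
    #
    #     for result in results:
    #         result.get()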

    def split_file(self):
        self.split_dir = self.tmp_dir / "split"
        self.split_dir.mkdir(parents=True, exist_ok=True)
        self.stdout.write(
            f"Splitting {self.file_path} into {self.processes} parts, saving to {self.split_dir}"
        )
        # 'split -n l/N' divides the file into N parts on line boundaries, so
        # each part is a valid chunk of CSV for COPY. self.file_path covers
        # both the downloaded and the --local-file-path cases.
        args = [
            "split",
            "-n",
            f"l/{self.processes}",
            "--additional-suffix=.csv",
            f"{self.file_path}",
            f"{self.split_dir}/{self.import_type}_split_",
        ]
        command = subprocess.Popen(args)
        command.communicate()
        return list(self.split_dir.glob("*"))
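
For reference, this module is a Django management command (the command name comes from the file name), so a typical invocation looks like the following; the process count and file path are illustrative:

python manage.py split_and_load_csv_to_database --import-type addressbase --processes 10
python manage.py split_and_load_csv_to_database --import-type uprntocouncil --local-file-path /data/uprntocouncil.csv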