
DemocracyClub / UK-Polling-Stations
Build 59b19fef-6de2-484b-83e8-fe0e256ba1cb (push, built on circleci)

31 May 2024 09:33AM UTC coverage: 71.051% (+0.02%) from 71.032%

Committer: symroe
Take ballot JSON from EE

This isn't ideal, but there are slight differences between the JSON we
have from WCIVF and the JSON we have from EE.

To save time, just take the EE JSON.

We can resolve the other issues at a later date.

1 of 5 new or added lines in 1 file covered. (20.0%)

2 existing lines in 1 file now uncovered.

3765 of 5299 relevant lines covered (71.05%)

0.71 hits per line

Source File

/polling_stations/apps/addressbase/management/commands/split_and_load_csv_to_database.py (0.0% of lines covered)
import multiprocessing
import os
import subprocess
import tempfile
import time
from pathlib import Path

import boto3
from django.conf import settings
from django.core.management import BaseCommand
from django.db import DEFAULT_DB_ALIAS, connections

# Rough size of the full AddressBase CSV (~3.3 GB), used below as the
# denominator when estimating COPY progress
APPROX_ADDRESS_BASE_BYTES = 3_300_000_000

def import_single_file(file_name, table_name, database):
    # Bulk-load one CSV chunk with COPY; synchronous_commit is turned off
    # for the transaction to speed up the write
    file_sql_template = f"""
        BEGIN;
        SET LOCAL synchronous_commit TO OFF;
        COPY {table_name} FROM STDIN CSV;
        COMMIT;
    """
    host = settings.DATABASES[database]["HOST"]
    password = settings.DATABASES[database]["PASSWORD"]
    database_name = settings.DATABASES[database]["NAME"]

    # Pipe the chunk straight into psql so the bytes never pass through Python
    command = f"""cat {file_name} | psql postgresql://postgres:{password}@{host}/{database_name} -c "{file_sql_template}" """
    process = subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
    )
    process.communicate()
    return True
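A note on the design: piping through cat and psql keeps the CSV bytes out of Python entirely. The same chunk load could be driven in-process; the sketch below assumes psycopg2 (the driver behind Django's Postgres backend), and the function name and dsn parameter are illustrative rather than part of this module.

import psycopg2

def import_single_file_in_process(file_name, table_name, dsn):
    # copy_expert streams the open file into COPY ... FROM STDIN,
    # so the chunk never has to fit in memory
    with psycopg2.connect(dsn) as conn:
        with conn.cursor() as cursor:
            cursor.execute("SET LOCAL synchronous_commit TO OFF;")
            with open(file_name) as f:
                cursor.copy_expert(f"COPY {table_name} FROM STDIN CSV", f)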

class Command(BaseCommand):
    def add_arguments(self, parser):
        parser.add_argument(
            "--database",
            default=DEFAULT_DB_ALIAS,
            help=f"Nominates a database to import into. Defaults to the '{DEFAULT_DB_ALIAS}' database.",
        )
        parser.add_argument(
            "--processes",
            default=10,
            type=int,
            help="The number of jobs to run in parallel",
        )
        parser.add_argument(
            "--import-type",
            choices=["addressbase", "uprntocouncil"],
            required=True,
            help="The type of data to import",
        )
        parser.add_argument(
            "--local-file-path",
            action="store",
            help="If provided, use a local file rather than downloading",
        )

    def handle(self, *args, **options):
        self.database = options["database"]
        self.processes = options["processes"]
        self.import_type = options["import_type"]
        _tmp_dir = tempfile.TemporaryDirectory()
        self.tmp_dir = Path(_tmp_dir.name)
        # Path(None) raises a TypeError, so only wrap the option when given
        local_file_path = options["local_file_path"]
        self.local_file_path = Path(local_file_path) if local_file_path else None
        if self.local_file_path:
            self.file_path = self.local_file_path
        self.s3_client = boto3.client(
            "s3", region_name=os.environ.get("AWS_REGION", "eu-west-2")
        )
        if self.import_type == "addressbase":
            self.table_name = "addressbase_address"
        else:
            self.table_name = "addressbase_uprntocouncil"

        if not self.local_file_path:
            # Download the file to the tempdir
            self.download_file()
        # Split the file and save the parts in a list
        self.split_files = self.split_file()

        # Pass that list to the import function
        with connections[self.database].cursor() as cursor:
            self.cursor = cursor
            # Disable autovacuum for the bulk load; re-enabled once the COPYs finish
            self.cursor.execute(
                f"ALTER TABLE {self.table_name} SET (autovacuum_enabled = false);"
            )
            self.stdout.write("clearing existing data..")
            cursor.execute(f"TRUNCATE TABLE {self.table_name} CASCADE;")
            self.run_processes()
            self.cursor.execute(
                f"ALTER TABLE {self.table_name} SET (autovacuum_enabled = true);"
            )

        _tmp_dir.cleanup()

    def download_file(self):
        """
        Find the latest file of the file type and download it to the temp_dir
        """
        files = self.s3_client.list_objects_v2(
            Bucket=settings.PRIVATE_DATA_BUCKET_NAME, Prefix=f"{self.import_type}/"
        )["Contents"]

        # Sort ascending by LastModified and take the last entry, the most
        # recent upload (the first entry would be the oldest)
        latest_file_key = sorted(files, key=lambda f: f["LastModified"])[-1]["Key"]
        self.stdout.write(latest_file_key)
        file = self.tmp_dir / self.import_type / "full.csv"
        file.parent.mkdir(exist_ok=True, parents=True)
        self.file_path = file
        with file.open("wb") as f:
            self.s3_client.download_fileobj(
                settings.PRIVATE_DATA_BUCKET_NAME, latest_file_key, f
            )
119
    def run_processes(self):
×
120
        pool = multiprocessing.Pool(self.processes)
×
121
        results = []
×
122
        for file_name in self.split_files:
×
123
            result = pool.apply_async(
×
124
                import_single_file, (file_name, self.table_name, self.database)
125
            )
126
            results.append(result)
×
127
        pool.close()
×
128
        while True:
×
129
            time.sleep(1)
×
130
            # catch exception if results are not ready yet
131
            self.cursor.execute(
×
132
                f"select SUM(bytes_processed) / {APPROX_ADDRESS_BASE_BYTES} from pg_stat_progress_copy;"
133
            )
134
            self.stdout.write(f"Rough % done: {self.cursor.fetchone()}")
×
135
            ready = [result.ready() for result in results]
×
136
            successful = [result.successful() for result in results if result.ready()]
×
137
            self.stdout.write(f"{ready=}")
×
138
            self.stdout.write(f"{successful=}")
×
139

140
            # exit loop if all tasks returned success
141
            if len(successful) == self.processes and all(successful):
×
142
                break
×
143
            # raise exception reporting exceptions received from workers
144
            if len(successful) == self.processes:
×
145
                raise Exception(
×
146
                    f"Workers raised following exceptions {[result._value for result in results if not result.successful()]}"
147
                )
148
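The polling loop above exists so that rough progress can be printed while the workers run, at the price of reading the private AsyncResult._value attribute when reporting failures. If progress output were not needed, the same wait could be written without polling, since AsyncResult.get() blocks and re-raises anything the worker raised. A minimal sketch, not part of the module:

def wait_for_workers(pool, results):
    # get() blocks until the task finishes and re-raises any exception
    # the worker raised, so no ready()/successful() bookkeeping is needed
    for result in results:
        result.get()
    pool.join()  # reap the worker processes (pool was already close()d)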

    def split_file(self):
        self.split_dir = self.tmp_dir / "split"
        self.split_dir.mkdir(parents=True, exist_ok=True)
        self.stdout.write(
            f"Splitting {self.file_path} into {self.processes} parts, saving to {self.split_dir}"
        )
        # split -n l/N produces N chunks without breaking lines,
        # which keeps every CSV row intact
        args = [
            "split",
            "-n",
            f"l/{self.processes}",
            "--additional-suffix=.csv",
            f"{self.file_path}",
            f"{self.split_dir}/{self.import_type}_split_",
        ]
        command = subprocess.Popen(args)
        command.communicate()
        return list(self.split_dir.glob("*"))
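For reference, Django exposes a management command under the name of its module, so this file runs as split_and_load_csv_to_database. A sketch of both invocation paths; the local file path is illustrative.

from django.core.management import call_command

# Download the latest addressbase CSV from S3, split it, and bulk-load it
call_command("split_and_load_csv_to_database", import_type="addressbase", processes=10)

# Or skip S3 and load a file already on disk
call_command(
    "split_and_load_csv_to_database",
    import_type="uprntocouncil",
    local_file_path="/data/uprntocouncil.csv",
)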