• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / rtmis / #1109

16 Apr 2024 05:26AM UTC coverage: 79.111% (-0.04%) from 79.155%
#1109

Pull #1343

coveralls-python

web-flow
Merge branch 'develop' into feature/1148-update-sqlite-when-bulk-uploads
Pull Request #1343: [#1148] Generate sqlite after administration bulk uploads

2426 of 3228 branches covered (75.15%)

Branch coverage included in aggregate %.

5800 of 7170 relevant lines covered (80.89%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

21.05
backend/api/v1/v1_jobs/job.py
1
import logging
1✔
2
import os
1✔
3
from django_q.models import Task
1✔
4

5
import pandas as pd
1✔
6
from django.utils import timezone
1✔
7
from django_q.tasks import async_task
1✔
8
from api.v1.v1_jobs.administrations_bulk_upload import (
1✔
9
        seed_administration_data,
10
        validate_administrations_bulk_upload)
11

12
from api.v1.v1_profile.constants import UserRoleTypes
1✔
13
from api.v1.v1_forms.models import Forms
1✔
14
from api.v1.v1_jobs.constants import JobStatus, JobTypes
1✔
15
# from api.v1.v1_jobs.functions import HText
16
from api.v1.v1_jobs.models import Jobs
1✔
17
from api.v1.v1_jobs.seed_data import seed_excel_data
1✔
18
from api.v1.v1_jobs.validate_upload import validate
1✔
19
from api.v1.v1_profile.models import Administration
1✔
20
from api.v1.v1_users.models import SystemUser
1✔
21
from utils import storage
1✔
22
from utils.email_helper import send_email, EmailTypes
1✔
23
from utils.export_form import generate_definition_sheet
1✔
24
from utils.functions import update_date_time_format
1✔
25
from utils.storage import upload
1✔
26
from utils.custom_generator import generate_sqlite
1✔
27

28
# Module-level logger named after this module; the bulk-upload handlers
# below report errors through it.
logger = logging.getLogger(__name__)
1✔
29

30

31
def download(form: Forms, administration_ids):
    """Collect the ``to_data_frame`` value of every data point of a form.

    Args:
        form: Form whose ``form_form_data`` relation is read.
        administration_ids: Iterable of administration ids used to restrict
            the data points; any falsy value disables the restriction.

    Returns:
        A list with one ``to_data_frame`` value per data point, newest id
        first.
    """
    criteria = (
        {'administration_id__in': administration_ids}
        if administration_ids else {}
    )
    records = form.form_form_data.filter(**criteria).order_by('-id')
    return [record.to_data_frame for record in records]
1✔
37

38

39
def rearrange_columns(col_names: list):
    """Return ``col_names`` reordered so the metadata columns come first.

    Metadata columns found in ``col_names`` are emitted first, in the fixed
    order defined below, followed by every other (question) column in its
    original relative order.

    Only metadata columns that actually occur in ``col_names`` are returned.
    Previously all eight metadata names were prepended as soon as a single
    one was present, so indexing a frame with the result raised ``KeyError``
    whenever some of them were missing.

    Args:
        col_names: Column labels of the exported data frame.

    Returns:
        A new list containing the reordered labels.
    """
    meta_columns = ["id", "created_at", "created_by", "updated_at",
                    "updated_by", "datapoint_name", "administration",
                    "geolocation"]
    # List membership (not a set) so unhashable labels keep working.
    col_meta = [col for col in meta_columns if col in col_names]
    col_question = [col for col in col_names if col not in meta_columns]
    return col_meta + col_question
×
48

49

50
def job_generate_download(job_id, **kwargs):
    """Build the Excel export for a download job and upload it.

    The workbook contains a ``data`` sheet with the form's data points,
    whatever ``generate_definition_sheet`` writes, and a ``context`` sheet
    listing the form name, the download date and the administration filter.

    Args:
        job_id: Primary key of the ``Jobs`` row. Its ``result`` is used as
            the file name under ``./tmp`` and ``info['form_id']`` selects
            the form.
        **kwargs: ``administration`` (optional) is the primary key of the
            administration to export; rows of every administration whose
            ``path`` starts with it are included as well.

    Returns:
        The value returned by ``upload`` for the generated file.
    """
    job = Jobs.objects.get(pk=job_id)
    file_path = './tmp/{0}'.format(job.result)
    # Drop any stale export left behind by a previous attempt.
    if os.path.exists(file_path):
        os.remove(file_path)

    administration_ids = False
    administration_name = "All Administration Level"
    if kwargs.get('administration'):
        administration = Administration.objects.get(
            pk=kwargs.get('administration'))
        if administration.path:
            filter_path = '{0}{1}.'.format(administration.path,
                                           administration.id)
        else:
            filter_path = f"{administration.id}."
        # One query for both the ids and the names of the descendants.
        descendants = list(
            Administration.objects.filter(
                path__startswith=filter_path).values_list('id', 'name'))
        administration_ids = [pk for pk, _ in descendants]
        administration_ids.append(administration.id)
        # NOTE(review): only descendant names are listed, the selected
        # administration itself is not - confirm this is intended.
        administration_name = [name for _, name in descendants]

    form = Forms.objects.get(pk=job.info.get('form_id'))
    data = download(form=form, administration_ids=administration_ids)
    df = pd.DataFrame(data)
    col_names = rearrange_columns(list(df))
    df = df[col_names]

    context = [{
        "context": "Form Name",
        "value": form.name
    }, {
        "context": "Download Date",
        "value": update_date_time_format(job.created)
    }, {
        "context": "Administration",
        "value": ','.join(administration_name) if isinstance(
            administration_name, list) else administration_name
    }]
    # Grouping on both columns turns them into the index, so the sheet is
    # written as two plain columns without a header row.
    context = pd.DataFrame(context).groupby(["context", "value"],
                                            sort=False).first()

    # ``ExcelWriter.save()`` was removed in pandas 2.0; the context manager
    # closes (and thereby writes) the workbook, also when an error occurs.
    with pd.ExcelWriter(file_path, engine='xlsxwriter') as writer:
        df.to_excel(writer, sheet_name='data', index=False)
        generate_definition_sheet(form=form, writer=writer)
        context.to_excel(writer, sheet_name='context', startrow=0,
                         header=False)
        workbook = writer.book
        worksheet = writer.sheets['context']
        f = workbook.add_format({
            'align': 'left',
            'bold': False,
            'border': 0,
        })
        worksheet.set_column('A:A', 20, f)
        worksheet.set_column('B:B', 30, f)
        merge_format = workbook.add_format({
            'bold': True,
            'border': 1,
            'align': 'center',
            'valign': 'vcenter',
            'fg_color': '#45add9',
            'color': '#ffffff',
        })
        # Title banner across the first row of the context sheet.
        worksheet.merge_range('A1:B1', 'Context', merge_format)

    url = upload(file=file_path, folder='download')
    return url
×
119

120

121
def job_generate_download_result(task):
    """Store the outcome of a finished download task on its job.

    The job is looked up by ``task.id``, its attempt counter is increased,
    and it is marked done (with ``available`` set to now) when
    ``task.success`` is truthy, or failed otherwise.
    """
    job = Jobs.objects.get(task_id=task.id)
    job.attempt += 1
    if not task.success:
        job.status = JobStatus.failed
    else:
        job.status = JobStatus.done
        job.available = timezone.now()
    job.save()
×
130

131

132
def seed_data_job(job_id):
    """Seed the data of an uploaded spreadsheet for the given job.

    Args:
        job_id: Primary key of the ``Jobs`` row passed to
            ``seed_excel_data``.

    Returns:
        ``True`` when seeding finished without raising, ``False`` otherwise.
        Exceptions are never propagated; they are logged with their
        traceback through the module logger so a failed job can be
        diagnosed (previously they were only printed, and the first handler
        dropped the exception altogether).
    """
    try:
        job = Jobs.objects.get(pk=job_id)
        seed_excel_data(job)
    except (ValueError, OSError, NameError, TypeError):
        logger.exception("Error seed data job (job_id=%s)", job_id)
        return False
    except Exception:
        # Boundary of the background task: report and signal failure
        # through the return value.
        logger.exception("Unknown error seed data job (job_id=%s)", job_id)
        return False
    return True
×
143

144

145
def seed_data_job_result(task):
    """Record the outcome of a seed-data task and notify the uploader.

    Registered as the ``hook`` of the ``seed_data_job`` task. When
    ``task.result`` is truthy the job is marked done and an email
    summarising the upload is sent to the job's user; otherwise the job is
    marked failed. The job is saved last in both cases.
    """
    job = Jobs.objects.get(task_id=task.id)
    job.attempt += 1
    uploader = job.user
    is_super_admin = uploader.user_access.role == UserRoleTypes.super_admin

    if not task.result:
        job.status = JobStatus.failed
        job.save()
        return

    job.status = JobStatus.done
    job.available = timezone.now()
    form = Forms.objects.filter(pk=int(job.info.get("form"))).first()
    file_name = job.info.get("file")
    # Fetch the uploaded workbook again to count its rows for the email.
    storage.download(f"upload/{file_name}")
    uploaded = pd.read_excel(f"./tmp/{file_name}", sheet_name='data')

    if is_super_admin:
        subject = "New Data Uploaded"
    else:
        subject = 'New Request @{0}'.format(uploader.get_full_name())

    listing = [
        {
            'name': "Upload Date",
            'value': job.created.strftime("%m-%d-%Y, %H:%M:%S"),
        },
        {'name': "Questionnaire", 'value': form.name},
        {'name': "Number of Records", 'value': uploaded.shape[0]},
    ]
    send_email(
        context={
            'subject': subject,
            'title': "New Data Submission",
            'send_to': [uploader.email],
            'listing': listing,
            'is_super_admin': is_super_admin,
        },
        type=EmailTypes.new_request,
    )
    job.save()
×
179

180

181
def validate_excel(job_id):
    """Validate the spreadsheet attached to a job and email any errors.

    The file named in ``job.info['file']`` is downloaded and handed to
    ``validate``. When that reports entries, they are written (without the
    ``error`` column) to ``./tmp/error-<job_id>.csv`` and mailed to the
    job's user.

    Returns:
        ``True`` when ``validate`` reported nothing, ``False`` otherwise.
    """
    job = Jobs.objects.get(pk=job_id)
    storage.download(f"upload/{job.info.get('file')}")
    issues = validate(job.info.get('form'), job.info.get('administration'),
                      f"./tmp/{job.info.get('file')}")
    if len(issues) == 0:
        return True

    form = Forms.objects.filter(pk=int(job.info.get("form"))).first()
    file_name = job.info.get("file")
    uploaded = pd.read_excel(f"./tmp/{file_name}", sheet_name='data')

    report = pd.DataFrame(issues)
    # Keep every column except the one labelled "error".
    report = report[[col for col in report if col != "error"]]
    error_file = f"./tmp/error-{job_id}.csv"
    report.to_csv(error_file, index=False)

    listing = [
        {
            'name': "Upload Date",
            'value': job.created.strftime("%m-%d-%Y, %H:%M:%S"),
        },
        {'name': "Questionnaire", 'value': form.name},
        {'name': "Number of Records", 'value': uploaded.shape[0]},
    ]
    send_email(
        context={'send_to': [job.user.email], 'listing': listing},
        type=EmailTypes.upload_error,
        path=error_file,
        content_type='text/csv',
    )
    return False
×
216

217

218
def validate_excel_result(task):
    """Record a validation task's outcome and queue seeding on success.

    When ``task.result`` is truthy the job is marked done and saved, then a
    new ``seed_data`` job is created for the same file and user and
    dispatched through ``async_task``. Otherwise the job is marked failed.
    """
    job = Jobs.objects.get(task_id=task.id)
    job.attempt += 1
    seed_info = job.info

    if not task.result:
        job.status = JobStatus.failed
        job.save()
        return

    job.status = JobStatus.done
    job.available = timezone.now()
    job.save()

    # Added after the save above: the reference to the validation job is
    # carried by the new job's info.
    seed_info.update(ref_job_id=job.id)
    seed_job = Jobs.objects.create(
        result=seed_info.get('file'),
        type=JobTypes.seed_data,
        status=JobStatus.on_progress,
        user=job.user,
        info=seed_info,
    )
    seed_job.task_id = async_task(
        'api.v1.v1_jobs.job.seed_data_job',
        seed_job.id,
        hook='api.v1.v1_jobs.job.seed_data_job_result',
    )
    seed_job.save()
×
242

243

244
def handle_administrations_bulk_upload(filename, user_id, upload_time):
    """Validate and seed an administrations bulk-upload file.

    Args:
        filename: Name of the uploaded file; fetched from ``upload/`` and
            read from ``./tmp``.
        user_id: Id of the ``SystemUser`` who receives the result email.
        upload_time: Object with ``strftime`` giving the upload moment; used
            in the email and in the error file name.

    When validation reports errors they are logged, written to a CSV and
    mailed to the user, and nothing is seeded. Otherwise the data is
    seeded, the administration sqlite file is regenerated and a
    confirmation email is sent.
    """
    user = SystemUser.objects.get(id=user_id)
    storage.download(f"upload/{filename}")
    file_path = f"./tmp/{filename}"
    errors = validate_administrations_bulk_upload(file_path)
    uploaded = pd.read_excel(file_path, sheet_name='data')

    listing = [
        {
            'name': 'Upload Date',
            'value': upload_time.strftime('%m-%d-%Y, %H:%M:%S'),
        },
        {'name': 'Questionnaire', 'value': 'Administrative List'},
        {'name': "Number of Records", 'value': uploaded.shape[0]},
    ]
    email_context = {'send_to': [user.email], 'listing': listing}

    if len(errors) == 0:
        seed_administration_data(file_path)
        generate_sqlite(Administration)
        send_email(context=email_context,
                   type=EmailTypes.administration_upload)
        return

    logger.error(errors)
    stamp = upload_time.strftime('%Y%m%d%H%M%S')
    error_file = f"./tmp/administration-error-{stamp}-{user.id}.csv"
    pd.DataFrame(errors).to_csv(error_file, index=False)
    send_email(
        context=email_context,
        type=EmailTypes.upload_error,
        path=error_file,
        content_type='text/csv',
    )
×
283

284

285
def handle_administrations_bulk_upload_failure(task: Task):
    """Log the details of a background task that did not succeed.

    Nothing happens when ``task.success`` is truthy; otherwise the task's
    id, name, timing, arguments and result are logged as one error record.
    """
    if not task.success:
        details = {
            'error': 'Failed running background job',
            'id': task.id,
            'name': task.name,
            'started': task.started,
            'stopped': task.stopped,
            'args': task.args,
            'kwargs': task.kwargs,
            'body': task.result,
        }
        logger.error(details)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc