akvo / iwsims-demo · build #72 (push, reported by coveralls-python, committed by web-flow)
28 Apr 2025 03:28AM UTC · coverage: 86.134% (+1.1%) from 85.024%

Merge pull request #20 from akvo/feature/19-eng-1231-dynamic-level-approval
(Feature/19 eng 1231 dynamic level approval)

2646 of 3188 branches covered (83.0%); branch coverage is included in the aggregate %.
5995 of 6844 relevant lines covered (87.59%)
0.88 hits per line
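The aggregate figure is consistent with pooling lines and branches: (5995 + 2646) / (6844 + 3188) = 8641 / 10032 ≈ 86.134%, i.e. covered lines plus covered branches over relevant lines plus total branches (an inferred reading, not a formula stated on this page).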

Source File: backend/api/v1/v1_jobs/job.py (44.89% covered)
import logging
import os
from django_q.models import Task

import pandas as pd
from django.utils import timezone
from django_q.tasks import async_task
from django.db.models import Subquery, Max
from api.v1.v1_jobs.administrations_bulk_upload import (
    seed_administration_data,
    validate_administrations_bulk_upload,
)
from utils.upload_entities import (
    validate_entity_file,
    validate_entity_data,
)
from api.v1.v1_profile.constants import UserRoleTypes
from api.v1.v1_forms.models import Forms, QuestionOptions
from api.v1.v1_forms.constants import SubmissionTypes, QuestionTypes
from api.v1.v1_jobs.constants import JobStatus, JobTypes

# from api.v1.v1_jobs.functions import HText
from api.v1.v1_jobs.models import Jobs
from api.v1.v1_jobs.seed_data import seed_excel_data
from api.v1.v1_jobs.validate_upload import validate
from api.v1.v1_profile.models import Administration, EntityData
from api.v1.v1_users.models import SystemUser
from utils import storage
from utils.email_helper import send_email, EmailTypes
from utils.export_form import (
    generate_definition_sheet,
    get_question_names,
    blank_data_template,
    meta_columns,
)
from utils.functions import update_date_time_format
from utils.storage import upload
from utils.custom_generator import generate_sqlite

logger = logging.getLogger(__name__)

def download_data(
    form: Forms, administration_ids, download_type="all", submission_type=None
):
    filter_data = {}
    if administration_ids and submission_type:
        filter_data["administration_id__in"] = administration_ids
    if submission_type:
        filter_data["submission_type"] = submission_type
    else:
        filter_data["submission_type__in"] = [
            SubmissionTypes.registration,
            SubmissionTypes.monitoring,
        ]
    data = form.form_form_data.filter(**filter_data)
    if download_type == "recent":
        # Keep only the most recent submission per uuid.
        latest_per_uuid = (
            data.values("uuid")
            .annotate(latest_created=Max("created"))
            .values("latest_created")
        )
        data = data.filter(created__in=Subquery(latest_per_uuid))
    return [d.to_data_frame for d in data.order_by("id")]

def get_answer_label(answer_values, question_id):
    # `answer_values != answer_values` is a NaN check: NaN is the only
    # value that is not equal to itself.
    if answer_values is None or answer_values != answer_values:
        return answer_values
    answer_value = answer_values.split("|")
    answer_label = []
    for value in answer_value:
        options = QuestionOptions.objects.filter(
            question_id=question_id, value=value
        ).first()
        if options:
            answer_label.append(options.label)
    return "|".join(answer_label)

def generate_data_sheet(
    writer: pd.ExcelWriter,
    form: Forms,
    administration_ids=None,
    submission_type: SubmissionTypes = None,
    download_type: str = "all",
    use_label: bool = False,
) -> None:
    questions = get_question_names(form=form)
    data = download_data(
        form=form,
        administration_ids=administration_ids,
        submission_type=submission_type,
        download_type=download_type,
    )
    if len(data):
        df = pd.DataFrame(data)
        new_columns = {}
        for question in questions:
            question_id, question_name, question_type = question
            if question_name not in df:
                df[question_name] = None
            if use_label:
                if question_type in [
                    QuestionTypes.option,
                    QuestionTypes.multiple_option,
                ]:
                    new_columns[question_name] = df[question_name].apply(
                        lambda x: get_answer_label(x, question_id)
                    )
        question_names = [question[1] for question in questions]
        if use_label:
            new_df = pd.DataFrame(new_columns)
            df.drop(columns=list(new_df), inplace=True)
            df = pd.concat([df, new_df], axis=1)
        # Reorder columns
        df = df[meta_columns + question_names]
        df.to_excel(writer, sheet_name="data", index=False)
        generate_definition_sheet(form=form, writer=writer)
    else:
        blank_data_template(form=form, writer=writer)

def job_generate_data_download(job_id, **kwargs):
    job = Jobs.objects.get(pk=job_id)
    file_path = "./tmp/{0}".format(job.result)
    if os.path.exists(file_path):
        os.remove(file_path)
    administration_ids = False
    administration_name = "All Administration Level"
    if kwargs.get("administration"):
        administration = Administration.objects.get(
            pk=kwargs.get("administration")
        )
        if administration.path:
            filter_path = "{0}{1}.".format(
                administration.path, administration.id
            )
        else:
            filter_path = f"{administration.id}."
        # Collect the selected administration together with all of its
        # descendants (descendants share this materialised path prefix).
        administration_ids = list(
            Administration.objects.filter(
                path__startswith=filter_path
            ).values_list("id", flat=True)
        )
        administration_ids.append(administration.id)

        administration_name = list(
            Administration.objects.filter(
                path__startswith=filter_path
            ).values_list("name", flat=True)
        )
    form = Forms.objects.get(pk=job.info.get("form_id"))
    download_type = kwargs.get("download_type")
    submission_type = kwargs.get("submission_type")
    writer = pd.ExcelWriter(file_path, engine="xlsxwriter")

    generate_data_sheet(
        writer=writer,
        form=form,
        administration_ids=administration_ids,
        submission_type=submission_type,
        download_type=download_type,
        use_label=job.info.get("use_label"),
    )

    context = [
        {"context": "Form Name", "value": form.name},
        {
            "context": "Download Date",
            "value": update_date_time_format(job.created),
        },
        {
            "context": "Administration",
            "value": ",".join(administration_name)
            if isinstance(administration_name, list)
            else administration_name,
        },
    ]

    context = (
        pd.DataFrame(context).groupby(["context", "value"], sort=False).first()
    )
    context.to_excel(writer, sheet_name="context", startrow=0, header=False)
    workbook = writer.book
    worksheet = writer.sheets["context"]
    f = workbook.add_format(
        {
            "align": "left",
            "bold": False,
            "border": 0,
        }
    )
    worksheet.set_column("A:A", 20, f)
    worksheet.set_column("B:B", 30, f)
    merge_format = workbook.add_format(
        {
            "bold": True,
            "border": 1,
            "align": "center",
            "valign": "vcenter",
            "fg_color": "#45add9",
            "color": "#ffffff",
        }
    )
    worksheet.merge_range("A1:B1", "Context", merge_format)
    writer.save()
    url = upload(file=file_path, folder="download")
    return url

def job_generate_data_download_result(task):
    job = Jobs.objects.get(task_id=task.id)
    job.attempt = job.attempt + 1
    if task.success:
        job.status = JobStatus.done
        job.available = timezone.now()
    else:
        job.status = JobStatus.failed
    job.save()

def seed_data_job(job_id):
    try:
        job = Jobs.objects.get(pk=job_id)
        seed_excel_data(job)
    except (ValueError, OSError, NameError, TypeError):
        print("Error seed data job")
        return False
    except Exception as unknown_error:
        print("Unknown error seed data job", unknown_error)
        return False
    return True

def seed_data_job_result(task):
    job = Jobs.objects.get(task_id=task.id)
    job.attempt = job.attempt + 1
    is_super_admin = job.user.user_access.role == UserRoleTypes.super_admin
    if task.result:
        job.status = JobStatus.done
        job.available = timezone.now()
        form_id = job.info.get("form")
        form = Forms.objects.filter(pk=int(form_id)).first()
        file = job.info.get("file")
        storage.download(f"upload/{file}")
        df = pd.read_excel(f"./tmp/{file}", sheet_name="data")
        subject = (
            "New Data Uploaded"
            if is_super_admin
            else "New Request @{0}".format(job.user.get_full_name())
        )
        data = {
            "subject": subject,
            "title": "New Data Submission",
            "send_to": [job.user.email],
            "listing": [
                {
                    "name": "Upload Date",
                    "value": job.created.strftime("%m-%d-%Y, %H:%M:%S"),
                },
                {"name": "Questionnaire", "value": form.name},
                {"name": "Number of Records", "value": df.shape[0]},
            ],
            "is_super_admin": is_super_admin,
        }
        send_email(context=data, type=EmailTypes.new_request)
    else:
        job.status = JobStatus.failed
    job.save()

def validate_excel(job_id):
    job = Jobs.objects.get(pk=job_id)
    storage.download(f"upload/{job.info.get('file')}")
    data = validate(
        job.info.get("form"),
        job.info.get("administration"),
        f"./tmp/{job.info.get('file')}",
    )

    if len(data):
        form_id = job.info.get("form")
        form = Forms.objects.filter(pk=int(form_id)).first()
        file = job.info.get("file")
        df = pd.read_excel(f"./tmp/{file}", sheet_name="data")
        error_list = pd.DataFrame(data)
        error_list = error_list[
            list(filter(lambda x: x != "error", list(error_list)))
        ]
        error_file = f"./tmp/error-{job_id}.csv"
        error_list.to_csv(error_file, index=False)
        data = {
            "send_to": [job.user.email],
            "listing": [
                {
                    "name": "Upload Date",
                    "value": job.created.strftime("%m-%d-%Y, %H:%M:%S"),
                },
                {"name": "Questionnaire", "value": form.name},
                {"name": "Number of Records", "value": df.shape[0]},
            ],
        }
        send_email(
            context=data,
            type=EmailTypes.upload_error,
            path=error_file,
            content_type="text/csv",
        )
        return False
    return True

def validate_excel_result(task):
    job = Jobs.objects.get(task_id=task.id)
    job.attempt = job.attempt + 1
    job_info = job.info
    if task.result:
        job.status = JobStatus.done
        job.available = timezone.now()
        job.save()
        # Validation succeeded: chain a seed_data job via django_q, with
        # seed_data_job_result attached as the completion hook.
        job_info.update({"ref_job_id": job.id})
        new_job = Jobs.objects.create(
            result=job.info.get("file"),
            type=JobTypes.seed_data,
            status=JobStatus.on_progress,
            user=job.user,
            info=job_info,
        )
        task_id = async_task(
            "api.v1.v1_jobs.job.seed_data_job",
            new_job.id,
            hook="api.v1.v1_jobs.job.seed_data_job_result",
        )
        new_job.task_id = task_id
        new_job.save()
    else:
        job.status = JobStatus.failed
        job.save()

def handle_administrations_bulk_upload(filename, user_id, upload_time):
    user = SystemUser.objects.get(id=user_id)
    storage.download(f"upload/{filename}")
    file_path = f"./tmp/{filename}"
    errors = validate_administrations_bulk_upload(file_path)
    xlsx = pd.ExcelFile(file_path)
    if "data" not in xlsx.sheet_names:
        logger.error(f"Sheet 'data' not found in {filename}")
        send_email(
            context={
                "send_to": [user.email],
                "listing": [
                    {
                        "name": "Upload Date",
                        "value": upload_time.strftime("%m-%d-%Y, %H:%M:%S"),
                    },
                    {
                        "name": "Questionnaire",
                        "value": "Administrative List",
                    },
                    {
                        "name": "Error",
                        "value": 'Sheet "data" not found',
                    },
                ],
            },
            type=EmailTypes.upload_error,
        )
        return
    df = pd.read_excel(file_path, sheet_name="data")
    email_context = {
        "send_to": [user.email],
        "listing": [
            {
                "name": "Upload Date",
                "value": upload_time.strftime("%m-%d-%Y, %H:%M:%S"),
            },
            {
                "name": "Questionnaire",
                "value": "Administrative List",
            },
            {"name": "Number of Records", "value": df.shape[0]},
        ],
    }
    if len(errors):
        logger.error(errors)
        error_file = (
            "./tmp/administration-error-"
            f"{upload_time.strftime('%Y%m%d%H%M%S')}-{user.id}.csv"
        )
        error_list = pd.DataFrame(errors)
        error_list.to_csv(error_file, index=False)
        send_email(
            context=email_context,
            type=EmailTypes.upload_error,
            path=error_file,
            content_type="text/csv",
        )
        return
    seed_administration_data(file_path)
    generate_sqlite(Administration)
    send_email(context=email_context, type=EmailTypes.administration_upload)

def handle_master_data_bulk_upload_failure(task: Task):
    if task.success:
        return
    logger.error(
        {
            "error": "Failed running background job",
            "id": task.id,
            "name": task.name,
            "started": task.started,
            "stopped": task.stopped,
            "args": task.args,
            "kwargs": task.kwargs,
            "body": task.result,
        }
    )

def handle_entities_error_upload(
    errors: list,
    email_context: dict,
    user: SystemUser,
    upload_time: timezone.datetime,
):
    logger.error(errors)
    error_file = (
        "./tmp/entity-error-"
        f"{upload_time.strftime('%Y%m%d%H%M%S')}-{user.id}.csv"
    )
    error_list = pd.DataFrame(errors)
    error_list.to_csv(error_file, index=False)
    send_email(
        context=email_context,
        type=EmailTypes.upload_error,
        path=error_file,
        content_type="text/csv",
    )

def handle_entities_bulk_upload(filename, user_id, upload_time):
    user = SystemUser.objects.get(id=user_id)
    storage.download(f"upload/{filename}")
    file_path = f"./tmp/{filename}"
    errors = validate_entity_file(file_path)
    email_context = {
        "send_to": [user.email],
        "listing": [
            {
                "name": "Upload Date",
                "value": upload_time.strftime("%m-%d-%Y, %H:%M:%S"),
            },
            {
                "name": "Questionnaire",
                "value": "Entity List",
            },
        ],
    }
    if len(errors):
        handle_entities_error_upload(errors, email_context, user, upload_time)
        return
    errors = validate_entity_data(file_path)
    if len(errors):
        handle_entities_error_upload(errors, email_context, user, upload_time)
        return
    generate_sqlite(EntityData)
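For readers tracing the uncovered paths above: these functions run as django_q background tasks, with each *_result function attached as a completion hook, exactly as validate_excel_result does when it chains seed_data_job. A minimal caller sketch for the download job follows; the enqueue function name, the JobTypes.download member, and the output filename are assumptions for illustration, while the dotted task paths and the async_task/hook pattern come from the module itself.

# Hypothetical caller sketch (not part of job.py): enqueue a data download
# the same way validate_excel_result chains seed_data_job above.
from django_q.tasks import async_task

from api.v1.v1_jobs.constants import JobStatus, JobTypes
from api.v1.v1_jobs.models import Jobs


def enqueue_data_download(user, form_id, administration_id=None):
    job = Jobs.objects.create(
        type=JobTypes.download,  # assumed enum member, for illustration only
        status=JobStatus.on_progress,
        result=f"download-{form_id}.xlsx",  # assumed filename; job.result becomes ./tmp/<result>
        user=user,
        info={"form_id": form_id, "use_label": False},
    )
    task_id = async_task(
        "api.v1.v1_jobs.job.job_generate_data_download",
        job.id,
        administration=administration_id,
        download_type="all",
        hook="api.v1.v1_jobs.job.job_generate_data_download_result",
    )
    job.task_id = task_id
    job.save()
    return job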