• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / iwsims / #59

18 Jun 2026 07:20AM UTC coverage: 88.033% (-0.1%) from 88.13%
#59

push

coveralls-python

web-flow
Merge 5dfcb298b into a6f6761c9

5183 of 6053 branches covered (85.63%)

Branch coverage included in aggregate %.

9979 of 11170 relevant lines covered (89.34%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.55
backend/api/v1/v1_visualization/values_functions.py
1
from collections import defaultdict
1✔
2

3
from django.db.models import Count, Avg, F, OuterRef, Subquery
1✔
4
from django.db.models.functions import TruncDate, TruncMonth, Substr
1✔
5

6
from api.v1.v1_data.models import FormData
1✔
7
from api.v1.v1_forms.models import QuestionOptions
1✔
8
from api.v1.v1_visualization.constants import AGG_FUNCS
1✔
9
from api.v1.v1_visualization.models import (
1✔
10
    MVAnswerDenormalized,
11
    MVLatestMonitoring,
12
    MVParentAggregates,
13
)
14
from api.v1.v1_visualization.functions import (
1✔
15
    get_base_monitoring_qs,
16
    get_monitoring_data_ids,
17
    format_month_label,
18
    format_month_group,
19
    format_date_group,
20
    fill_month_gaps,
21
    fill_date_gaps,
22
    apply_administration_filter,
23
    apply_parent_criteria_to_qs,
24
)
25

26

27
def _should_fill_gaps(params):
1✔
28
    """Only gap-fill when both from_date and to_date are provided."""
29
    return bool(
1✔
30
        params.get("from_date") and params.get("to_date")
31
    )
32

33

34
def _date_answer_sq(date_qname):
    """Build the correlated date-answer queryset for a Subquery.

    The queryset selects ``answer_name`` from the rows of question
    ``date_qname`` that share the outer row's ``data_id`` and have a
    non-null ``answer_name``, limited to a single row. Callers wrap it
    in ``Subquery`` to bucket value/option rows by a separate date
    question instead of the submission's own created timestamp.
    """
    date_rows = MVAnswerDenormalized.objects.filter(
        data_id=OuterRef("data_id"),
        question_name=date_qname,
        answer_name__isnull=False,
    )
    only_name = date_rows.values("answer_name")
    return only_name[:1]
45

46

47
def _finalize_month(data, params):
    """Return ``(data, labels)`` for a month-bucketed series.

    When the request is bounded (see ``_should_fill_gaps``) the rows
    are first passed through ``fill_month_gaps`` with the request's
    ``from_date`` and ``to_date``; otherwise they are returned as-is.
    ``labels`` is the ``"label"`` of each returned row, in order.
    """
    if not _should_fill_gaps(params):
        rows = data
    else:
        rows = fill_month_gaps(
            data, params["from_date"], params["to_date"]
        )
    labels = [row["label"] for row in rows]
    return rows, labels
54

55

56
def _finalize_date(data, params):
    """Return ``(data, labels)`` for a day-bucketed series.

    When the request is bounded (see ``_should_fill_gaps``) the rows
    are first passed through ``fill_date_gaps`` with the request's
    ``from_date`` and ``to_date``; otherwise they are returned as-is.
    ``labels`` is the ``"label"`` of each returned row, in order.
    """
    if not _should_fill_gaps(params):
        rows = data
    else:
        rows = fill_date_gaps(
            data, params["from_date"], params["to_date"]
        )
    labels = [row["label"] for row in rows]
    return rows, labels
63

64

65
def _total_parents_in_scope(form, params):
    """Return how many parent registrations fall inside the scope.

    The scope form is ``form.parent`` when the form has one, otherwise
    ``form`` itself. Only top-level (parent-less), non-pending,
    non-draft rows are counted, narrowed by ``administration_id``
    (when truthy) and by ``parent_criteria`` from ``params``.
    """
    scope_form = form.parent or form
    parents = FormData.objects.filter(
        form=scope_form,
        parent__isnull=True,
        is_pending=False,
        is_draft=False,
    )

    admin_id = params.get("administration_id")
    if admin_id:
        parents = apply_administration_filter(parents, admin_id)

    parents = apply_parent_criteria_to_qs(
        parents, True, params.get("parent_criteria"),
    )
    return parents.count()
81

82

83
def _count_no_info_parents(form, params, qualifying_ids):
    """Return the number of in-scope datapoints lacking an answer.

    Monitoring forms: parent registrations that have no qualifying
    monitoring submission (a gap in monitoring coverage).
    Registration forms: registrations that exist but left the
    question blank / skipped.

    The in-scope total comes from ``_total_parents_in_scope`` so it
    honours administration_id and parent_criteria and reconciles with
    option counts under filtering (FR-3). The result is the total
    minus ``len(qualifying_ids)`` and is never negative.
    """
    in_scope = _total_parents_in_scope(form, params)
    missing = in_scope - len(qualifying_ids)
    return missing if missing > 0 else 0
96

97

98
# -- Count mode handler --
99

100
def _count_total_payload(form, qs, value_type, is_monitoring):
    """Build the single-row "Total" result for a count request.

    Returns ``(data, labels)`` holding one row labelled "Total". For a
    monitoring form with value_type "percentage" the value is the
    share (two decimals) of ``qs`` rows over all non-pending,
    non-draft parent registrations of ``form.parent``, or 0 when
    there are none; in every other case it is the row count of ``qs``.
    """
    count = qs.count()
    if value_type == "percentage" and is_monitoring:
        # NOTE(review): unlike _total_parents_in_scope, this
        # denominator applies no administration_id / parent_criteria
        # filtering — confirm that is intended.
        total = FormData.objects.filter(
            form=form.parent,
            parent__isnull=True,
            is_pending=False,
            is_draft=False,
        ).count()
        value = round((count / total * 100), 2) if total > 0 else 0
    else:
        value = count
    return [{"value": value, "label": "Total"}], ["Total"]


def handle_count_mode(form, params):
    """Handle count mode (no question_id).

    Returns a ``(data, labels)`` pair.

    A single "Total" row is produced when no ``group_by`` is given,
    and also — regardless of ``group_by`` — for a monitoring form
    queried with monitoring="latest" and sum_by="parent_id".
    Otherwise the count is grouped by "month", "parent_id", "id" or
    "date"; any other ``group_by`` yields a zero-valued "Total" row.
    """
    form_id = form.id
    monitoring = params.get("monitoring", "latest")
    group_by = params.get("group_by")
    value_type = params.get("value_type", "number")
    sum_by = params.get("sum_by")
    is_monitoring = form.parent is not None

    qs, is_latest, _ = get_base_monitoring_qs(
        form, form_id, params
    )

    # Both situations below used to carry their own copy of the same
    # "Total" computation; they share one helper now.
    latest_per_parent = (
        is_monitoring
        and monitoring == "latest"
        and sum_by == "parent_id"
    )
    if latest_per_parent or not group_by:
        return _count_total_payload(
            form, qs, value_type, is_monitoring
        )

    if group_by == "month":
        return _count_group_by_month(qs, is_latest, params)

    if group_by == "parent_id":
        return _count_group_by_parent(qs, is_latest)

    if group_by == "id":
        return _count_group_by_id(qs, is_latest)

    if group_by == "date":
        return _count_group_by_date(qs, is_latest, params)

    # Unknown group_by: fall back to an empty total.
    return [{"value": 0, "label": "Total"}], ["Total"]
168

169

170
def _count_group_by_month(qs, is_latest, params):
    """Count submissions per month and return ``(data, labels)``.

    With a ``date_question_name`` in ``params`` the month is the first
    seven characters of that question's ``answer_name``; without it
    the month comes from the creation timestamp. Each bucket counts
    distinct submissions. The rows are handed to ``_finalize_month``.
    """
    date_qname = params.get("date_question_name")

    if date_qname:
        # Latest mode resolves ids through the helper; otherwise the
        # base queryset's own ids are used.
        if is_latest:
            id_source = get_monitoring_data_ids(qs, is_latest)
        else:
            id_source = qs.values("id")
        buckets = (
            MVAnswerDenormalized.objects.filter(
                data_id__in=id_source,
                question_name=date_qname,
                answer_name__isnull=False,
            )
            .annotate(year_month=Substr("answer_name", 1, 7))
            .values("year_month")
            .annotate(count=Count("data_id", distinct=True))
            .order_by("year_month")
        )
        data = []
        for bucket in buckets:
            data.append({
                "value": bucket["count"],
                "label": format_month_label(bucket["year_month"]),
                "group": bucket["year_month"],
            })
        return _finalize_month(data, params)

    if is_latest:
        data_ids = get_monitoring_data_ids(qs, is_latest)
        buckets = (
            MVAnswerDenormalized.objects.filter(data_id__in=data_ids)
            .annotate(month=TruncMonth("data_created"))
            .values("month")
            .annotate(count=Count("data_id", distinct=True))
            .order_by("month")
        )
    else:
        buckets = (
            qs.annotate(month=TruncMonth("created"))
            .values("month")
            .annotate(count=Count("id"))
            .order_by("month")
        )

    data = []
    for bucket in buckets:
        data.append({
            "value": bucket["count"],
            "label": format_month_label(bucket["month"]),
            "group": format_month_group(bucket["month"]),
        })
    return _finalize_month(data, params)
251

252

253
def _count_group_by_parent(qs, is_latest):
    """Count rows per parent and return ``(data, labels)``.

    In latest mode every row of ``qs`` becomes one entry with value 1,
    labelled with the row's name and grouped by its id. Otherwise the
    rows that have a parent are counted per ``parent_id`` and ordered
    by parent name.
    """
    if not is_latest:
        grouped = (
            qs.filter(parent__isnull=False)
            .values("parent_id", parent_name=F("parent__name"))
            .annotate(count=Count("id"))
            .order_by("parent_name")
        )
        data = []
        for row in grouped:
            data.append({
                "value": row["count"],
                "label": row["parent_name"],
                "group": str(row["parent_id"]),
            })
    else:
        data = []
        for parent in qs.only("id", "name"):
            data.append({
                "value": 1,
                "label": parent.name,
                "group": str(parent.id),
            })

    return data, [entry["label"] for entry in data]
283

284

285
def _count_group_by_id(qs, is_latest):
    """Emit one value-1 row per record and return ``(data, labels)``.

    In latest mode the group is the row's ``latest_id``; otherwise it
    is the record's own id and the rows are ordered by id. The label
    is always the row's name.
    """
    data = []
    if is_latest:
        # NOTE(review): latest_id is read although only() names just
        # id and name — if latest_id is a deferred model field rather
        # than an annotation this costs one query per row; confirm.
        for parent in qs.only("id", "name"):
            data.append({
                "value": 1,
                "label": parent.name,
                "group": str(parent.latest_id),
            })
    else:
        for record in qs.only("id", "name").order_by("id"):
            data.append({
                "value": 1,
                "label": record.name,
                "group": str(record.id),
            })

    return data, [entry["label"] for entry in data]
307

308

309
def _count_group_by_date(qs, is_latest, params):
    """Count submissions per individual day (not per month).

    With a ``date_question_name`` in ``params`` the day is the first
    ten characters of that question's ``answer_name`` and is used
    verbatim as label and group; without it the day is the truncated
    ``data_created`` formatted by ``format_date_group``. Each bucket
    counts distinct submissions. Returns ``_finalize_date``'s result.
    """
    date_qname = params.get("date_question_name")
    data_ids = get_monitoring_data_ids(qs, is_latest)

    data = []
    if not date_qname:
        per_day = (
            MVAnswerDenormalized.objects.filter(data_id__in=data_ids)
            .annotate(day=TruncDate("data_created"))
            .values("day")
            .annotate(count=Count("data_id", distinct=True))
            .order_by("day")
        )
        for row in per_day:
            data.append({
                "value": row["count"],
                "label": format_date_group(row["day"]),
                "group": format_date_group(row["day"]),
            })
    else:
        per_day = (
            MVAnswerDenormalized.objects.filter(
                data_id__in=data_ids,
                question_name=date_qname,
                answer_name__isnull=False,
            )
            .annotate(day=Substr("answer_name", 1, 10))
            .values("day")
            .annotate(count=Count("data_id", distinct=True))
            .order_by("day")
        )
        for row in per_day:
            data.append({
                "value": row["count"],
                "label": row["day"],
                "group": row["day"],
            })

    return _finalize_date(data, params)
349

350

351
# -- Option question handler --
352

353
def handle_option_question(form, question, params):
    """Dispatch an option / multiple_option question request.

    Precedence, first match wins:
      1. ``option_value`` with group_by "month": monthly series for
         that option.
      2. ``option_value`` alone: single filtered count.
      3. stack_by "option" with any group_by: stacked result.
      4. group_by "option": one row per option.
    Anything else returns two empty lists.
    """
    group_by = params.get("group_by")
    option_value = params.get("option_value")
    sum_by = params.get("sum_by")
    value_type = params.get("value_type", "number")
    stack_by = params.get("stack_by")

    qs, is_latest, _ = get_base_monitoring_qs(
        form, form.id, params
    )
    data_ids = get_monitoring_data_ids(qs, is_latest)
    options = QuestionOptions.objects.filter(
        question=question,
    ).order_by("order")

    if option_value:
        if group_by == "month":
            return _option_value_group_by_month(
                question, data_ids, option_value, sum_by, params
            )
        return _option_value_filter(
            question, data_ids, qs, is_latest,
            option_value, sum_by, value_type,
            include_unanswered=params.get(
                "include_unanswered", False
            ),
            form=form,
            params=params,
            include_empty=params.get("include_empty", False),
        )

    if stack_by == "option" and group_by:
        return handle_stack_by_option(
            question, options, data_ids,
            qs, is_latest, params
        )

    if group_by != "option":
        return [], []

    restricted = _extract_criteria_option_values(
        params, question.name
    )
    return _option_group_by_option(
        question, options, data_ids, qs,
        is_latest, value_type, restricted,
        include_unanswered=params.get(
            "include_unanswered", False
        ),
        form=form,
        params=params,
    )
409

410

411
def _option_value_filter(
    question, data_ids, qs, is_latest,
    option_value, sum_by, value_type,
    include_unanswered=False, form=None, params=None,
    include_empty=False,
):
    """Count the answers containing one specific option value.

    With sum_by "parent_id" distinct parents are counted, otherwise
    matching answer rows. Returns ``(data, labels)`` with a single
    row labelled ``option_value``.

    The two flags below only take effect for monitoring forms (a
    ``form`` that has a parent):

    include_unanswered=True: parents with no answer for the question
    (monitored but null options) are added to the count.

    include_empty=True: parents with zero monitoring submissions
    (never visited) are added to the count. It wins over
    include_unanswered when both are set, since the coverage-gap
    count already subsumes the answer-gap count.

    For value_type "percentage" the denominator is the in-scope
    parent total when either flag applies; otherwise it is
    ``qs.count()`` in latest mode or ``len(data_ids)``. A zero
    denominator yields 0.
    """
    matching = MVAnswerDenormalized.objects.filter(
        data_id__in=data_ids,
        question_name=question.name,
        answer_options__contains=[option_value],
    )
    if sum_by == "parent_id":
        count = matching.values("parent_id").distinct().count()
    else:
        count = matching.count()

    is_monitoring = form is not None and form.parent is not None
    scope_params = params or {}

    extra = 0
    if is_monitoring:
        if include_empty:
            monitored_parents = set(
                FormData.objects.filter(id__in=data_ids)
                .values_list("parent_id", flat=True)
                .distinct()
            )
            extra = _count_no_info_parents(
                form, scope_params, monitored_parents
            )
        elif include_unanswered:
            answered_parents = set(
                MVAnswerDenormalized.objects.filter(
                    data_id__in=data_ids,
                    question_name=question.name,
                    answer_options__isnull=False,
                ).values_list("parent_id", flat=True).distinct()
            )
            extra = _count_no_info_parents(
                form, scope_params, answered_parents
            )

    if value_type != "percentage":
        value = count + extra
    else:
        if (include_empty or include_unanswered) and is_monitoring:
            total = _total_parents_in_scope(form, scope_params)
            numerator = count + extra
        else:
            total = qs.count() if is_latest else len(data_ids)
            numerator = count
        if total > 0:
            value = round((numerator / total * 100), 2)
        else:
            value = 0

    return (
        [{"value": value, "label": option_value}],
        [option_value],
    )
480

481

482
def _option_value_group_by_month(
    question, data_ids, option_value, sum_by, params
):
    """Keep submissions matching ``option_value``; count per month.

    Serves charts such as "Proposed completion date": restrict to
    incomplete projects (option_value='no') and bucket the count by a
    date question (e.g. project deadline). The month comes from the
    ``date_question_name`` answer when given, otherwise from
    ``data_created``. With sum_by "parent_id" each month counts
    distinct parents, otherwise distinct submissions. Returns the
    result of ``_finalize_month``.
    """
    date_qname = params.get("date_question_name")
    # The two counting modes differ only in the distinct-counted field.
    counted_field = "parent_id" if sum_by == "parent_id" else "data_id"

    matching_ids = list(MVAnswerDenormalized.objects.filter(
        data_id__in=data_ids,
        question_name=question.name,
        answer_options__contains=[option_value],
    ).values_list("data_id", flat=True))

    data = []
    if matching_ids and date_qname:
        monthly = (
            MVAnswerDenormalized.objects.filter(
                data_id__in=matching_ids,
                question_name=date_qname,
                answer_name__isnull=False,
            )
            .annotate(year_month=Substr("answer_name", 1, 7))
            .values("year_month")
            .annotate(count=Count(counted_field, distinct=True))
            .order_by("year_month")
        )
        for row in monthly:
            data.append({
                "value": row["count"],
                "label": format_month_label(row["year_month"]),
                "group": row["year_month"],
            })
    elif matching_ids:
        monthly = (
            MVAnswerDenormalized.objects.filter(
                data_id__in=matching_ids,
            )
            .annotate(month=TruncMonth("data_created"))
            .values("month")
            .annotate(count=Count(counted_field, distinct=True))
            .order_by("month")
        )
        for row in monthly:
            data.append({
                "value": row["count"],
                "label": format_month_label(row["month"]),
                "group": format_month_group(row["month"]),
            })

    return _finalize_month(data, params)
554

555

556
def _extract_criteria_option_values(params, question_name):
1✔
557
    """Extract option values that criteria restricts for a given qid.
558

559
    When criteria includes option_equals/option_contains/option_in
560
    targeting the same question_id as the donut chart, the tally
561
    should only count those specific values — not every value in
562
    a multiple_option answer array. Returns None if no restriction.
563
    """
564
    all_criteria = list(params.get("criteria") or [])
1✔
565
    all_criteria.extend(params.get("parent_criteria") or [])
1✔
566
    values = set()
1✔
567
    for c in all_criteria:
1✔
568
        ctype = c["type"]
1✔
569
        parts = c["parts"]
1✔
570
        if parts[0] != question_name:
1!
571
            continue
1✔
572
        if ctype in ("option_equals", "option_contains"):
×
573
            values.add(parts[1])
×
574
        elif ctype == "option_in":
×
575
            values.update(parts[1])
×
576
    return values or None
1✔
577

578

579
def _option_group_by_option(
    question, options, data_ids, qs,
    is_latest, value_type, restricted_values=None,
    include_unanswered=False, form=None, params=None,
):
    """Group by option values (donut chart).

    Returns ``(data, labels)`` with a row for every defined option —
    including zero-count options — so pie/doughnut charts have stable
    legends and colors across refreshes and filter changes.

    When `restricted_values` is set (from a criteria filter on the
    same question), only those values are tallied — so a
    multiple_option record ["a", "b"] filtered by "a" counts only
    for "a", not "b".

    When `include_unanswered=True` and a `form` is supplied, appends
    one synthetic row (group="_no_info") for parents with no
    qualifying answer, and adjusts the percentage denominator to
    include the bucket so single-choice rows sum to 100%. Without a
    `form` the scope cannot be counted, so the bucket stays empty.
    """
    option_values = {o.value for o in options}
    tally_values = (
        option_values & set(restricted_values)
        if restricted_values else option_values
    )
    tallies = defaultdict(int)
    qualifying_parents = set()
    # Registration forms have no parent; track data_id directly.
    # Monitoring forms track parent_id (the registration ID).
    is_registration = form is not None and form.parent is None
    tracking_field = (
        "data_id" if is_registration else "parent_id"
    )
    for tracking_id, opts in MVAnswerDenormalized.objects.filter(
        data_id__in=data_ids,
        question_name=question.name,
        answer_options__isnull=False,
    ).values_list(tracking_field, "answer_options"):
        matched = False
        for v in (opts or []):
            if v in tally_values:
                tallies[v] += 1
                matched = True
        if matched:
            qualifying_parents.add(tracking_id)

    counts = [tallies.get(opt.value, 0) for opt in options]

    # form and params default to None, but the scope count reads
    # form.parent and params.get(); guard both the same way
    # _option_value_filter does instead of raising AttributeError.
    bucket_count = 0
    if include_unanswered and form is not None:
        bucket_count = _count_no_info_parents(
            form, params or {}, qualifying_parents
        )

    is_percentage = value_type == "percentage"
    if is_percentage:
        if include_unanswered:
            denom = len(qualifying_parents) + bucket_count
        else:
            denom = sum(counts)
    else:
        denom = None

    data = []
    for opt, count in zip(options, counts):
        # A zero/None denominator falls back to the raw count.
        val = (
            round((count / denom * 100), 2)
            if is_percentage and denom else count
        )
        data.append({
            "value": val,
            "label": opt.label,
            "group": opt.value,
            "color": opt.color,
        })

    if include_unanswered and bucket_count > 0:
        bucket_val = (
            round((bucket_count / denom * 100), 2)
            if is_percentage and denom else bucket_count
        )
        data.append({
            "value": bucket_val,
            "label": "No information available",
            "group": "_no_info",
            "color": "#bfbfbf",
        })

    labels = [d["label"] for d in data]
    return data, labels
668

669

670
# -- Number question handler --
671

672
def handle_number_question(form, question, params):
    """Dispatch a number question request.

    stack_by "parent_id" takes precedence, then group_by "parent_id",
    "date" and "month". With none of these a single "Total" row is
    returned holding the aggregate (``repeat_agg``, average when
    unknown) of all non-null answers, rounded to two decimals, or 0
    when the aggregate is empty or zero.
    """
    group_by = params.get("group_by")
    repeat_agg = params.get("repeat_agg", "average")
    value_type = params.get("value_type", "number")
    stack_by = params.get("stack_by")

    qs, is_latest, _ = get_base_monitoring_qs(
        form, form.id, params
    )
    data_ids = get_monitoring_data_ids(qs, is_latest)
    agg_func = AGG_FUNCS.get(repeat_agg, Avg)

    if stack_by == "parent_id":
        return handle_stack_by_parent(
            question, qs, is_latest,
            data_ids, params
        )
    if group_by == "parent_id":
        return _number_group_by_parent(
            question, data_ids, agg_func, value_type
        )
    if group_by == "date":
        return _number_group_by_date(
            question, data_ids, params
        )
    if group_by == "month":
        return _number_group_by_month(
            question, data_ids, agg_func, value_type, params
        )

    answers = MVAnswerDenormalized.objects.filter(
        data_id__in=data_ids,
        question_name=question.name,
        answer_value__isnull=False,
    )
    aggregated = answers.aggregate(
        agg_value=agg_func("answer_value")
    )["agg_value"]

    if aggregated:
        value = round(aggregated, 2)
    else:
        value = 0
    return [{"value": value, "label": "Total"}], ["Total"]
718

719

720
def _number_group_by_parent(
    question, data_ids, agg_func, value_type
):
    """Aggregate a number question per parent.

    Returns ``(data, labels)`` sorted by parent name. Rows without a
    parent_id are dropped. For value_type "percentage" each value is
    rescaled to its share of the sum of all values (left untouched
    when that sum is not positive).
    """
    aggregated = MVAnswerDenormalized.objects.filter(
        data_id__in=data_ids,
        question_name=question.name,
        answer_value__isnull=False,
    ).values("parent_id").annotate(
        agg_value=agg_func("answer_value"),
    )
    with_parent = [
        row for row in aggregated if row["parent_id"]
    ]

    # Parent names are read from mv_latest_monitoring, where
    # parent_name is exposed next to parent_id.
    parent_ids = [row["parent_id"] for row in with_parent]
    names = dict(
        MVLatestMonitoring.objects.filter(parent_id__in=parent_ids)
        .values_list("parent_id", "parent_name")
        .distinct()
    )

    data = []
    for row in with_parent:
        data.append({
            "value": round(row["agg_value"], 2),
            "label": names.get(row["parent_id"], ""),
            "group": str(row["parent_id"]),
        })
    data.sort(key=lambda entry: entry["label"])

    if value_type == "percentage":
        total = sum(entry["value"] for entry in data)
        if total > 0:
            for entry in data:
                entry["value"] = round(
                    entry["value"] / total * 100, 2
                )

    return data, [entry["label"] for entry in data]
766

767

768
def _number_group_by_date(question, data_ids, params):
    """Number question grouped by date.

    Returns the ``(data, labels)`` pair produced by
    ``_finalize_date``, with rows sorted by their date group.

    With a ``date_question_name`` in ``params`` every submission that
    has both a non-empty date answer and a non-null numeric answer
    yields one row: the submission's aggregated value, keyed by the
    formatted date answer. Both lookups are fetched in bulk (two
    queries in total) rather than two queries per submission. When a
    submission carries several date rows, the first non-empty one
    returned by the database is used.

    Without a date question, values are aggregated per day of
    ``data_created``.
    """
    repeat_agg = params.get("repeat_agg", "average")
    agg_func = AGG_FUNCS.get(repeat_agg, Avg)
    date_qname = params.get("date_question_name")

    if date_qname:
        # data_id -> date answer, first non-empty answer wins.
        date_by_id = {}
        date_rows = MVAnswerDenormalized.objects.filter(
            data_id__in=data_ids,
            question_name=date_qname,
            answer_name__isnull=False,
        ).values_list("data_id", "answer_name")
        for row_id, answer_name in date_rows:
            if answer_name:
                date_by_id.setdefault(row_id, answer_name)

        # data_id -> aggregated value. order_by() clears any default
        # ordering so it cannot leak into the GROUP BY.
        value_rows = MVAnswerDenormalized.objects.filter(
            data_id__in=data_ids,
            question_name=question.name,
            answer_value__isnull=False,
        ).order_by().values("data_id").annotate(
            agg_value=agg_func("answer_value"),
        )
        value_by_id = {
            r["data_id"]: r["agg_value"] for r in value_rows
        }

        # Walk data_ids itself so the row order (and therefore the
        # order of equal dates after the stable sort below) follows
        # the incoming ids.
        data = []
        for data_id in data_ids:
            answer_name = date_by_id.get(data_id)
            agg_value = value_by_id.get(data_id)
            if not answer_name or agg_value is None:
                continue
            date_str = format_date_group(answer_name)
            data.append({
                "value": round(agg_value, 2),
                "label": date_str,
                "group": date_str,
            })
    else:
        results = MVAnswerDenormalized.objects.filter(
            data_id__in=data_ids,
            question_name=question.name,
            answer_value__isnull=False,
        ).annotate(
            date=TruncDate("data_created"),
        ).values("date").annotate(
            agg_value=agg_func("answer_value"),
        ).order_by("date")
        data = [
            {
                "value": round(r["agg_value"], 2),
                "label": format_date_group(r["date"]),
                "group": format_date_group(r["date"]),
            }
            for r in results
        ]

    data.sort(key=lambda x: x["group"])
    return _finalize_date(data, params)
820

821

822
def _number_group_by_month(
    question, data_ids, agg_func, value_type, params
):
    """Aggregate a number question per month.

    When ``date_question_name`` is present in ``params`` the month is
    taken from that date answer (via a Subquery) instead of the
    ``data_created`` timestamp, so the x-axis aligns with the
    filter's date dimension. For value_type "percentage" each value
    is rescaled to its share of the sum of all values (left untouched
    when that sum is not positive). Returns ``_finalize_month``'s
    result.
    """
    date_qname = params.get("date_question_name")

    answers = MVAnswerDenormalized.objects.filter(
        data_id__in=data_ids,
        question_name=question.name,
        answer_value__isnull=False,
    )

    data = []
    if not date_qname:
        monthly = (
            answers.annotate(month=TruncMonth("data_created"))
            .values("month")
            .annotate(agg_value=agg_func("answer_value"))
            .order_by("month")
        )
        for row in monthly:
            data.append({
                "value": round(row["agg_value"], 2),
                "label": format_month_label(row["month"]),
                "group": format_month_group(row["month"]),
            })
    else:
        monthly = (
            answers.annotate(
                date_name=Subquery(_date_answer_sq(date_qname)),
            )
            .filter(date_name__isnull=False)
            .annotate(month_key=Substr("date_name", 1, 7))
            .values("month_key")
            .annotate(agg_value=agg_func("answer_value"))
            .order_by("month_key")
        )
        for row in monthly:
            if row["agg_value"] is None:
                continue
            data.append({
                "value": round(row["agg_value"], 2),
                "label": format_month_label(row["month_key"]),
                "group": row["month_key"],
            })

    if value_type == "percentage":
        total = sum(entry["value"] for entry in data)
        if total > 0:
            for entry in data:
                entry["value"] = round(
                    entry["value"] / total * 100, 2
                )

    return _finalize_month(data, params)
882

883

884
# -- Stack handlers --
885

886
def handle_stack_by_option(
    question, options, data_ids,
    qs, is_latest, params
):
    """Dispatch a stack_by=option request (stacked bar charts).

    Reads ``group_by`` from ``params`` and delegates:
      - "month"     -> _stack_option_by_month (also receives
                       ``value_type``, which defaults to "number")
      - "parent_id" -> _stack_option_by_parent

    Any other ``group_by`` yields an empty payload with the keys
    "data", "labels", "stack_labels" and "colors".
    """
    grouping = params.get("group_by")

    # Collected up front so both delegates receive labels and colors
    # in the same order as ``options``.
    stack_labels = [option.label for option in options]
    stack_colors = [option.color for option in options]

    if grouping == "month":
        return _stack_option_by_month(
            question, options, data_ids,
            stack_labels, stack_colors,
            params.get("value_type", "number"), params
        )
    elif grouping == "parent_id":
        return _stack_option_by_parent(
            question, options, data_ids,
            qs, is_latest, stack_labels, stack_colors
        )

    empty_payload = {
        "data": [], "labels": [],
        "stack_labels": [], "colors": [],
    }
    return empty_payload
913

914

915
def _stack_option_by_month(
    question, options, data_ids,
    opt_labels, opt_colors, value_type, params
):
    """Count selected options per month for a stacked chart.

    All matching answers are fetched with one query and tallied in
    Python. When ``params["date_question_name"]`` is set, the month
    key is the first seven characters of that date question's
    answer_name for the same data_id; otherwise it is derived from
    ``data_created`` truncated to the month.

    Returns a dict with "data" (one row per month key, sorted by
    key), "labels", "stack_labels" and "colors". With
    ``value_type == "percentage"`` each option count is replaced by
    its share of the month's total, rounded to two decimals.
    """
    date_qname = params.get("date_question_name")
    known_values = {opt.value for opt in options}

    answers = MVAnswerDenormalized.objects.filter(
        data_id__in=data_ids,
        question_name=question.name,
    )

    if date_qname:
        rows = answers.annotate(
            date_name=Subquery(_date_answer_sq(date_qname)),
        ).filter(
            date_name__isnull=False,
        ).annotate(
            month_key=Substr("date_name", 1, 7),
        ).values("month_key", "answer_options")
    else:
        rows = answers.annotate(
            month=TruncMonth("data_created"),
        ).values("month", "answer_options")

    # month key -> option value -> number of answers selecting it
    tallies = defaultdict(lambda: defaultdict(int))
    for answer in rows:
        if date_qname:
            month_key = answer["month_key"]
        else:
            month_key = format_month_group(answer["month"])
        if not month_key:
            continue
        for selected in (answer["answer_options"] or []):
            # Values that are not among the question's options are
            # ignored; a month bucket only appears once one matches.
            if selected in known_values:
                tallies[month_key][selected] += 1

    as_percentage = value_type == "percentage"
    data = []
    for month_key in sorted(tallies):
        per_option = tallies[month_key]
        row = {
            "group": month_key,
            "label": format_month_label(month_key),
        }
        month_total = 0
        for opt in options:
            hits = per_option.get(opt.value, 0)
            row[opt.label] = hits
            month_total += hits
        if as_percentage and month_total > 0:
            for opt in options:
                row[opt.label] = round(
                    row[opt.label] / month_total * 100, 2,
                )
        data.append(row)

    return {
        "data": data,
        "labels": [row["label"] for row in data],
        "stack_labels": opt_labels,
        "colors": opt_colors,
    }
982

983

984
def _stack_option_by_parent_from_mv(
1✔
985
    agg_data, parent_ids, qs, options, opt_labels, opt_colors
986
):
987
    """Build stack data from mv_parent_aggregates.
988

989
    Single query — O(1) instead of O(P × M) queries.
990
    """
991
    parent_options = {
×
992
        row['parent_id']: row['option_values'] or []
993
        for row in agg_data
994
    }
995
    parent_names = {p.id: p.name for p in qs.only('id', 'name')}
×
996

997
    data = []
×
998
    for parent_id in parent_ids:
×
999
        opts = parent_options.get(parent_id, [])
×
1000
        row = {"label": parent_names.get(parent_id, ""), "group": parent_id}
×
1001
        for opt in options:
×
1002
            row[opt.label] = opts.count(opt.value)
×
1003
        data.append(row)
×
1004

1005
    return {
×
1006
        "data": data,
1007
        "labels": [d["label"] for d in data],
1008
        "stack_labels": opt_labels,
1009
        "colors": opt_colors,
1010
    }
1011

1012

1013
def _stack_option_by_parent_legacy(
1✔
1014
    question, options, data_ids,
1015
    qs, is_latest, opt_labels, opt_colors
1016
):
1017
    """Original _stack_option_by_parent implementation.
1018

1019
    Used as fallback when MV is not available or empty.
1020

1021
    Handles three data shapes:
1022
      - is_latest=True: qs rows are parent FormData with a `latest_id`
1023
        annotation pointing to each parent's most-recent monitoring
1024
        submission. Answer counts are read from that single submission.
1025
      - is_latest=False, monitoring-form query: data_ids reference
1026
        monitoring submissions; parents are derived via their parent_id.
1027
        Answer counts aggregate all matching submissions per parent.
1028
      - is_latest=False, REGISTRATION-form query (akvo-mis-9d8): data_ids
1029
        ARE registration submissions themselves (parent__isnull=True).
1030
        Parents = qs directly; p_data_ids = [parent.id].
1031
    """
1032
    is_registration_form = False
1✔
1033
    if is_latest:
1!
1034
        parents = qs  # FormData qs with .latest_id and .name
×
1035
    else:
1036
        parent_ids = list(
1✔
1037
            MVAnswerDenormalized.objects.filter(
1038
                data_id__in=data_ids,
1039
                parent_id__isnull=False,
1040
            ).values_list("parent_id", flat=True).distinct()
1041
        )
1042
        if parent_ids:
1✔
1043
            name_map = dict(
1✔
1044
                MVLatestMonitoring.objects.filter(
1045
                    parent_id__in=parent_ids
1046
                ).values_list("parent_id", "parent_name").distinct()
1047
            )
1048
            parents = [
1✔
1049
                {"id": pid, "name": name_map.get(pid, "")}
1050
                for pid in parent_ids
1051
            ]
1052
        else:
1053
            parents = qs  # registration-form path
1✔
1054
            is_registration_form = True
1✔
1055

1056
    data = []
1✔
1057
    for parent in parents:
1✔
1058
        if is_latest:
1!
1059
            p_data_ids = [parent.latest_id]
×
1060
            p_name = parent.name
×
1061
            parent_id_val = parent.id
×
1062
        elif is_registration_form:
1✔
1063
            p_data_ids = [parent.id]
1✔
1064
            p_name = parent.name
1✔
1065
            parent_id_val = parent.id
1✔
1066
        else:
1067
            # parent is a dict {"id": ..., "name": ...}
1068
            p_data_ids = list(
1✔
1069
                MVAnswerDenormalized.objects.filter(
1070
                    data_id__in=data_ids,
1071
                    parent_id=parent["id"],
1072
                ).values_list("data_id", flat=True).distinct()
1073
            )
1074
            p_name = parent["name"]
1✔
1075
            parent_id_val = parent["id"]
1✔
1076

1077
        row = {"label": p_name, "group": parent_id_val}
1✔
1078
        for opt in options:
1✔
1079
            count = MVAnswerDenormalized.objects.filter(
1✔
1080
                data_id__in=p_data_ids,
1081
                question_name=question.name,
1082
                answer_options__contains=[opt.value],
1083
            ).count()
1084
            row[opt.label] = count
1✔
1085
        data.append(row)
1✔
1086

1087
    labels = [d["label"] for d in data]
1✔
1088
    return {
1✔
1089
        "data": data,
1090
        "labels": labels,
1091
        "stack_labels": opt_labels,
1092
        "colors": opt_colors,
1093
    }
1094

1095

1096
def _stack_option_by_parent(
    question, options, data_ids,
    qs, is_latest, opt_labels, opt_colors
):
    """Stack by option, grouped by parent_id.

    Fast path (is_latest=True with non-empty ``data_ids``): the
    form_id is looked up from the first data id, then the option
    values for every parent in ``qs`` are read from
    MVParentAggregates with a single query and handed to
    _stack_option_by_parent_from_mv.

    Every other situation -- is_latest=False, no data ids, no form_id
    row found, or no aggregate rows -- is served by
    _stack_option_by_parent_legacy.
    """
    def _legacy():
        return _stack_option_by_parent_legacy(
            question, options, data_ids, qs, is_latest,
            opt_labels, opt_colors
        )

    if not (is_latest and data_ids):
        return _legacy()

    form_row = (
        MVAnswerDenormalized.objects
        .filter(data_id__in=data_ids[:1])
        .values('form_id')
        .first()
    )
    if not form_row:
        return _legacy()

    parent_ids = list(qs.values_list('id', flat=True))
    aggregates = list(
        MVParentAggregates.objects.filter(
            form_id=form_row['form_id'],
            question_name=question.name,
            parent_id__in=parent_ids,
        ).values('parent_id', 'option_values')
    )
    if not aggregates:
        return _legacy()

    return _stack_option_by_parent_from_mv(
        aggregates, parent_ids, qs, options, opt_labels, opt_colors
    )
1132

1133

1134
def handle_stack_by_parent(
    question, qs, is_latest, data_ids, params
):
    """Handle stack_by=parent_id: multi-line charts.

    ``params["group_by"]`` selects the delegate ("date" ->
    _stack_parent_by_date, "month" -> _stack_parent_by_month); any
    other value returns an empty payload without touching the
    database. ``params["repeat_agg"]`` (default "average") picks the
    aggregate from AGG_FUNCS, falling back to Avg for unknown names.

    Each parent passed to the delegate is a dict:
      - is_latest=True:  {"id", "name", "latest_id"} read from ``qs``
      - is_latest=False: {"id", "name", "data_ids"} where ``data_ids``
        are the distinct submissions in ``data_ids`` that reference
        the parent, and ``name`` comes from MVLatestMonitoring
        ("" when no name is found).

    Returns a dict with "data", "labels" and "stack_labels".
    """
    group_by = params.get("group_by")
    if group_by not in ("date", "month"):
        # Unsupported grouping: the result is a constant, so skip
        # every query below.
        return {"data": [], "labels": [], "stack_labels": []}

    agg_func = AGG_FUNCS.get(params.get("repeat_agg", "average"), Avg)

    if is_latest:
        parents = list(
            qs.values("id", "name", "latest_id")
        )
    else:
        # One query for all (parent, submission) pairs instead of one
        # query per parent; grouped in Python afterwards.
        pairs = MVAnswerDenormalized.objects.filter(
            data_id__in=data_ids,
            parent_id__isnull=False,
        ).values_list("parent_id", "data_id").distinct()
        ids_by_parent = defaultdict(list)
        for parent_id, data_id in pairs:
            ids_by_parent[parent_id].append(data_id)

        name_map = dict(
            MVLatestMonitoring.objects.filter(
                parent_id__in=list(ids_by_parent)
            ).values_list("parent_id", "parent_name").distinct()
        )
        # Parent order follows first appearance in the query result;
        # the query has no explicit ordering.
        parents = [
            {
                "id": pid,
                "name": name_map.get(pid, ""),
                "data_ids": submission_ids,
            }
            for pid, submission_ids in ids_by_parent.items()
        ]

    parent_names = [p["name"] for p in parents]

    if group_by == "date":
        return _stack_parent_by_date(
            question, parents, is_latest,
            parent_names, agg_func, params
        )

    return _stack_parent_by_month(
        question, parents, is_latest,
        parent_names, agg_func, params
    )
1187

1188

1189
def _stack_parent_by_date(
    question, parents, is_latest,
    parent_names, agg_func, params
):
    """Build one row per date with one value column per parent.

    Two bulk lookups keyed by data_id are made up front: the date key
    of each submission and its aggregated answer value. The date key
    comes from the ``params["date_question_name"]`` answer when that
    is set, otherwise from ``data_created``; both are passed through
    format_date_group.

    Submissions lacking either a date key or a value are skipped.
    Rows are returned sorted by date key, and "labels" holds those
    same keys.
    """
    date_qname = params.get("date_question_name")

    def _ids_of(parent):
        # Latest mode tracks a single submission per parent.
        if is_latest:
            return [parent["latest_id"]]
        return parent["data_ids"]

    all_data_ids = [
        data_id for parent in parents for data_id in _ids_of(parent)
    ]

    if date_qname:
        date_rows = MVAnswerDenormalized.objects.filter(
            data_id__in=all_data_ids,
            question_name=date_qname,
            answer_name__isnull=False,
        ).values("data_id", "answer_name")
        date_map = {
            row["data_id"]: format_date_group(row["answer_name"])
            for row in date_rows
        }
    else:
        created_rows = MVAnswerDenormalized.objects.filter(
            data_id__in=all_data_ids,
        ).values("data_id", "data_created").distinct()
        date_map = {
            row["data_id"]: format_date_group(row["data_created"])
            for row in created_rows
        }

    value_rows = MVAnswerDenormalized.objects.filter(
        data_id__in=all_data_ids,
        question_name=question.name,
        answer_value__isnull=False,
    ).values("data_id").annotate(
        agg_value=agg_func("answer_value"),
    )
    val_map = {
        row["data_id"]: row["agg_value"]
        for row in value_rows
        if row["agg_value"] is not None
    }

    rows_by_date = {}
    for parent in parents:
        for data_id in _ids_of(parent):
            date_key = date_map.get(data_id)
            value = val_map.get(data_id)
            if date_key and value is not None:
                row = rows_by_date.setdefault(
                    date_key, {"date": date_key}
                )
                row[parent["name"]] = round(value, 2)

    labels = sorted(rows_by_date)
    return {
        "data": [rows_by_date[key] for key in labels],
        "labels": labels,
        "stack_labels": parent_names,
    }
1261

1262

1263
def _stack_parent_by_month(
1✔
1264
    question, parents, is_latest,
1265
    parent_names, agg_func, params
1266
):
1267
    """Stack by parent_id, grouped by month.
1268

1269
    When date_question_id is provided, buckets by the month of that
1270
    date answer (via Subquery) instead of FormData.created.
1271
    """
1272
    date_qname = params.get("date_question_name")
1✔
1273
    all_rows = {}
1✔
1274

1275
    for p in parents:
1✔
1276
        p_ids = (
1✔
1277
            [p["latest_id"]] if is_latest
1278
            else p["data_ids"]
1279
        )
1280

1281
        base = MVAnswerDenormalized.objects.filter(
1✔
1282
            data_id__in=p_ids,
1283
            question_name=question.name,
1284
            answer_value__isnull=False,
1285
        )
1286

1287
        if date_qname:
1!
1288
            date_sq = _date_answer_sq(date_qname)
×
1289
            results = base.annotate(
×
1290
                date_name=Subquery(date_sq),
1291
            ).filter(
1292
                date_name__isnull=False,
1293
            ).annotate(
1294
                month_key=Substr("date_name", 1, 7),
1295
            ).values("month_key").annotate(
1296
                agg_value=agg_func("answer_value"),
1297
            ).order_by("month_key")
1298
            for r in results:
×
1299
                if r["agg_value"] is None:
×
1300
                    continue
×
1301
                month_key = r["month_key"]
×
1302
                if month_key not in all_rows:
×
1303
                    all_rows[month_key] = {
×
1304
                        "month": format_month_label(month_key),
1305
                    }
1306
                all_rows[month_key][p["name"]] = round(
×
1307
                    r["agg_value"], 2,
1308
                )
1309
        else:
1310
            results = base.annotate(
1✔
1311
                month=TruncMonth("data_created"),
1312
            ).values("month").annotate(
1313
                agg_value=agg_func("answer_value"),
1314
            ).order_by("month")
1315
            for r in results:
1✔
1316
                month_key = format_month_group(r["month"])
1✔
1317
                if month_key not in all_rows:
1✔
1318
                    all_rows[month_key] = {
1✔
1319
                        "month": format_month_label(
1320
                            r["month"]
1321
                        ),
1322
                    }
1323
                all_rows[month_key][p["name"]] = round(
1✔
1324
                    r["agg_value"], 2,
1325
                )
1326

1327
    data = [all_rows[k] for k in sorted(all_rows.keys())]
1✔
1328
    labels = [d["month"] for d in data]
1✔
1329
    return {
1✔
1330
        "data": data,
1331
        "labels": labels,
1332
        "stack_labels": parent_names,
1333
    }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc