• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / akvo-mis / #605

01 May 2026 05:40PM UTC coverage: 88.067% (+0.03%) from 88.036%
#605

push

coveralls-python

web-flow
Visualization feedback (#206)

* [#199] feat(ChartRenderer): enhance chart option

Tooltip handling, horizontal orientation and data formatting for pie charts

* [#199] feat(visualizations): update chart configurations with tooltips and reorder items

* [#199]  Add new chart_type: metric_card to simplify the propotion logic

Co-authored-by: Copilot <copilot@github.com>

* [#199] feat(ChartRenderer): enhance chart option

Tooltip handling, horizontal orientation and data formatting for pie charts

* [#199] feat(visualizations): update chart configurations with tooltips and reorder items

* [#199]  Add new chart_type: metric_card to simplify the propotion logic

Co-authored-by: Copilot <copilot@github.com>

* [#199] feat(visualization): add _no_info bucket and api-driven kpi_stack

- Backend: add opt-in `include_unanswered=True` param to /visualization/values
  that appends a synthetic `_no_info` row for parents with no monitoring answer
- Backend: 10 new tests covering percentage math, admin filter, soft-delete,
  multi-choice denominator, and edge cases
- Frontend: map `_no_info` group → "No information available" label in
  ChartRenderer and DashboardMap legend
- Frontend: DashboardMap legend strip with colored dots per status_colors key
- Frontend: ChartRenderer kpi_stack now supports api-driven mode (no segments)
  where a single group_by=option call's rows become stack segments dynamically
- Config: enable include_unanswered on EPS/RWS operational status charts

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [#199] docs: add new param: include_unanswered in visualization values
API

* [#199] test(visualization): add FR-11 share_card denominator test

Proves that include_unanswered=true causes the _no_info row to appear
in group_by=option responses, making sum(all row values) equal total
registrations — the correct denominator for MetricCard share KPIs.

🤖 Generated with ... (continued)

4861 of 5670 branches covered (85.73%)

Branch coverage included in aggregate %.

9434 of 10562 relevant lines covered (89.32%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.38
backend/api/v1/v1_visualization/values_functions.py
1
from collections import defaultdict
1✔
2

3
from django.db.models import Count, Avg, F, OuterRef, Subquery
1✔
4
from django.db.models.functions import TruncMonth, Substr
1✔
5

6
from api.v1.v1_data.models import FormData, Answers
1✔
7
from api.v1.v1_forms.models import QuestionOptions
1✔
8
from api.v1.v1_visualization.constants import AGG_FUNCS
1✔
9
from api.v1.v1_visualization.functions import (
1✔
10
    get_base_monitoring_qs,
11
    get_monitoring_data_ids,
12
    format_month_label,
13
    format_month_group,
14
    format_date_group,
15
    fill_month_gaps,
16
    fill_date_gaps,
17
    apply_administration_filter,
18
    apply_parent_criteria_to_qs,
19
)
20

21

22
def _should_fill_gaps(params):
1✔
23
    """Only gap-fill when both from_date and to_date are provided."""
24
    return bool(
1✔
25
        params.get("from_date") and params.get("to_date")
26
    )
27

28

29
def _total_parents_in_scope(form, params):
1✔
30
    """Count all parent registrations in scope, respecting filters."""
31
    scope_form = form.parent if form.parent else form
1✔
32
    qs = FormData.objects.filter(
1✔
33
        form=scope_form,
34
        parent__isnull=True,
35
        is_pending=False,
36
        is_draft=False,
37
    )
38
    administration_id = params.get("administration_id")
1✔
39
    if administration_id:
1!
40
        qs = apply_administration_filter(qs, administration_id)
1✔
41
    qs = apply_parent_criteria_to_qs(
1✔
42
        qs, True, params.get("parent_criteria"),
43
    )
44
    return qs.count()
1✔
45

46

47
def _count_no_info_parents(form, params, qualifying_ids):
1✔
48
    """Count datapoints in scope with no qualifying answer.
49

50
    For monitoring forms: counts parent registrations without any
51
    qualifying monitoring submission (gap in monitoring coverage).
52
    For registration forms: counts registrations that exist but have no
53
    answer for the question (field left blank / skipped).
54

55
    Respects administration_id and parent_criteria so the count
56
    reconciles with option counts under filtering (FR-3).
57
    """
58
    total = _total_parents_in_scope(form, params)
1✔
59
    return max(0, total - len(qualifying_ids))
1✔
60

61

62
# -- Count mode handler --
63

64
def handle_count_mode(form, params):
1✔
65
    """Handle count mode (no question_id)."""
66
    form_id = form.id
1✔
67
    monitoring = params.get("monitoring", "latest")
1✔
68
    group_by = params.get("group_by")
1✔
69
    value_type = params.get("value_type", "number")
1✔
70
    sum_by = params.get("sum_by")
1✔
71
    is_monitoring = form.parent is not None
1✔
72

73
    if is_monitoring and monitoring == "latest" \
1✔
74
            and sum_by == "parent_id":
75
        qs, is_latest, _ = get_base_monitoring_qs(
1✔
76
            form, form_id, params
77
        )
78
        count = qs.count()
1✔
79
        if value_type == "percentage":
1✔
80
            total = FormData.objects.filter(
1✔
81
                form=form.parent,
82
                parent__isnull=True,
83
                is_pending=False,
84
                is_draft=False,
85
            ).count()
86
            value = round(
1✔
87
                (count / total * 100), 2
88
            ) if total > 0 else 0
89
        else:
90
            value = count
1✔
91
        return (
1✔
92
            [{"value": value, "label": "Total"}],
93
            ["Total"],
94
        )
95

96
    qs, is_latest, _ = get_base_monitoring_qs(
1✔
97
        form, form_id, params
98
    )
99

100
    if not group_by:
1✔
101
        count = qs.count()
1✔
102
        if value_type == "percentage" and is_monitoring:
1✔
103
            total = FormData.objects.filter(
1✔
104
                form=form.parent,
105
                parent__isnull=True,
106
                is_pending=False,
107
                is_draft=False,
108
            ).count()
109
            value = round(
1✔
110
                (count / total * 100), 2
111
            ) if total > 0 else 0
112
        else:
113
            value = count
1✔
114
        return (
1✔
115
            [{"value": value, "label": "Total"}],
116
            ["Total"],
117
        )
118

119
    if group_by == "month":
1✔
120
        return _count_group_by_month(qs, is_latest, params)
1✔
121

122
    if group_by == "parent_id":
1✔
123
        return _count_group_by_parent(qs, is_latest)
1✔
124

125
    if group_by == "id":
1✔
126
        return _count_group_by_id(qs, is_latest)
1✔
127

128
    if group_by == "date":
1!
129
        return _count_group_by_date(qs, is_latest, params)
1✔
130

131
    return [{"value": 0, "label": "Total"}], ["Total"]
×
132

133

134
def _count_group_by_month(qs, is_latest, params):
1✔
135
    """Count grouped by month."""
136
    date_qid = params.get("date_question_id")
1✔
137

138
    if is_latest:
1✔
139
        data_ids = get_monitoring_data_ids(qs, is_latest)
1✔
140
        if date_qid:
1✔
141
            answer_qs = Answers.objects.filter(
1✔
142
                data_id__in=data_ids,
143
                question_id=date_qid,
144
                name__isnull=False,
145
            )
146
            results = answer_qs.annotate(
1✔
147
                year_month=Substr("name", 1, 7),
148
            ).values("year_month").annotate(
149
                count=Count("data_id", distinct=True),
150
            ).order_by("year_month")
151
            data = [
1✔
152
                {
153
                    "value": r["count"],
154
                    "label": format_month_label(
155
                        r["year_month"]
156
                    ),
157
                    "group": r["year_month"],
158
                }
159
                for r in results
160
            ]
161
        else:
162
            results = FormData.objects.filter(
1✔
163
                id__in=data_ids,
164
            ).annotate(
165
                month=TruncMonth("created"),
166
            ).values("month").annotate(
167
                count=Count("id"),
168
            ).order_by("month")
169
            data = [
1✔
170
                {
171
                    "value": r["count"],
172
                    "label": format_month_label(r["month"]),
173
                    "group": format_month_group(r["month"]),
174
                }
175
                for r in results
176
            ]
177
    else:
178
        if date_qid:
1✔
179
            answer_qs = Answers.objects.filter(
1✔
180
                data__in=qs,
181
                question_id=date_qid,
182
                name__isnull=False,
183
            )
184
            results = answer_qs.annotate(
1✔
185
                year_month=Substr("name", 1, 7),
186
            ).values("year_month").annotate(
187
                count=Count("data_id", distinct=True),
188
            ).order_by("year_month")
189
            data = [
1✔
190
                {
191
                    "value": r["count"],
192
                    "label": format_month_label(
193
                        r["year_month"]
194
                    ),
195
                    "group": r["year_month"],
196
                }
197
                for r in results
198
            ]
199
        else:
200
            results = qs.annotate(
1✔
201
                month=TruncMonth("created"),
202
            ).values("month").annotate(
203
                count=Count("id"),
204
            ).order_by("month")
205
            data = [
1✔
206
                {
207
                    "value": r["count"],
208
                    "label": format_month_label(r["month"]),
209
                    "group": format_month_group(r["month"]),
210
                }
211
                for r in results
212
            ]
213

214
    if _should_fill_gaps(params):
1✔
215
        data = fill_month_gaps(
1✔
216
            data, params["from_date"], params["to_date"]
217
        )
218
    labels = [d["label"] for d in data]
1✔
219
    return data, labels
1✔
220

221

222
def _count_group_by_parent(qs, is_latest):
1✔
223
    """Count grouped by parent_id."""
224
    if is_latest:
1✔
225
        data = [
1✔
226
            {
227
                "value": 1,
228
                "label": p.name,
229
                "group": str(p.id),
230
            }
231
            for p in qs.only("id", "name")
232
        ]
233
    else:
234
        results = qs.filter(
1✔
235
            parent__isnull=False,
236
        ).values(
237
            "parent_id",
238
            parent_name=F("parent__name"),
239
        ).annotate(
240
            count=Count("id"),
241
        ).order_by("parent_name")
242
        data = [
1✔
243
            {
244
                "value": r["count"],
245
                "label": r["parent_name"],
246
                "group": str(r["parent_id"]),
247
            }
248
            for r in results
249
        ]
250
    labels = [d["label"] for d in data]
1✔
251
    return data, labels
1✔
252

253

254
def _count_group_by_id(qs, is_latest):
1✔
255
    """Count grouped by individual record id (value=1 per row)."""
256
    if is_latest:
1✔
257
        data = [
1✔
258
            {
259
                "value": 1,
260
                "label": p.name,
261
                "group": str(p.latest_id),
262
            }
263
            for p in qs.only("id", "name")
264
        ]
265
    else:
266
        data = [
1✔
267
            {
268
                "value": 1,
269
                "label": r.name,
270
                "group": str(r.id),
271
            }
272
            for r in qs.only("id", "name").order_by("id")
273
        ]
274
    labels = [d["label"] for d in data]
1✔
275
    return data, labels
1✔
276

277

278
def _count_group_by_date(qs, is_latest, params):
1✔
279
    """Count grouped by individual date (not month bucket)."""
280
    date_qid = params.get("date_question_id")
1✔
281
    data_ids = get_monitoring_data_ids(qs, is_latest)
1✔
282

283
    if date_qid:
1!
284
        results = Answers.objects.filter(
1✔
285
            data_id__in=data_ids,
286
            question_id=date_qid,
287
            name__isnull=False,
288
        ).annotate(
289
            day=Substr("name", 1, 10),
290
        ).values("day").annotate(
291
            count=Count("data_id", distinct=True),
292
        ).order_by("day")
293
        data = [
1✔
294
            {
295
                "value": r["count"],
296
                "label": r["day"],
297
                "group": r["day"],
298
            }
299
            for r in results
300
        ]
301
    else:
302
        results = FormData.objects.filter(
×
303
            id__in=data_ids,
304
        ).values(
305
            day=F("created__date"),
306
        ).annotate(
307
            count=Count("id"),
308
        ).order_by("day")
309
        data = [
×
310
            {
311
                "value": r["count"],
312
                "label": format_date_group(r["day"]),
313
                "group": format_date_group(r["day"]),
314
            }
315
            for r in results
316
        ]
317
    if _should_fill_gaps(params):
1✔
318
        data = fill_date_gaps(
1✔
319
            data, params["from_date"], params["to_date"]
320
        )
321
    labels = [d["label"] for d in data]
1✔
322
    return data, labels
1✔
323

324

325
# -- Option question handler --
326

327
def handle_option_question(form, question, params):
1✔
328
    """Handle option/multiple_option questions."""
329
    form_id = form.id
1✔
330
    group_by = params.get("group_by")
1✔
331
    option_value = params.get("option_value")
1✔
332
    sum_by = params.get("sum_by")
1✔
333
    value_type = params.get("value_type", "number")
1✔
334
    stack_by = params.get("stack_by")
1✔
335

336
    qs, is_latest, _ = get_base_monitoring_qs(
1✔
337
        form, form_id, params
338
    )
339
    data_ids = get_monitoring_data_ids(qs, is_latest)
1✔
340

341
    options = QuestionOptions.objects.filter(
1✔
342
        question=question,
343
    ).order_by("order")
344

345
    if option_value and group_by == "month":
1✔
346
        return _option_value_group_by_month(
1✔
347
            question, data_ids, option_value, sum_by, params
348
        )
349

350
    if option_value:
1✔
351
        return _option_value_filter(
1✔
352
            question, data_ids, qs, is_latest,
353
            option_value, sum_by, value_type,
354
            include_unanswered=params.get(
355
                "include_unanswered", False
356
            ),
357
            form=form,
358
            params=params,
359
        )
360

361
    if stack_by == "option" and group_by:
1✔
362
        return handle_stack_by_option(
1✔
363
            question, options, data_ids,
364
            qs, is_latest, params
365
        )
366

367
    if group_by == "option":
1!
368
        restricted = _extract_criteria_option_values(
1✔
369
            params, question.id
370
        )
371
        return _option_group_by_option(
1✔
372
            question, options, data_ids, qs,
373
            is_latest, value_type, restricted,
374
            include_unanswered=params.get(
375
                "include_unanswered", False
376
            ),
377
            form=form,
378
            params=params,
379
        )
380

381
    return [], []
×
382

383

384
def _option_value_filter(
1✔
385
    question, data_ids, qs, is_latest,
386
    option_value, sum_by, value_type,
387
    include_unanswered=False, form=None, params=None,
388
):
389
    """Filter by specific option value and count."""
390
    count = Answers.objects.filter(
1✔
391
        data_id__in=data_ids,
392
        question_id=question.id,
393
        options__contains=[option_value],
394
    )
395
    if sum_by == "parent_id":
1✔
396
        count = count.values(
1✔
397
            "data__parent_id"
398
        ).distinct().count()
399
    else:
400
        count = count.count()
1✔
401

402
    if value_type == "percentage":
1✔
403
        if include_unanswered and form is not None:
1✔
404
            total = _total_parents_in_scope(form, params or {})
1✔
405
        else:
406
            total = qs.count() if is_latest else len(data_ids)
1✔
407
        value = round(
1✔
408
            (count / total * 100), 2
409
        ) if total > 0 else 0
410
    else:
411
        value = count
1✔
412

413
    return (
1✔
414
        [{"value": value, "label": option_value}],
415
        [option_value],
416
    )
417

418

419
def _option_value_group_by_month(
1✔
420
    question, data_ids, option_value, sum_by, params
421
):
422
    """Filter by option_value, then bucket by month.
423

424
    Used by charts like "Proposed completion date": filter to
425
    incomplete projects (option_value='no') and bucket the count
426
    by a date question (e.g. project deadline). When `sum_by` is
427
    `parent_id`, counts distinct parents per month.
428
    """
429
    date_qid = params.get("date_question_id")
1✔
430

431
    matching_ids = list(Answers.objects.filter(
1✔
432
        data_id__in=data_ids,
433
        question_id=question.id,
434
        options__contains=[option_value],
435
    ).values_list("data_id", flat=True))
436

437
    if not matching_ids:
1✔
438
        data = []
1✔
439
    elif date_qid:
1!
440
        answer_qs = Answers.objects.filter(
1✔
441
            data_id__in=matching_ids,
442
            question_id=date_qid,
443
            name__isnull=False,
444
        )
445
        if sum_by == "parent_id":
1!
446
            answer_qs = answer_qs.annotate(
1✔
447
                year_month=Substr("name", 1, 7),
448
            ).values("year_month").annotate(
449
                count=Count(
450
                    "data__parent_id", distinct=True
451
                ),
452
            ).order_by("year_month")
453
        else:
454
            answer_qs = answer_qs.annotate(
×
455
                year_month=Substr("name", 1, 7),
456
            ).values("year_month").annotate(
457
                count=Count("data_id", distinct=True),
458
            ).order_by("year_month")
459
        data = [
1✔
460
            {
461
                "value": r["count"],
462
                "label": format_month_label(
463
                    r["year_month"]
464
                ),
465
                "group": r["year_month"],
466
            }
467
            for r in answer_qs
468
        ]
469
    else:
470
        fd_qs = FormData.objects.filter(
×
471
            id__in=matching_ids,
472
        ).annotate(
473
            month=TruncMonth("created"),
474
        ).values("month")
475
        if sum_by == "parent_id":
×
476
            fd_qs = fd_qs.annotate(
×
477
                count=Count("parent_id", distinct=True),
478
            ).order_by("month")
479
        else:
480
            fd_qs = fd_qs.annotate(
×
481
                count=Count("id"),
482
            ).order_by("month")
483
        data = [
×
484
            {
485
                "value": r["count"],
486
                "label": format_month_label(r["month"]),
487
                "group": format_month_group(r["month"]),
488
            }
489
            for r in fd_qs
490
        ]
491

492
    if _should_fill_gaps(params):
1✔
493
        data = fill_month_gaps(
1✔
494
            data, params["from_date"], params["to_date"]
495
        )
496
    labels = [d["label"] for d in data]
1✔
497
    return data, labels
1✔
498

499

500
def _extract_criteria_option_values(params, question_id):
1✔
501
    """Extract option values that criteria restricts for a given qid.
502

503
    When criteria includes option_equals/option_contains/option_in
504
    targeting the same question_id as the donut chart, the tally
505
    should only count those specific values — not every value in
506
    a multiple_option answer array. Returns None if no restriction.
507
    """
508
    all_criteria = list(params.get("criteria") or [])
1✔
509
    all_criteria.extend(params.get("parent_criteria") or [])
1✔
510
    values = set()
1✔
511
    for c in all_criteria:
1✔
512
        ctype = c["type"]
1✔
513
        parts = c["parts"]
1✔
514
        if parts[0] != question_id:
1!
515
            continue
1✔
516
        if ctype in ("option_equals", "option_contains"):
×
517
            values.add(parts[1])
×
518
        elif ctype == "option_in":
×
519
            values.update(parts[1])
×
520
    return values or None
1✔
521

522

523
def _option_group_by_option(
1✔
524
    question, options, data_ids, qs,
525
    is_latest, value_type, restricted_values=None,
526
    include_unanswered=False, form=None, params=None,
527
):
528
    """Group by option values (donut chart).
529

530
    Returns a row for every defined option — including zero-count
531
    options — so pie/doughnut charts have stable legends and colors
532
    across refreshes and filter changes.
533

534
    When `restricted_values` is set (from a criteria filter on the
535
    same question), only those values are tallied — so a
536
    multiple_option record ["a", "b"] filtered by "a" counts only
537
    for "a", not "b".
538

539
    When `include_unanswered=True`, appends one synthetic row
540
    (group="_no_info") for parents with no qualifying answer,
541
    and adjusts the percentage denominator to include the bucket
542
    so single-choice rows sum to 100%.
543
    """
544
    option_values = {o.value for o in options}
1✔
545
    tally_values = (
1✔
546
        option_values & restricted_values
547
        if restricted_values else option_values
548
    )
549
    tallies = defaultdict(int)
1✔
550
    qualifying_parents = set()
1✔
551
    # Registration forms have no parent; track data_id directly.
552
    # Monitoring forms track data__parent_id (the registration ID).
553
    is_registration = form is not None and form.parent is None
1✔
554
    tracking_field = (
1✔
555
        "data_id" if is_registration else "data__parent_id"
556
    )
557
    for tracking_id, opts in Answers.objects.filter(
1✔
558
        data_id__in=data_ids,
559
        question_id=question.id,
560
        options__isnull=False,
561
    ).values_list(tracking_field, "options"):
562
        matched = False
1✔
563
        for v in (opts or []):
1✔
564
            if v in tally_values:
1!
565
                tallies[v] += 1
1✔
566
                matched = True
1✔
567
        if matched:
1!
568
            qualifying_parents.add(tracking_id)
1✔
569

570
    counts = [tallies.get(opt.value, 0) for opt in options]
1✔
571

572
    bucket_count = (
1✔
573
        _count_no_info_parents(form, params, qualifying_parents)
574
        if include_unanswered else 0
575
    )
576

577
    if value_type == "percentage":
1✔
578
        if include_unanswered:
1✔
579
            denom = len(qualifying_parents) + bucket_count
1✔
580
        else:
581
            denom = sum(counts)
1✔
582
    else:
583
        denom = None
1✔
584

585
    data = []
1✔
586
    for opt, count in zip(options, counts):
1✔
587
        val = (
1✔
588
            round((count / denom * 100), 2)
589
            if value_type == "percentage" and denom else count
590
        )
591
        data.append({
1✔
592
            "value": val,
593
            "label": opt.label,
594
            "group": opt.value,
595
            "color": opt.color,
596
        })
597

598
    if include_unanswered and bucket_count > 0:
1✔
599
        bucket_val = (
1✔
600
            round((bucket_count / denom * 100), 2)
601
            if value_type == "percentage" and denom else bucket_count
602
        )
603
        data.append({
1✔
604
            "value": bucket_val,
605
            "label": "No information available",
606
            "group": "_no_info",
607
            "color": "#bfbfbf",
608
        })
609

610
    labels = [d["label"] for d in data]
1✔
611
    return data, labels
1✔
612

613

614
# -- Number question handler --
615

616
def handle_number_question(form, question, params):
1✔
617
    """Handle number questions."""
618
    form_id = form.id
1✔
619
    group_by = params.get("group_by")
1✔
620
    repeat_agg = params.get("repeat_agg", "average")
1✔
621
    value_type = params.get("value_type", "number")
1✔
622
    stack_by = params.get("stack_by")
1✔
623

624
    qs, is_latest, _ = get_base_monitoring_qs(
1✔
625
        form, form_id, params
626
    )
627
    data_ids = get_monitoring_data_ids(qs, is_latest)
1✔
628
    agg_func = AGG_FUNCS.get(repeat_agg, Avg)
1✔
629

630
    if stack_by == "parent_id":
1✔
631
        return handle_stack_by_parent(
1✔
632
            question, qs, is_latest,
633
            data_ids, params
634
        )
635

636
    if group_by == "parent_id":
1✔
637
        return _number_group_by_parent(
1✔
638
            question, data_ids, agg_func, value_type
639
        )
640

641
    if group_by == "date":
1✔
642
        return _number_group_by_date(
1✔
643
            question, data_ids, params
644
        )
645

646
    if group_by == "month":
1✔
647
        return _number_group_by_month(
1✔
648
            question, data_ids, agg_func, value_type, params
649
        )
650

651
    result = Answers.objects.filter(
1✔
652
        data_id__in=data_ids,
653
        question_id=question.id,
654
        value__isnull=False,
655
    ).aggregate(agg_value=agg_func("value"))
656

657
    value = (
1✔
658
        round(result["agg_value"], 2)
659
        if result["agg_value"] else 0
660
    )
661
    return [{"value": value, "label": "Total"}], ["Total"]
1✔
662

663

664
def _number_group_by_parent(
1✔
665
    question, data_ids, agg_func, value_type
666
):
667
    """Number question grouped by parent_id."""
668
    results = Answers.objects.filter(
1✔
669
        data_id__in=data_ids,
670
        question_id=question.id,
671
        value__isnull=False,
672
    ).values(
673
        parent_name=F("data__parent__name"),
674
        parent_id=F("data__parent_id"),
675
    ).annotate(
676
        agg_value=agg_func("value"),
677
    ).order_by("parent_name")
678

679
    data = [
1✔
680
        {
681
            "value": round(r["agg_value"], 2),
682
            "label": r["parent_name"],
683
            "group": str(r["parent_id"]),
684
        }
685
        for r in results
686
    ]
687

688
    if value_type == "percentage":
1✔
689
        total = sum(d["value"] for d in data)
1✔
690
        if total > 0:
1!
691
            for d in data:
1✔
692
                d["value"] = round(
1✔
693
                    d["value"] / total * 100, 2
694
                )
695

696
    labels = [d["label"] for d in data]
1✔
697
    return data, labels
1✔
698

699

700
def _number_group_by_date(question, data_ids, params):
1✔
701
    """Number question grouped by date."""
702
    repeat_agg = params.get("repeat_agg", "average")
1✔
703
    agg_func = AGG_FUNCS.get(repeat_agg, Avg)
1✔
704
    date_qid = params.get("date_question_id")
1✔
705

706
    if date_qid:
1✔
707
        data = []
1✔
708
        for data_id in data_ids:
1✔
709
            date_answer = Answers.objects.filter(
1✔
710
                data_id=data_id,
711
                question_id=date_qid,
712
            ).first()
713
            if not date_answer or not date_answer.name:
1!
714
                continue
×
715
            val_result = Answers.objects.filter(
1✔
716
                data_id=data_id,
717
                question_id=question.id,
718
                value__isnull=False,
719
            ).aggregate(agg_value=agg_func("value"))
720
            if val_result["agg_value"] is not None:
1!
721
                date_str = format_date_group(
1✔
722
                    date_answer.name
723
                )
724
                data.append({
1✔
725
                    "value": round(
726
                        val_result["agg_value"], 2
727
                    ),
728
                    "label": date_str,
729
                    "group": date_str,
730
                })
731
    else:
732
        results = Answers.objects.filter(
1✔
733
            data_id__in=data_ids,
734
            question_id=question.id,
735
            value__isnull=False,
736
        ).values(
737
            date=F("data__created__date"),
738
        ).annotate(
739
            agg_value=agg_func("value"),
740
        ).order_by("date")
741
        data = [
1✔
742
            {
743
                "value": round(r["agg_value"], 2),
744
                "label": format_date_group(r["date"]),
745
                "group": format_date_group(r["date"]),
746
            }
747
            for r in results
748
        ]
749

750
    data.sort(key=lambda x: x["group"])
1✔
751
    if _should_fill_gaps(params):
1✔
752
        data = fill_date_gaps(
1✔
753
            data, params["from_date"], params["to_date"]
754
        )
755
    labels = [d["label"] for d in data]
1✔
756
    return data, labels
1✔
757

758

759
def _number_group_by_month(
1✔
760
    question, data_ids, agg_func, value_type, params
761
):
762
    """Number question grouped by month.
763

764
    When date_question_id is provided, bucket by the month of that
765
    date answer (via a Subquery) instead of FormData.created so the
766
    x-axis aligns with the filter's date dimension.
767
    """
768
    date_qid = params.get("date_question_id")
1✔
769

770
    base = Answers.objects.filter(
1✔
771
        data_id__in=data_ids,
772
        question_id=question.id,
773
        value__isnull=False,
774
    )
775

776
    if date_qid:
1!
777
        date_sq = Answers.objects.filter(
×
778
            data_id=OuterRef("data_id"),
779
            question_id=date_qid,
780
            name__isnull=False,
781
        ).values("name")[:1]
782
        results = base.annotate(
×
783
            date_name=Subquery(date_sq),
784
        ).filter(
785
            date_name__isnull=False,
786
        ).annotate(
787
            month_key=Substr("date_name", 1, 7),
788
        ).values("month_key").annotate(
789
            agg_value=agg_func("value"),
790
        ).order_by("month_key")
791
        data = [
×
792
            {
793
                "value": round(r["agg_value"], 2),
794
                "label": format_month_label(r["month_key"]),
795
                "group": r["month_key"],
796
            }
797
            for r in results if r["agg_value"] is not None
798
        ]
799
    else:
800
        results = base.annotate(
1✔
801
            month=TruncMonth("data__created"),
802
        ).values("month").annotate(
803
            agg_value=agg_func("value"),
804
        ).order_by("month")
805
        data = [
1✔
806
            {
807
                "value": round(r["agg_value"], 2),
808
                "label": format_month_label(r["month"]),
809
                "group": format_month_group(r["month"]),
810
            }
811
            for r in results
812
        ]
813

814
    if value_type == "percentage":
1✔
815
        total = sum(d["value"] for d in data)
1✔
816
        if total > 0:
1!
817
            for d in data:
1✔
818
                d["value"] = round(
1✔
819
                    d["value"] / total * 100, 2
820
                )
821

822
    if _should_fill_gaps(params):
1✔
823
        data = fill_month_gaps(
1✔
824
            data, params["from_date"], params["to_date"]
825
        )
826

827
    labels = [d["label"] for d in data]
1✔
828
    return data, labels
1✔
829

830

831
# -- Stack handlers --
832

833
def handle_stack_by_option(
1✔
834
    question, options, data_ids,
835
    qs, is_latest, params
836
):
837
    """Handle stack_by=option: stacked bar charts."""
838
    group_by = params.get("group_by")
1✔
839
    value_type = params.get("value_type", "number")
1✔
840

841
    opt_labels = [o.label for o in options]
1✔
842
    opt_colors = [o.color for o in options]
1✔
843

844
    if group_by == "month":
1✔
845
        return _stack_option_by_month(
1✔
846
            question, options, data_ids,
847
            opt_labels, opt_colors, value_type, params
848
        )
849

850
    if group_by == "parent_id":
1!
851
        return _stack_option_by_parent(
1✔
852
            question, options, data_ids,
853
            qs, is_latest, opt_labels, opt_colors
854
        )
855

856
    return {
×
857
        "data": [], "labels": [],
858
        "stack_labels": [], "colors": [],
859
    }
860

861

862
def _stack_option_by_month(
1✔
863
    question, options, data_ids,
864
    opt_labels, opt_colors, value_type, params
865
):
866
    """Stack by option, grouped by month.
867

868
    Fetches answers once and buckets in Python — O(N) instead of
869
    O(months × options) queries. Honors date_question_id when
870
    provided so the month bucket aligns with the filter dimension.
871
    """
872
    date_qid = params.get("date_question_id")
1✔
873
    option_values = {o.value for o in options}
1✔
874

875
    base = Answers.objects.filter(
1✔
876
        data_id__in=data_ids,
877
        question_id=question.id,
878
    )
879

880
    if date_qid:
1!
881
        date_sq = Answers.objects.filter(
×
882
            data_id=OuterRef("data_id"),
883
            question_id=date_qid,
884
            name__isnull=False,
885
        ).values("name")[:1]
886
        rows = base.annotate(
×
887
            date_name=Subquery(date_sq),
888
        ).filter(
889
            date_name__isnull=False,
890
        ).annotate(
891
            month_key=Substr("date_name", 1, 7),
892
        ).values("month_key", "options")
893
        get_key = lambda r: r["month_key"]  # noqa: E731
×
894
        get_label = lambda k: format_month_label(k)  # noqa: E731
×
895
    else:
896
        rows = base.annotate(
1✔
897
            month=TruncMonth("data__created"),
898
        ).values("month", "options")
899
        get_key = lambda r: format_month_group(r["month"])  # noqa: E731
1✔
900
        get_label = lambda k: format_month_label(k)  # noqa: E731
1✔
901

902
    buckets = defaultdict(lambda: defaultdict(int))
1✔
903
    for r in rows:
1✔
904
        key = get_key(r)
1✔
905
        if not key:
1!
906
            continue
×
907
        for v in (r["options"] or []):
1✔
908
            if v in option_values:
1!
909
                buckets[key][v] += 1
1✔
910

911
    data = []
1✔
912
    for key in sorted(buckets.keys()):
1✔
913
        row = {"group": key, "label": get_label(key)}
1✔
914
        total_in_month = 0
1✔
915
        for opt in options:
1✔
916
            count = buckets[key].get(opt.value, 0)
1✔
917
            row[opt.label] = count
1✔
918
            total_in_month += count
1✔
919
        if value_type == "percentage" and total_in_month > 0:
1✔
920
            for opt in options:
1✔
921
                row[opt.label] = round(
1✔
922
                    row[opt.label] / total_in_month * 100, 2,
923
                )
924
        data.append(row)
1✔
925

926
    labels = [d["label"] for d in data]
1✔
927
    return {
1✔
928
        "data": data,
929
        "labels": labels,
930
        "stack_labels": opt_labels,
931
        "colors": opt_colors,
932
    }
933

934

935
def _stack_option_by_parent(
1✔
936
    question, options, data_ids,
937
    qs, is_latest, opt_labels, opt_colors
938
):
939
    """Stack by option, grouped by parent_id.
940

941
    Handles three data shapes:
942
      - is_latest=True: qs rows are parent FormData with a `latest_id`
943
        annotation pointing to each parent's most-recent monitoring
944
        submission. Answer counts are read from that single submission.
945
      - is_latest=False, monitoring-form query: data_ids reference
946
        monitoring submissions; parents are derived via their parent_id.
947
        Answer counts aggregate all matching submissions per parent.
948
      - is_latest=False, REGISTRATION-form query (akvo-mis-9d8): data_ids
949
        ARE registration submissions themselves (parent__isnull=True).
950
        Parents = qs directly; p_data_ids = [parent.id].
951
    """
952
    # Distinguish monitoring vs registration by probing for a parent_id.
953
    is_registration_form = False
1✔
954
    if is_latest:
1!
955
        parents = qs
×
956
    else:
957
        parent_ids = list(FormData.objects.filter(
1✔
958
            id__in=data_ids,
959
            parent__isnull=False,
960
        ).values_list("parent_id", flat=True).distinct())
961
        if parent_ids:
1✔
962
            parents = FormData.objects.filter(id__in=parent_ids)
1✔
963
        else:
964
            # Registration-form path: qs IS the list of registrations.
965
            parents = qs
1✔
966
            is_registration_form = True
1✔
967

968
    data = []
1✔
969
    for parent in parents:
1✔
970
        if is_latest:
1!
971
            p_data_ids = [parent.latest_id]
×
972
            p_name = parent.name
×
973
        elif is_registration_form:
1✔
974
            p_data_ids = [parent.id]
1✔
975
            p_name = parent.name
1✔
976
        else:
977
            p_data_ids = list(FormData.objects.filter(
1✔
978
                id__in=data_ids,
979
                parent_id=parent.id,
980
            ).values_list("id", flat=True))
981
            p_name = parent.name
1✔
982

983
        row = {"label": p_name, "group": parent.id}
1✔
984
        for opt in options:
1✔
985
            count = Answers.objects.filter(
1✔
986
                data_id__in=p_data_ids,
987
                question_id=question.id,
988
                options__contains=[opt.value],
989
            ).count()
990
            row[opt.label] = count
1✔
991
        data.append(row)
1✔
992

993
    labels = [d["label"] for d in data]
1✔
994
    return {
1✔
995
        "data": data,
996
        "labels": labels,
997
        "stack_labels": opt_labels,
998
        "colors": opt_colors,
999
    }
1000

1001

1002
def handle_stack_by_parent(
1✔
1003
    question, qs, is_latest, data_ids, params
1004
):
1005
    """Handle stack_by=parent_id: multi-line charts."""
1006
    group_by = params.get("group_by")
1✔
1007
    repeat_agg = params.get("repeat_agg", "average")
1✔
1008
    agg_func = AGG_FUNCS.get(repeat_agg, Avg)
1✔
1009

1010
    if is_latest:
1!
1011
        parents = list(
×
1012
            qs.values("id", "name", "latest_id")
1013
        )
1014
    else:
1015
        parent_ids = FormData.objects.filter(
1✔
1016
            id__in=data_ids,
1017
            parent__isnull=False,
1018
        ).values_list(
1019
            "parent_id", flat=True
1020
        ).distinct()
1021
        parent_data = FormData.objects.filter(
1✔
1022
            id__in=parent_ids,
1023
        ).values("id", "name")
1024
        parents = [
1✔
1025
            {
1026
                "id": p["id"],
1027
                "name": p["name"],
1028
                "data_ids": list(
1029
                    FormData.objects.filter(
1030
                        id__in=data_ids,
1031
                        parent_id=p["id"],
1032
                    ).values_list("id", flat=True)
1033
                ),
1034
            }
1035
            for p in parent_data
1036
        ]
1037

1038
    parent_names = [p["name"] for p in parents]
1✔
1039

1040
    if group_by == "date":
1✔
1041
        return _stack_parent_by_date(
1✔
1042
            question, parents, is_latest,
1043
            parent_names, agg_func, params
1044
        )
1045

1046
    if group_by == "month":
1!
1047
        return _stack_parent_by_month(
1✔
1048
            question, parents, is_latest,
1049
            parent_names, agg_func, params
1050
        )
1051

1052
    return {"data": [], "labels": [], "stack_labels": []}
×
1053

1054

1055
def _stack_parent_by_date(
1✔
1056
    question, parents, is_latest,
1057
    parent_names, agg_func, params
1058
):
1059
    """Stack by parent_id, grouped by date.
1060

1061
    Prefetches date keys and aggregated values per data_id in two
1062
    bulk queries instead of N+1 per-point queries.
1063
    """
1064
    date_qid = params.get("date_question_id")
1✔
1065

1066
    all_data_ids = []
1✔
1067
    for p in parents:
1✔
1068
        if is_latest:
1!
1069
            all_data_ids.append(p["latest_id"])
×
1070
        else:
1071
            all_data_ids.extend(p["data_ids"])
1✔
1072

1073
    if date_qid:
1!
1074
        date_rows = Answers.objects.filter(
×
1075
            data_id__in=all_data_ids,
1076
            question_id=date_qid,
1077
            name__isnull=False,
1078
        ).values("data_id", "name")
1079
        date_map = {
×
1080
            r["data_id"]: format_date_group(r["name"])
1081
            for r in date_rows
1082
        }
1083
    else:
1084
        fd_rows = FormData.objects.filter(
1✔
1085
            id__in=all_data_ids,
1086
        ).values("id", "created")
1087
        date_map = {
1✔
1088
            r["id"]: format_date_group(r["created"])
1089
            for r in fd_rows
1090
        }
1091

1092
    val_rows = Answers.objects.filter(
1✔
1093
        data_id__in=all_data_ids,
1094
        question_id=question.id,
1095
        value__isnull=False,
1096
    ).values("data_id").annotate(
1097
        agg_value=agg_func("value"),
1098
    )
1099
    val_map = {
1✔
1100
        r["data_id"]: r["agg_value"]
1101
        for r in val_rows
1102
        if r["agg_value"] is not None
1103
    }
1104

1105
    all_rows = {}
1✔
1106
    for p in parents:
1✔
1107
        p_ids = (
1✔
1108
            [p["latest_id"]] if is_latest
1109
            else p["data_ids"]
1110
        )
1111
        for data_id in p_ids:
1✔
1112
            date_key = date_map.get(data_id)
1✔
1113
            agg_val = val_map.get(data_id)
1✔
1114
            if not date_key or agg_val is None:
1!
1115
                continue
×
1116
            if date_key not in all_rows:
1!
1117
                all_rows[date_key] = {"date": date_key}
1✔
1118
            all_rows[date_key][p["name"]] = round(agg_val, 2)
1✔
1119

1120
    data = [all_rows[k] for k in sorted(all_rows.keys())]
1✔
1121
    labels = sorted(all_rows.keys())
1✔
1122
    return {
1✔
1123
        "data": data,
1124
        "labels": labels,
1125
        "stack_labels": parent_names,
1126
    }
1127

1128

1129
def _stack_parent_by_month(
1✔
1130
    question, parents, is_latest,
1131
    parent_names, agg_func, params
1132
):
1133
    """Stack by parent_id, grouped by month.
1134

1135
    When date_question_id is provided, buckets by the month of that
1136
    date answer (via Subquery) instead of FormData.created.
1137
    """
1138
    date_qid = params.get("date_question_id")
1✔
1139
    all_rows = {}
1✔
1140

1141
    for p in parents:
1✔
1142
        p_ids = (
1✔
1143
            [p["latest_id"]] if is_latest
1144
            else p["data_ids"]
1145
        )
1146

1147
        base = Answers.objects.filter(
1✔
1148
            data_id__in=p_ids,
1149
            question_id=question.id,
1150
            value__isnull=False,
1151
        )
1152

1153
        if date_qid:
1!
1154
            date_sq = Answers.objects.filter(
×
1155
                data_id=OuterRef("data_id"),
1156
                question_id=date_qid,
1157
                name__isnull=False,
1158
            ).values("name")[:1]
1159
            results = base.annotate(
×
1160
                date_name=Subquery(date_sq),
1161
            ).filter(
1162
                date_name__isnull=False,
1163
            ).annotate(
1164
                month_key=Substr("date_name", 1, 7),
1165
            ).values("month_key").annotate(
1166
                agg_value=agg_func("value"),
1167
            ).order_by("month_key")
1168
            for r in results:
×
1169
                if r["agg_value"] is None:
×
1170
                    continue
×
1171
                month_key = r["month_key"]
×
1172
                if month_key not in all_rows:
×
1173
                    all_rows[month_key] = {
×
1174
                        "month": format_month_label(month_key),
1175
                    }
1176
                all_rows[month_key][p["name"]] = round(
×
1177
                    r["agg_value"], 2,
1178
                )
1179
        else:
1180
            results = base.annotate(
1✔
1181
                month=TruncMonth("data__created"),
1182
            ).values("month").annotate(
1183
                agg_value=agg_func("value"),
1184
            ).order_by("month")
1185
            for r in results:
1✔
1186
                month_key = format_month_group(r["month"])
1✔
1187
                if month_key not in all_rows:
1✔
1188
                    all_rows[month_key] = {
1✔
1189
                        "month": format_month_label(
1190
                            r["month"]
1191
                        ),
1192
                    }
1193
                all_rows[month_key][p["name"]] = round(
1✔
1194
                    r["agg_value"], 2,
1195
                )
1196

1197
    data = [all_rows[k] for k in sorted(all_rows.keys())]
1✔
1198
    labels = [d["month"] for d in data]
1✔
1199
    return {
1✔
1200
        "data": data,
1201
        "labels": labels,
1202
        "stack_labels": parent_names,
1203
    }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc