• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / iwsims / #59

18 Jun 2026 07:20AM UTC coverage: 88.033% (-0.1%) from 88.13%
#59

push

coveralls-python

web-flow
Merge 5dfcb298b into a6f6761c9

5183 of 6053 branches covered (85.63%)

Branch coverage included in aggregate %.

9979 of 11170 relevant lines covered (89.34%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.47
backend/api/v1/v1_visualization/functions.py
1
import logging
1✔
2
from collections import defaultdict
1✔
3

4
from django.db import connection
1✔
5
from django.db.models import (
1✔
6
    Avg, Count, Q, Subquery, OuterRef,
7
)
8
from datetime import datetime as dt_datetime, timedelta, date
1✔
9
from django.utils import timezone
1✔
10
from rest_framework.exceptions import ValidationError
1✔
11

12
from api.v1.v1_data.models import FormData, Answers
1✔
13
from api.v1.v1_forms.models import Questions, QuestionOptions
1✔
14
from api.v1.v1_profile.models import Administration
1✔
15
from api.v1.v1_visualization.constants import MATERIALIZED_VIEWS
1✔
16
from api.v1.v1_visualization.models import (
1✔
17
    MVAnswerDenormalized,
18
    MVCrossFormLatest,
19
    MVLatestMonitoring,
20
)
21

22

23
logger = logging.getLogger(__name__)
1✔
24

25

26
def validate_qname(token):
    """Coerce a question token into a question_name string.

    ``None`` is passed through untouched. Any other token is stringified
    and returned, unless the resulting string consists only of digits: such
    a token is treated as a legacy question_id and rejected, so that a
    stray id is never used as a literal name that matches nothing.

    Args:
        token: Raw question token (or None).

    Returns:
        The token as a string, or None when the token is None.

    Raises:
        ValidationError: If the stringified token is digits-only.
    """
    if token is not None:
        qname = str(token)
        if not qname.isdigit():
            return qname
        raise ValidationError(
            f"Expected a question_name, got a numeric id '{qname}'. "
            "Dashboard endpoints are question_name-only."
        )
    return None
42

43

44
def refresh_materialized_data(views=None, concurrent=False):
    """Refresh materialized views one by one.

    Args:
        views: List of view names to refresh. Falls back to
            MATERIALIZED_VIEWS when empty or None.
        concurrent: Request REFRESH ... CONCURRENTLY (non-blocking,
            requires a unique index). If the concurrent refresh raises,
            a regular refresh is attempted for the same view.

    Raises:
        Exception: Whatever the regular refresh raises; a failure of the
            fallback refresh is logged and then re-raised.

    Note: Deliberately not wrapped in @transaction.atomic — REFRESH
    CONCURRENTLY cannot run inside a transaction. Django's default
    autocommit mode makes each cursor context an independent transaction.
    """
    # NOTE(review): view names are interpolated straight into the SQL
    # text, so callers must only pass trusted identifiers.
    for view in (views or MATERIALIZED_VIEWS):
        # REFRESH CONCURRENTLY is not allowed inside a transaction block.
        # Django's TestCase wraps tests in one, so a concurrent request is
        # downgraded to a regular refresh while in_atomic_block is True.
        plain_refresh = not concurrent or connection.in_atomic_block

        if plain_refresh:
            with connection.cursor() as cursor:
                cursor.execute(
                    f"REFRESH MATERIALIZED VIEW {view};"
                )
            logger.info(f"Refreshed materialized view: {view}")
            continue

        try:
            with connection.cursor() as cursor:
                cursor.execute(
                    f"REFRESH MATERIALIZED VIEW CONCURRENTLY {view};"
                )
            logger.info(f"Refreshed materialized view: {view}")
        except Exception as concurrent_err:
            logger.warning(
                f"Concurrent refresh failed for {view}: {concurrent_err} — "
                f"falling back to regular refresh"
            )
            try:
                with connection.cursor() as cursor:
                    cursor.execute(
                        f"REFRESH MATERIALIZED VIEW {view};"
                    )
                logger.info(
                    f"Refreshed {view} (fallback to non-concurrent)"
                )
            except Exception as fallback_err:
                logger.error(
                    f"Fallback refresh also failed for {view}: "
                    f"{fallback_err}"
                )
                raise
95

96

97
# -- Shared helpers --
98

99
def apply_administration_filter(queryset, administration_id):
    """Restrict a queryset to an administration and its descendants.

    Rows are kept when their ``administration_id`` equals the target or
    when the related administration's ``path`` starts with the target's
    own path followed by ``"<id>."``.

    Args:
        queryset: QuerySet with an ``administration`` relation.
        administration_id: Primary key of the target administration.

    Returns:
        The filtered queryset, or ``queryset.none()`` when the target
        administration does not exist.
    """
    try:
        target = Administration.objects.get(pk=administration_id)
    except Administration.DoesNotExist:
        return queryset.none()

    # Descendants carry the ancestor chain in `path`; a root-level target
    # has an empty path, so the prefix is just "<id>.".
    descendant_prefix = f"{target.path or ''}{target.id}."
    is_target = Q(administration_id=administration_id)
    is_descendant = Q(administration__path__startswith=descendant_prefix)
    return queryset.filter(is_target | is_descendant)
115

116

117
def apply_administration_filter_mv(
    qs, administration_id, field='parent_administration_id'
):
    """Restrict an MV queryset to an administration and its descendants.

    Counterpart of apply_administration_filter for MV models that keep
    administration IDs as plain integers rather than FK fields: the IDs of
    the target and every administration below it are resolved first and
    then matched with an ``__in`` lookup on ``field``.

    Args:
        qs: QuerySet of an MV model.
        administration_id: Target administration ID to filter on.
        field: Name of the integer administration field to filter against.

    Returns:
        The filtered queryset, or ``qs.none()`` when the target
        administration does not exist.
    """
    try:
        target = Administration.objects.get(pk=administration_id)
    except Administration.DoesNotExist:
        return qs.none()

    descendant_prefix = f"{target.path or ''}{target.id}."
    in_scope = (
        Q(pk=administration_id)
        | Q(path__startswith=descendant_prefix)
    )
    scoped_ids = list(
        Administration.objects.filter(in_scope)
        .values_list('pk', flat=True)
    )
    lookup = {f'{field}__in': scoped_ids}
    return qs.filter(**lookup)
146

147

148
def get_latest_monitoring_from_mv(
    form_id, administration_id=None, date_filters=None
):
    """Get latest monitoring rows using mv_latest_monitoring.

    Replaces the correlated subquery in latest_monitoring_subquery().

    Args:
        form_id: Monitoring form ID.
        administration_id: Optional administration filter, applied to
            ``parent_administration_id`` (target plus descendants).
        date_filters: Optional dict with ``from_date``, ``to_date`` and
            ``date_question_name``. With a date question name, rows are
            kept when a matching answer in mv_answer_denormalized falls
            inside the window; without one, the window is applied to the
            row's ``created`` date.

    Returns:
        A QuerySet of MVLatestMonitoring rows.
    """
    rows = MVLatestMonitoring.objects.filter(form_id=form_id)

    if administration_id:
        rows = apply_administration_filter_mv(
            rows, administration_id, 'parent_administration_id'
        )

    if not date_filters:
        return rows

    lower = date_filters.get("from_date")
    upper = date_filters.get("to_date")
    date_qname = date_filters.get("date_question_name")

    if not date_qname:
        # No date question: bound the submission timestamp instead.
        if lower:
            rows = rows.filter(created__date__gte=lower)
        if upper:
            rows = rows.filter(created__date__lte=upper)
        return rows

    # Date answers are compared as strings against answer_name.
    dated_answers = MVAnswerDenormalized.objects.filter(
        question_name=date_qname,
        answer_name__isnull=False,
    )
    if lower:
        dated_answers = dated_answers.filter(answer_name__gte=str(lower))
    if upper:
        dated_answers = dated_answers.filter(
            answer_name__lte=_to_date_upper_bound(upper)
        )
    return rows.filter(latest_data_id__in=dated_answers.values("data_id"))
192

193

194
def get_latest_data_ids_from_mv(
    form_id, administration_id=None, date_filters=None
):
    """Return latest monitoring data IDs from mv_latest_monitoring.

    Thin wrapper over get_latest_monitoring_from_mv() that evaluates the
    queryset and hands back the ``latest_data_id`` values as a flat list,
    ready for use in Answers queries.

    Args:
        form_id: Monitoring form ID.
        administration_id: Optional administration filter.
        date_filters: Optional date filter dict, forwarded unchanged.

    Returns:
        List of latest_data_id values.
    """
    latest_rows = get_latest_monitoring_from_mv(
        form_id, administration_id, date_filters
    )
    id_values = latest_rows.values_list('latest_data_id', flat=True)
    return list(id_values)
206

207

208
def resolve_default_administration_id(administration_id):
    """Return the given administration_id, defaulting to the root.

    When no administration_id is provided, the first administration whose
    parent IS NULL is used. These visualization endpoints are public, so
    the default scope is the top-level country rather than data spread
    across unrelated administrations.

    Args:
        administration_id: Requested administration ID (may be falsy).

    Returns:
        The requested ID when truthy, otherwise the root administration ID.

    Raises:
        ValidationError: If no ID was given and no root administration
            exists.
    """
    if not administration_id:
        root_id = (
            Administration.objects
            .filter(parent__isnull=True)
            .values_list("id", flat=True)
            .first()
        )
        if root_id is not None:
            return root_id
        raise ValidationError(
            "No root administration configured; "
            "administration_id is required."
        )
    return administration_id
224

225

226
def build_date_filters(params):
    """Collect from_date/to_date/date_question_name into a dict.

    Only keys whose value in ``params`` is truthy are copied. The result
    is an empty dict when no date filter is set, so callers can pass
    `date_filters or None` to subqueries that treat falsy as 'no filter'.

    Args:
        params: Mapping of request parameters.

    Returns:
        Dict holding the subset of the three date keys that are set.
    """
    date_keys = ("from_date", "to_date", "date_question_name")
    return {key: params[key] for key in date_keys if params.get(key)}
241

242

243
def _to_date_upper_bound(value):
1✔
244
    """Produce an inclusive upper bound for an ISO date-time string.
245

246
    `Answers.name` stores dates as ISO-8601 with time (e.g.
247
    '2025-01-20T00:00:00.000Z'), so a plain `name__lte='2025-01-20'`
248
    excludes same-day records lexically. Appending the latest time
249
    makes `<=` work as an inclusive day boundary.
250
    """
251
    return f"{value}T23:59:59.999Z"
1✔
252

253

254
def get_latest_monitoring_subquery(form_id, date_filters=None):
    """Pick the subquery that yields the latest monitoring ID per parent.

    Prefers mv_latest_monitoring (lookup on (parent_id, form_id)) and
    falls back to the correlated subquery from
    latest_monitoring_subquery() in two situations:

    - date_filters is set: the MV stores the absolute latest, not the
      most recent within a date range, so date-filtered queries need the
      data table scan to find the latest WITHIN range.
    - connection.in_atomic_block: TestCase wraps tests in a transaction,
      so the MV is never refreshed after test data is created.

    Drop-in replacement for latest_monitoring_subquery() in any
    .annotate(latest_id=...) call.

    Args:
        form_id: Monitoring form ID.
        date_filters: Optional date filter dict.

    Returns:
        A Subquery correlated on the outer row's pk.
    """
    if date_filters or connection.in_atomic_block:
        return latest_monitoring_subquery(form_id, date_filters)

    mv_rows = MVLatestMonitoring.objects.filter(
        parent_id=OuterRef('pk'),
        form_id=form_id,
    )
    return Subquery(mv_rows.values('latest_data_id')[:1])
278

279

280
def latest_monitoring_subquery(form_id, date_filters=None):
    """Subquery: latest monitoring FormData ID per parent.

    Candidates are non-pending, non-draft FormData of ``form_id`` whose
    parent is the outer row. The newest by ``created`` wins.

    Args:
        form_id: Monitoring form ID.
        date_filters: Optional dict with ``from_date``, ``to_date`` and
            ``date_question_name``. With a date question name the window
            is applied to that question's answer (``Answers.name``);
            otherwise to the submission's ``created`` date.

    Returns:
        A Subquery producing at most one FormData id.
    """
    candidates = FormData.objects.filter(
        parent=OuterRef("pk"),
        form_id=form_id,
        is_pending=False,
        is_draft=False,
    )

    # An absent filter dict behaves like an empty one: nothing is applied.
    window = date_filters or {}
    lower = window.get("from_date")
    upper = window.get("to_date")
    date_qname = window.get("date_question_name")

    if date_qname:
        dated_answers = Answers.objects.filter(
            data=OuterRef("pk"),
            question__name=date_qname,
        )
        if lower:
            dated_answers = dated_answers.filter(name__gte=lower)
        if upper:
            dated_answers = dated_answers.filter(
                name__lte=_to_date_upper_bound(upper),
            )
        candidates = candidates.filter(
            pk__in=Subquery(dated_answers.values("data_id"))
        )
    else:
        if lower:
            candidates = candidates.filter(created__date__gte=lower)
        if upper:
            candidates = candidates.filter(created__date__lte=upper)

    newest_first = candidates.order_by("-created")
    return Subquery(newest_first.values("id")[:1])
324

325

326
def parse_criteria_string(value, allowed_types):
    """Parse a `criteria=type:qname:value,...` query string.

    Fragments are separated by `,` and fields within a fragment by `:`.
    Per criterion type the parts are normalized as follows:

    - option_equals / option_contains: [qname, value] (value as string)
    - option_in: [qname, [values...]] with the value split on `|`
    - threshold_gt / threshold_lt: [qname, float(value)]
    - overdue: [completion_qname, deadline_qname]
    - any other allowed type: the raw fields after the type

    Args:
        value: Raw criteria string.
        allowed_types: Container of accepted criterion type names.

    Returns:
        List of {"type", "parts"} dicts, one per fragment.

    Raises:
        ValueError: With a user-visible message on any malformed
            fragment, so callers can surface a 400.
        ValidationError: Propagated from validate_qname() when a question
            token is a digits-only legacy id.
    """
    parsed = []
    for item in value.split(","):
        parts = item.strip().split(":")
        if len(parts) < 3:
            raise ValueError(
                f"Invalid criteria format: '{item}'."
                " Expected type:qid:value"
            )
        ctype = parts[0]
        if ctype not in allowed_types:
            raise ValueError(
                f"Invalid criteria type: '{ctype}'."
                f" Options: {sorted(allowed_types)}"
            )

        if ctype in ("option_equals", "option_contains"):
            normalized = [validate_qname(parts[1]), parts[2]]
        elif ctype == "option_in":
            qname = validate_qname(parts[1])
            values = [v for v in parts[2].split("|") if v]
            if not values:
                raise ValueError(
                    "option_in requires at least one value:"
                    f" '{item}'"
                )
            normalized = [qname, values]
        elif ctype in ("threshold_gt", "threshold_lt"):
            qname = validate_qname(parts[1])
            # Only the float conversion is guarded, so our own messages
            # never have to be told apart from parse errors by their text.
            try:
                threshold = float(parts[2])
            except ValueError as exc:
                raise ValueError(
                    f"Invalid numeric value in criteria: '{item}'."
                ) from exc
            normalized = [qname, threshold]
        elif ctype == "overdue":
            normalized = [
                validate_qname(parts[1]),
                validate_qname(parts[2]),
            ]
        else:
            normalized = parts[1:]

        parsed.append({"type": ctype, "parts": normalized})
    return parsed
383

384

385
def _criterion_matching_ids(data_ids, criterion):
    """Return iterable of data_ids matching a single criterion.

    Matching runs against mv_answer_denormalized, filtered by
    ``question_name`` and scoped to ``data_ids``, rather than against the
    base Answers table. Per the original author's note, Questions.name is
    unindexed (a question__name join would seq-scan) while the MV has
    idx_mv_answer_question_name.

    Supported types: option_equals / option_contains (option list
    contains the value), option_in (contains any of the values),
    threshold_gt and threshold_lt (numeric comparison on answer_value).
    Any other type matches nothing.

    Args:
        data_ids: Candidate data IDs to search within.
        criterion: Dict with "type" and "parts" as produced by
            parse_criteria_string().

    Returns:
        A flat values_list queryset of data_id, or an empty list for an
        unsupported criterion type.
    """
    kind = criterion["type"]
    operands = criterion["parts"]

    option_kinds = ("option_equals", "option_contains")
    supported = option_kinds + (
        "option_in", "threshold_gt", "threshold_lt",
    )
    if kind not in supported:
        return []

    qname, operand = operands
    any_of = Q()
    lookups = {"data_id__in": data_ids, "question_name": qname}

    if kind in option_kinds:
        lookups["answer_options__contains"] = [operand]
    elif kind == "option_in":
        # OR together one containment test per requested option.
        for option in operand:
            any_of |= Q(answer_options__contains=[option])
    elif kind == "threshold_gt":
        lookups["answer_value__gt"] = operand
    else:
        lookups["answer_value__lt"] = operand

    return MVAnswerDenormalized.objects.filter(
        any_of, **lookups
    ).values_list("data_id", flat=True)
428

429

430
def narrow_data_ids_by_criteria(data_ids, criteria):
    """Return subset of data_ids where ALL criteria match (AND).

    Each criterion is evaluated through _criterion_matching_ids() over the
    current set of survivors only; the set shrinks monotonically and the
    loop stops as soon as nothing is left. The input order of data_ids is
    preserved in the result.

    Args:
        data_ids: Candidate data IDs.
        criteria: List of parsed criteria; falsy means 'no narrowing'.

    Returns:
        List of the data_ids that satisfy every criterion.
    """
    if not criteria:
        return list(data_ids)

    survivors = set(data_ids)
    for criterion in criteria:
        if not survivors:
            break
        hits = _criterion_matching_ids(list(survivors), criterion)
        survivors.intersection_update(hits)

    return [data_id for data_id in data_ids if data_id in survivors]
448

449

450
def apply_parent_criteria_to_qs(qs, is_latest, parent_criteria):
    """Narrow by criteria on the PARENT (registration) form's answers.

    In latest mode `qs` rows are parent FormData (with `latest_id`), so
    the criteria are matched directly against `qs.id`. In non-latest mode
    `qs` rows are monitoring FormData, so they are matched against the
    distinct `qs.parent_id` values.

    Args:
        qs: Base queryset.
        is_latest: Whether `qs` is in latest mode.
        parent_criteria: Parsed criteria; falsy leaves `qs` untouched.

    Returns:
        `qs` restricted to rows whose parent satisfies every criterion.
    """
    if not parent_criteria:
        return qs

    if is_latest:
        lookup = "id__in"
        parent_ids = list(qs.values_list("id", flat=True))
    else:
        lookup = "parent_id__in"
        parent_ids = list(
            qs.values_list("parent_id", flat=True).distinct()
        )

    kept = narrow_data_ids_by_criteria(parent_ids, parent_criteria)
    return qs.filter(**{lookup: kept})
472

473

474
def apply_criteria_to_monitoring_qs(qs, is_latest, criteria):
    """Narrow a base monitoring queryset by multi-criteria filter.

    Reads the current data IDs from `qs` (`latest_id` in latest mode,
    `id` otherwise), intersects them against each criterion's matching
    set, then re-filters `qs` on the same column so downstream callers
    see a consistent narrowed view.

    Args:
        qs: Base monitoring queryset.
        is_latest: Whether `qs` is in latest mode.
        criteria: Parsed criteria; falsy leaves `qs` untouched.

    Returns:
        `qs` restricted to rows satisfying every criterion.
    """
    if not criteria:
        return qs

    id_column = "latest_id" if is_latest else "id"
    current_ids = list(qs.values_list(id_column, flat=True))
    kept = narrow_data_ids_by_criteria(current_ids, criteria)
    return qs.filter(**{f"{id_column}__in": kept})
491

492

493
def split_criteria_by_form(criteria, form_id, parent_form_id):
    """Split parsed criteria list into same-form and parent-form.

    A criterion is assigned by its first part (the question name): names
    defined on ``form_id`` go to the same-form list; of the remaining
    names, those defined on ``parent_form_id`` go to the parent-form list.
    Criteria whose name is found on neither form end up in neither list.

    Args:
        criteria: Parsed criteria list (may be falsy).
        form_id: ID of the form being queried.
        parent_form_id: ID of its parent form, or falsy when there is none.

    Returns:
        Tuple (same_form_criteria, parent_form_criteria); each element is
        None instead of an empty list.
    """
    if not criteria:
        return None, None

    def _names_defined_on(target_form_id, names):
        # Question names from `names` that exist on the given form.
        return set(
            Questions.objects.filter(
                name__in=names, form_id=target_form_id,
            ).values_list("name", flat=True)
        )

    requested = {c["parts"][0] for c in criteria}
    own_names = _names_defined_on(form_id, requested)

    parent_names = set()
    if parent_form_id:
        unresolved = requested - own_names
        if unresolved:
            parent_names = _names_defined_on(parent_form_id, unresolved)

    same, parent = [], []
    for entry in criteria:
        head = entry["parts"][0]
        if head in own_names:
            same.append(entry)
        if head in parent_names:
            parent.append(entry)
    return same or None, parent or None
516

517

518
def get_base_monitoring_qs(form, monitoring_form_id, params):
    """Build base queryset for monitoring data.

    Two modes exist:

    - Latest mode, used when ``form`` has a parent and
      ``params["monitoring"]`` is "latest" (the default): rows are the
      parent form's registration FormData annotated with ``latest_id``
      (the latest monitoring submission, date filters applied inside the
      subquery); parents without one are dropped.
    - Otherwise: rows are the FormData of ``monitoring_form_id`` itself,
      with the date window applied either to the date question's answer
      or to the ``created`` date.

    Both modes exclude pending and draft rows, apply the optional
    administration filter, then narrow by ``criteria`` and
    ``parent_criteria`` from ``params``.

    Args:
        form: Form instance being visualized.
        monitoring_form_id: ID of the monitoring form.
        params: Mapping of request parameters.

    Returns:
        Tuple of (queryset, is_latest_mode, date_filters).
    """
    administration_id = params.get("administration_id")
    date_filters = build_date_filters(params)

    latest_mode = (
        form.parent is not None
        and params.get("monitoring", "latest") == "latest"
    )

    if latest_mode:
        latest_id = get_latest_monitoring_subquery(
            monitoring_form_id,
            date_filters or None,
        )
        qs = (
            FormData.objects.filter(
                form=form.parent,
                parent__isnull=True,
                is_pending=False,
                is_draft=False,
            )
            .annotate(latest_id=latest_id)
            .filter(latest_id__isnull=False)
        )
        if administration_id:
            qs = apply_administration_filter(qs, administration_id)
    else:
        qs = FormData.objects.filter(
            form_id=monitoring_form_id,
            is_pending=False,
            is_draft=False,
        )
        if administration_id:
            qs = apply_administration_filter(qs, administration_id)

        if date_filters:
            from_date = params.get("from_date")
            to_date = params.get("to_date")
            date_question_name = params.get("date_question_name")

            if date_question_name:
                # Window applies to the answer of the date question.
                dated_answers = Answers.objects.filter(
                    data__form_id=monitoring_form_id,
                    question__name=date_question_name,
                    name__isnull=False,
                )
                if from_date:
                    dated_answers = dated_answers.filter(
                        name__gte=from_date
                    )
                if to_date:
                    dated_answers = dated_answers.filter(
                        name__lte=_to_date_upper_bound(to_date)
                    )
                qs = qs.filter(id__in=dated_answers.values("data_id"))
            else:
                # Window applies to the submission timestamp.
                if from_date:
                    qs = qs.filter(created__date__gte=from_date)
                if to_date:
                    qs = qs.filter(created__date__lte=to_date)

    qs = apply_criteria_to_monitoring_qs(
        qs, latest_mode, params.get("criteria"),
    )
    qs = apply_parent_criteria_to_qs(
        qs, latest_mode, params.get("parent_criteria"),
    )
    return qs, latest_mode, date_filters
607

608

609
def get_monitoring_data_ids(qs, is_latest_mode):
    """Extract monitoring data IDs from queryset.

    Args:
        qs: Queryset as returned by get_base_monitoring_qs().
        is_latest_mode: When true the IDs are read from the `latest_id`
            annotation, otherwise from `id`.

    Returns:
        List of monitoring data IDs.
    """
    id_column = "latest_id" if is_latest_mode else "id"
    return list(qs.values_list(id_column, flat=True))
616

617

618
def format_month_label(dt):
    """Format a date/datetime to 'Mon YYYY' label.

    Objects exposing ``strftime`` are formatted directly. Anything else is
    stringified and its first seven characters parsed as 'YYYY-MM'; when
    that fails the plain string form is returned instead.
    """
    if not hasattr(dt, 'strftime'):
        try:
            month_start = dt_datetime.strptime(str(dt)[:7], "%Y-%m")
            return month_start.strftime("%b %Y")
        except (ValueError, TypeError):
            return str(dt)
    return dt.strftime("%b %Y")
627

628

629
def format_month_group(dt):
    """Format to YYYY-MM group key.

    Uses ``strftime`` when the value has it; otherwise falls back to the
    first seven characters of its string form.
    """
    try:
        render = dt.strftime
    except AttributeError:
        return str(dt)[:7]
    return render("%Y-%m")
634

635

636
def format_date_group(dt):
    """Format to YYYY-MM-DD group key.

    Uses ``strftime`` when the value has it; otherwise falls back to the
    first ten characters of its string form.
    """
    return (
        dt.strftime("%Y-%m-%d")
        if hasattr(dt, 'strftime')
        else str(dt)[:10]
    )
641

642

643
def _parse_iso_date(value):
1✔
644
    """Parse YYYY-MM-DD string or pass through date/datetime."""
645
    if isinstance(value, (dt_datetime, date)):
1!
646
        return value if isinstance(value, date) else value.date()
1✔
647
    return dt_datetime.strptime(str(value)[:10], "%Y-%m-%d").date()
×
648

649

650
def fill_month_gaps(data, from_date, to_date):
    """Return a new list with one row per month between the bounds.

    For every month in [from_date, to_date] the existing row with that
    `group` key ('YYYY-MM') is reused, or a zero row is inserted when none
    exists. Output is in chronological order; only months inside the
    bounds are emitted.

    Args:
        data: Iterable of row dicts carrying a "group" key.
        from_date: Lower bound (date-like, see _parse_iso_date).
        to_date: Upper bound (date-like, see _parse_iso_date).

    Returns:
        List of row dicts, one per month.
    """
    first_month = _parse_iso_date(from_date).replace(day=1)
    last_month = _parse_iso_date(to_date).replace(day=1)
    rows_by_group = {row["group"]: row for row in data}

    filled = []
    month = first_month
    while month <= last_month:
        group = month.strftime("%Y-%m")
        try:
            filled.append(rows_by_group[group])
        except KeyError:
            filled.append({
                "value": 0,
                "label": month.strftime("%b %Y"),
                "group": group,
            })
        # Step to the first day of the next month; December rolls the
        # year over (divmod(12, 12) == (1, 0)).
        year_carry, month_index = divmod(month.month, 12)
        month = month.replace(
            year=month.year + year_carry, month=month_index + 1
        )
    return filled
679

680

681
def fill_date_gaps(data, from_date, to_date):
    """Return a new list with one row per day between the bounds.

    For every day in [from_date, to_date] the existing row with that
    `group` key ('YYYY-MM-DD') is reused, or a zero row labelled with the
    date is inserted when none exists. Output is in chronological order;
    only days inside the bounds are emitted.

    Args:
        data: Iterable of row dicts carrying a "group" key.
        from_date: Lower bound (date-like, see _parse_iso_date).
        to_date: Upper bound (date-like, see _parse_iso_date).

    Returns:
        List of row dicts, one per day.
    """
    first_day = _parse_iso_date(from_date)
    last_day = _parse_iso_date(to_date)
    rows_by_group = {row["group"]: row for row in data}
    one_day = timedelta(days=1)

    filled = []
    day = first_day
    while day <= last_day:
        group = day.strftime("%Y-%m-%d")
        try:
            filled.append(rows_by_group[group])
        except KeyError:
            filled.append({
                "value": 0,
                "label": group,
                "group": group,
            })
        day = day + one_day
    return filled
706

707

708
# -- question_name cross-form helpers --
709

710
def get_values_by_question_name(question_name, params):
    """Get visualization values by question_name across monitoring forms.

    Reads mv_cross_form_latest, which holds the latest value per parent
    regardless of which monitoring form the answer came from.

    Args:
        question_name: Question name/identifier (e.g., "ph", "status").
        params: Dict with administration_id, group_by, value_type, and
            optionally parent_form_id, sum_by, option_value,
            rolling_months, from_date, to_date.

    Returns:
        Tuple of (data, labels) matching existing API response format;
        two empty lists when no rows match.
    """
    rows = MVCrossFormLatest.objects.filter(question_name=question_name)

    # Optional: scope to a single registration family. Omitted → national
    # (cross-family) overview. Per the original note, the MV carries
    # parent_form_id plus an index on (parent_form_id, question_name).
    parent_form_id = params.get("parent_form_id")
    if parent_form_id:
        rows = rows.filter(parent_form_id=parent_form_id)

    administration_id = params.get("administration_id")
    if administration_id:
        rows = apply_administration_filter_mv(
            rows, administration_id, field='administration_id'
        )

    # Recency / date-window filter on the latest submission timestamp.
    rows = _apply_qname_date_filter(
        rows,
        params.get("rolling_months"),
        params.get("from_date"),
        params.get("to_date"),
    )

    value_type = params.get("value_type", "number")
    option_value = params.get("option_value")

    # Card / count mode: a single parent count, optionally narrowed to a
    # specific option value. Triggered by sum_by=parent_id or option_value
    # so the number/option aggregation defaults below stay unchanged.
    if params.get("sum_by") == "parent_id" or option_value:
        return _count_parents_by_qname(rows, option_value, value_type)

    # Use the most common question_type across the rows. When forms
    # disagree, the majority wins, with question_type itself breaking ties.
    dominant = (
        rows.values("question_type")
        .annotate(cnt=Count("id"))
        .order_by("-cnt", "question_type")
        .first()
    )
    if not dominant:
        return [], []

    question_type = dominant["question_type"]
    group_by = params.get("group_by")

    if question_type == 4:  # number
        return _values_by_qname_number(rows, group_by, value_type)
    if question_type in (5, 6):  # option, multiple_option
        return _values_by_qname_option(
            rows, question_name, group_by, value_type
        )
    return _values_by_qname_text(rows)
779

780

781
def _apply_qname_date_filter(qs, rolling_months, from_date, to_date):
1✔
782
    """Filter a cross-form queryset by submission recency / date window.
783

784
    - rolling_months: keep rows whose latest answer is within the last N
785
      months (approximated as N * 30 days from now).
786
    - from_date / to_date: inclusive bounds on the latest answer date.
787
    """
788
    if rolling_months:
1✔
789
        cutoff = timezone.now() - timedelta(days=30 * rolling_months)
1✔
790
        qs = qs.filter(latest_created__gte=cutoff)
1✔
791
    if from_date:
1✔
792
        qs = qs.filter(latest_created__date__gte=from_date)
1✔
793
    if to_date:
1✔
794
        qs = qs.filter(latest_created__date__lte=to_date)
1✔
795
    return qs
1✔
796

797

798
def _count_parents_by_qname(qs, option_value, value_type):
1✔
799
    """Count distinct parents for a question_name (card / KPI mode).
800

801
    When option_value is given, count only parents whose latest option
802
    values contain it. With value_type=percentage, return that count as a
803
    share of all parents that answered the question.
804
    """
805
    total = qs.values("parent_id").distinct().count()
1✔
806
    if option_value:
1✔
807
        matched = (
1✔
808
            qs.filter(latest_option_values__contains=[option_value])
809
            .values("parent_id")
810
            .distinct()
811
            .count()
812
        )
813
    else:
814
        matched = total
1✔
815

816
    if value_type == "percentage":
1✔
817
        value = round(matched / total * 100, 2) if total else 0
1✔
818
    else:
819
        value = matched
1✔
820

821
    label = option_value or "Total"
1✔
822
    group = option_value or "total"
1✔
823
    return [{"value": value, "label": label, "group": group}], [label]
1✔
824

825

826
def _values_by_qname_number(qs, group_by, value_type):
1✔
827
    """Handle number question aggregation by question_name."""
828
    if group_by == "parent_id":
1✔
829
        rows = list(
1✔
830
            qs.filter(latest_numeric_value__isnull=False)
831
            .values("parent_id", "latest_numeric_value")
832
        )
833
        if not rows:
1!
834
            return [], []
×
835
        parent_ids = [r["parent_id"] for r in rows]
1✔
836
        name_map = dict(
1✔
837
            FormData.objects.filter(id__in=parent_ids)
838
            .values_list("id", "name")
839
        )
840
        data = [
1✔
841
            {
842
                "value": round(r["latest_numeric_value"], 2),
843
                "label": name_map.get(r["parent_id"], str(r["parent_id"])),
844
                "group": str(r["parent_id"]),
845
            }
846
            for r in rows
847
        ]
848
    else:
849
        result = qs.filter(latest_numeric_value__isnull=False).aggregate(
1✔
850
            avg_value=Avg("latest_numeric_value"),
851
            total=Count("id"),
852
        )
853
        avg = (
1✔
854
            round(result["avg_value"], 2)
855
            if result["avg_value"] is not None else 0
856
        )
857
        data = [{"value": avg, "label": "Total", "group": "total"}]
1✔
858

859
    if value_type == "percentage" and data:
1✔
860
        total = sum(
1✔
861
            d["value"] for d in data
862
            if isinstance(d["value"], (int, float))
863
        )
864
        if total > 0:
1!
865
            data = [
1✔
866
                {**d, "value": round(d["value"] / total * 100, 2)}
867
                for d in data
868
            ]
869

870
    labels = [d["label"] for d in data]
1✔
871
    return data, labels
1✔
872

873

874
def _values_by_qname_option(qs, question_name, group_by, value_type):
1✔
875
    """Handle option question aggregation by question_name."""
876
    # Deduplicate options by value. Order by (order, value, question_id)
877
    # so the tiebreak between forms sharing the same option value is
878
    # deterministic (lowest question_id wins).
879
    raw_opts = QuestionOptions.objects.filter(
1✔
880
        question__name=question_name,
881
        value__isnull=False,
882
    ).order_by(
883
        "order", "value", "question_id",
884
    ).values("value", "label", "color")
885
    seen = set()
1✔
886
    options = []
1✔
887
    for opt in raw_opts:
1✔
888
        if opt["value"] not in seen:
1!
889
            seen.add(opt["value"])
1✔
890
            options.append(opt)
1✔
891

892
    if group_by == "parent_id":
1✔
893
        rows = list(qs.values("parent_id", "latest_option_values"))
1✔
894
        if not rows:
1!
895
            return [], []
×
896
        parent_ids = [r["parent_id"] for r in rows]
1✔
897
        name_map = dict(
1✔
898
            FormData.objects.filter(id__in=parent_ids)
899
            .values_list("id", "name")
900
        )
901
        data = [
1✔
902
            {
903
                "value": row["latest_option_values"] or [],
904
                "label": name_map.get(
905
                    row["parent_id"], str(row["parent_id"])
906
                ),
907
                "group": str(row["parent_id"]),
908
            }
909
            for row in rows
910
        ]
911
        labels = [d["label"] for d in data]
1✔
912
        return data, labels
1✔
913

914
    # default: group_by == "option"
915
    tallies = defaultdict(int)
1✔
916
    total_parents = 0
1✔
917
    for row in qs.values("latest_option_values"):
1✔
918
        opts = row["latest_option_values"] or []
1✔
919
        for opt_value in opts:
1✔
920
            tallies[opt_value] += 1
1✔
921
        if opts:
1!
922
            total_parents += 1
1✔
923

924
    data = []
1✔
925
    for opt in options:
1✔
926
        count = tallies.get(opt["value"], 0)
1✔
927
        if value_type == "percentage" and total_parents > 0:
1✔
928
            value = round(count / total_parents * 100, 2)
1✔
929
        else:
930
            value = count
1✔
931
        data.append({
1✔
932
            "value": value,
933
            "label": opt["label"] or opt["value"],
934
            "group": opt["value"],
935
            "color": opt.get("color"),
936
        })
937

938
    labels = [d["label"] for d in data]
1✔
939
    return data, labels
1✔
940

941

942
def _values_by_qname_text(qs):
1✔
943
    """Handle text/date question by question_name."""
944
    data = [
×
945
        {
946
            "value": row["latest_text_value"] or "",
947
            "label": str(row["parent_id"]),
948
            "group": str(row["parent_id"]),
949
        }
950
        for row in qs.filter(
951
            latest_text_value__isnull=False
952
        ).values("parent_id", "latest_text_value")
953
    ]
954
    labels = [d["label"] for d in data]
×
955
    return data, labels
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc