• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / iwsims / #58

18 Jun 2026 06:20AM UTC coverage: 88.13% (-0.004%) from 88.134%
#58

push

coveralls-python

web-flow
Merge pull request #21 from akvo/feature/20-MV4V-1

[#20] MV4V Task 1 to Task 3

5059 of 5892 branches covered (85.86%)

Branch coverage included in aggregate %.

9790 of 10957 relevant lines covered (89.35%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.79
backend/api/v1/v1_visualization/functions.py
1
import logging
1✔
2

3
from django.db import connection
1✔
4
from django.db.models import (
1✔
5
    Q, Subquery, OuterRef,
6
)
7
from datetime import datetime as dt_datetime, timedelta, date
1✔
8
from rest_framework.exceptions import ValidationError
1✔
9

10
from api.v1.v1_data.models import FormData, Answers
1✔
11
from api.v1.v1_forms.models import Questions
1✔
12
from api.v1.v1_profile.models import Administration
1✔
13
from api.v1.v1_visualization.constants import MATERIALIZED_VIEWS
1✔
14

15

16
# Module-level logger; refresh_materialized_data reports refresh outcomes
# (success, concurrent-refresh fallback, failure) through it.
logger = logging.getLogger(__name__)
1✔
17

18

19
def refresh_materialized_data(views=None, concurrent=False):
    """Refresh materialized views, optionally without blocking readers.

    Args:
        views: Iterable of view names to refresh. When falsy (None or an
            empty list) every name in MATERIALIZED_VIEWS is refreshed.
        concurrent: When True, try ``REFRESH MATERIALIZED VIEW
            CONCURRENTLY`` first (non-blocking, requires a unique index)
            and fall back to a plain refresh if that attempt raises.

    Raises:
        Exception: whatever the plain refresh raises — either directly
            (non-concurrent path) or after the concurrent attempt AND its
            fallback have both failed.

    Deliberately not wrapped in ``@transaction.atomic``: REFRESH
    CONCURRENTLY cannot run inside a transaction block, and in Django's
    default autocommit mode each cursor context is its own transaction.
    """
    def _execute(sql):
        # Fresh cursor per statement so each refresh commits on its own.
        with connection.cursor() as cursor:
            cursor.execute(sql)

    for view in views or MATERIALIZED_VIEWS:
        # A concurrent refresh is impossible inside an atomic block (e.g.
        # Django's TestCase wraps each test in one), so downgrade there.
        if not concurrent or connection.in_atomic_block:
            _execute(f"REFRESH MATERIALIZED VIEW {view};")
            logger.info(f"Refreshed materialized view: {view}")
            continue

        try:
            _execute(f"REFRESH MATERIALIZED VIEW CONCURRENTLY {view};")
            logger.info(f"Refreshed materialized view: {view}")
        except Exception as concurrent_exc:
            logger.warning(
                f"Concurrent refresh failed for {view}: {concurrent_exc} — "
                f"falling back to regular refresh"
            )
            try:
                _execute(f"REFRESH MATERIALIZED VIEW {view};")
                logger.info(
                    f"Refreshed {view} (fallback to non-concurrent)"
                )
            except Exception as fallback_exc:
                logger.error(
                    f"Fallback refresh also failed for {view}: {fallback_exc}"
                )
                raise
70

71

72
# -- Shared helpers --
73

74
def apply_administration_filter(queryset, administration_id):
    """Restrict ``queryset`` to an administration and its descendants.

    Rows match when their ``administration_id`` equals the given id, or
    when their administration's ``path`` starts with this administration's
    own path followed by its id and a dot. An unknown
    ``administration_id`` produces an empty queryset instead of an error.
    """
    try:
        adm = Administration.objects.get(pk=administration_id)
    except Administration.DoesNotExist:
        return queryset.none()

    # When this administration has an empty/None path, descendants'
    # paths start directly with "<id>.".
    if adm.path:
        prefix = f"{adm.path}{adm.id}."
    else:
        prefix = f"{adm.id}."

    is_self = Q(administration_id=administration_id)
    is_descendant = Q(administration__path__startswith=prefix)
    return queryset.filter(is_self | is_descendant)
90

91

92
def resolve_default_administration_id(administration_id):
    """Return ``administration_id``, or the root administration's id.

    When no (truthy) ``administration_id`` is supplied, fall back to the
    administration with no parent. These visualization endpoints are
    public, so they default to the top-level administration rather than
    leaking data across unrelated administrations.

    Raises:
        ValidationError: when no id was given and no root administration
            exists.
    """
    if not administration_id:
        root_id = (
            Administration.objects
            .filter(parent__isnull=True)
            .values_list("id", flat=True)
            .first()
        )
        if root_id is None:
            raise ValidationError(
                "No root administration configured; "
                "administration_id is required."
            )
        return root_id
    return administration_id
108

109

110
def build_date_filters(params):
    """Collect the truthy date-filter parameters from ``params``.

    Only ``from_date``, ``to_date`` and ``date_question_id`` are copied,
    in that order, and only when truthy. With no date filter set the
    result is ``{}``, so callers can pass ``date_filters or None`` to
    subqueries that treat a falsy value as 'no filter'.
    """
    date_keys = ("from_date", "to_date", "date_question_id")
    return {key: params[key] for key in date_keys if params.get(key)}
125

126

127
def _to_date_upper_bound(value):
1✔
128
    """Produce an inclusive upper bound for an ISO date-time string.
129

130
    `Answers.name` stores dates as ISO-8601 with time (e.g.
131
    '2025-01-20T00:00:00.000Z'), so a plain `name__lte='2025-01-20'`
132
    excludes same-day records lexically. Appending the latest time
133
    makes `<=` work as an inclusive day boundary.
134
    """
135
    return f"{value}T23:59:59.999Z"
1✔
136

137

138
def latest_monitoring_subquery(form_id, date_filters=None):
    """Subquery: ID of the latest monitoring FormData per parent.

    Meant to be used inside ``.annotate()`` on a queryset of parent
    (registration) FormData — ``OuterRef("pk")`` binds to each parent.

    Args:
        form_id: ID of the monitoring form whose submissions are
            considered (non-pending, non-draft only).
        date_filters: Optional dict with ``from_date`` / ``to_date`` /
            ``date_question_id`` keys. With ``date_question_id`` the
            bounds are compared against that question's non-null answer
            (``Answers.name``), matching the non-latest path in
            ``get_base_monitoring_qs``. Otherwise they are compared
            against ``FormData.created``'s date.

    Returns:
        A ``Subquery`` yielding the newest matching submission's ID, or
        NULL when none matches.
    """
    qs = FormData.objects.filter(
        parent=OuterRef("pk"),
        form_id=form_id,
        is_pending=False,
        is_draft=False,
    )
    date_filters = date_filters or {}
    from_date = date_filters.get("from_date")
    to_date = date_filters.get("to_date")
    date_qid = date_filters.get("date_question_id")

    if date_qid:
        # Only submissions whose date-question answer is present and
        # inside the requested range qualify.
        sub = Answers.objects.filter(
            data=OuterRef("pk"),
            question_id=date_qid,
            name__isnull=False,
        )
        if from_date:
            sub = sub.filter(name__gte=from_date)
        if to_date:
            sub = sub.filter(name__lte=_to_date_upper_bound(to_date))
        qs = qs.filter(pk__in=Subquery(sub.values("data_id")))
    else:
        if from_date:
            qs = qs.filter(created__date__gte=from_date)
        if to_date:
            qs = qs.filter(created__date__lte=to_date)

    # "-id" breaks ties between submissions sharing the same `created`
    # timestamp so the selected "latest" row is deterministic.
    return Subquery(qs.order_by("-created", "-id").values("id")[:1])
182

183

184
def parse_criteria_string(value, allowed_types):
    """Parse a `criteria=type:qid:value,...` query string.

    Returns a list of {"type", "parts"} dicts:

    * option_equals / option_contains: ``[qid, value_str]``
    * option_in: ``[qid, [value, ...]]`` (value split on ``|``, blanks
      dropped)
    * threshold_gt / threshold_lt: ``[qid, float]``
    * overdue: ``[completion_qid, deadline_qid]``
    * any other allowed type: the raw ``parts[1:]``

    Everything after the second ``:`` is treated as the value, so values
    that themselves contain ``:`` are kept intact rather than silently
    truncated.

    Raises:
        ValueError: with a user-visible message on any malformed
            fragment, so callers can surface a 400.
    """
    parsed = []
    for item in value.split(","):
        parts = item.strip().split(":")
        if len(parts) < 3:
            raise ValueError(
                f"Invalid criteria format: '{item}'."
                " Expected type:qid:value"
            )
        ctype = parts[0]
        if ctype not in allowed_types:
            raise ValueError(
                f"Invalid criteria type: '{ctype}'."
                f" Options: {sorted(allowed_types)}"
            )
        # Rejoin the tail so ':' inside a value is preserved.
        raw_value = ":".join(parts[2:])
        try:
            if ctype in ("option_equals", "option_contains"):
                normalized = [int(parts[1]), raw_value]
            elif ctype == "option_in":
                normalized = [
                    int(parts[1]),
                    [v for v in raw_value.split("|") if v],
                ]
            elif ctype in ("threshold_gt", "threshold_lt"):
                normalized = [int(parts[1]), float(raw_value)]
            elif ctype == "overdue":
                normalized = [int(parts[1]), int(parts[2])]
            else:
                normalized = parts[1:]
        except ValueError:
            # Only numeric coercion can fail inside the try, so every
            # failure gets the same user-facing message.
            raise ValueError(
                f"Invalid numeric value in criteria: '{item}'."
            ) from None
        # Checked outside the try so it is never mistaken for (or
        # rewritten as) a numeric parse error.
        if ctype == "option_in" and not normalized[1]:
            raise ValueError(
                "option_in requires at least one value:"
                f" '{item}'"
            )
        parsed.append({"type": ctype, "parts": normalized})
    return parsed
241

242

243
def _criterion_matching_ids(data_ids, criterion):
1✔
244
    """Return iterable of data_ids matching a single criterion."""
245
    ctype = criterion["type"]
1✔
246
    parts = criterion["parts"]
1✔
247
    if ctype in ("option_equals", "option_contains"):
1✔
248
        qid, value = parts
1✔
249
        return Answers.objects.filter(
1✔
250
            data_id__in=data_ids,
251
            question_id=qid,
252
            options__contains=[value],
253
        ).values_list("data_id", flat=True)
254
    if ctype == "option_in":
1✔
255
        qid, values = parts
1✔
256
        or_q = Q()
1✔
257
        for v in values:
1✔
258
            or_q |= Q(options__contains=[v])
1✔
259
        return Answers.objects.filter(
1✔
260
            or_q,
261
            data_id__in=data_ids,
262
            question_id=qid,
263
        ).values_list("data_id", flat=True)
264
    if ctype == "threshold_gt":
1!
265
        qid, threshold = parts
1✔
266
        return Answers.objects.filter(
1✔
267
            data_id__in=data_ids,
268
            question_id=qid,
269
            value__gt=threshold,
270
        ).values_list("data_id", flat=True)
271
    if ctype == "threshold_lt":
×
272
        qid, threshold = parts
×
273
        return Answers.objects.filter(
×
274
            data_id__in=data_ids,
275
            question_id=qid,
276
            value__lt=threshold,
277
        ).values_list("data_id", flat=True)
278
    return []
×
279

280

281
def narrow_data_ids_by_criteria(data_ids, criteria):
    """Keep only the ``data_ids`` that satisfy every criterion (AND).

    Each criterion runs one ``Answers`` query over the ids still in play.
    The candidate set only shrinks, and the loop stops early once it is
    empty. The result preserves the original order of ``data_ids``.
    With no criteria, ``data_ids`` is returned as a list unchanged.
    """
    if not criteria:
        return list(data_ids)

    survivors = set(data_ids)
    for criterion in criteria:
        if not survivors:
            break
        survivors.intersection_update(
            _criterion_matching_ids(list(survivors), criterion)
        )
    return [data_id for data_id in data_ids if data_id in survivors]
299

300

301
def apply_parent_criteria_to_qs(qs, is_latest, parent_criteria):
    """Narrow ``qs`` by criteria on the PARENT (registration) answers.

    In latest mode the rows of ``qs`` are the parent FormData themselves
    (annotated with ``latest_id``), so parents are matched on ``id``. In
    non-latest mode the rows are monitoring FormData, so they are matched
    on the distinct ``parent_id`` values. ``qs`` is returned untouched
    when ``parent_criteria`` is empty.
    """
    if not parent_criteria:
        return qs

    column = "id" if is_latest else "parent_id"
    candidates = qs.values_list(column, flat=True)
    if not is_latest:
        candidates = candidates.distinct()
    narrowed = narrow_data_ids_by_criteria(
        list(candidates), parent_criteria,
    )
    return qs.filter(**{f"{column}__in": narrowed})
323

324

325
def apply_criteria_to_monitoring_qs(qs, is_latest, criteria):
    """Narrow a base monitoring queryset by a multi-criteria filter.

    Reads the current monitoring ids from ``qs``: ``latest_id`` in latest
    mode, ``id`` otherwise. It keeps those that satisfy every criterion,
    then re-filters ``qs`` on the same column so downstream callers see a
    consistent narrowed view. ``qs`` is returned untouched when
    ``criteria`` is empty.
    """
    if not criteria:
        return qs

    column = "latest_id" if is_latest else "id"
    current_ids = list(qs.values_list(column, flat=True))
    narrowed = narrow_data_ids_by_criteria(current_ids, criteria)
    return qs.filter(**{f"{column}__in": narrowed})
342

343

344
def split_criteria_by_form(criteria, form_id, parent_form_id):
    """Partition parsed criteria by the form their question belongs to.

    Returns ``(same_form, parent_form)``. Each element is a list of
    criteria, or None when empty. A criterion's question id is
    ``parts[0]``. Questions on ``form_id`` take precedence. The remaining
    questions are only looked up on ``parent_form_id`` when one is given.
    Criteria on neither form are dropped.
    """
    if not criteria:
        return None, None

    question_ids = {criterion["parts"][0] for criterion in criteria}
    own_ids = set(
        Questions.objects.filter(
            pk__in=question_ids, form_id=form_id,
        ).values_list("pk", flat=True)
    )
    leftover = question_ids - own_ids

    parent_ids = set()
    if parent_form_id and leftover:
        parent_ids = set(
            Questions.objects.filter(
                pk__in=leftover,
                form_id=parent_form_id,
            ).values_list("pk", flat=True)
        )

    same_form = [c for c in criteria if c["parts"][0] in own_ids]
    parent_form = [c for c in criteria if c["parts"][0] in parent_ids]
    return (same_form or None), (parent_form or None)
367

368

369
def _apply_monitoring_date_filters(qs, monitoring_form_id, date_filters):
    """Narrow non-latest monitoring FormData by the from/to date bounds.

    With ``date_question_id`` the bounds are compared against the
    non-null answer to that question on the monitoring form. Otherwise
    they are compared against the date part of ``created``.
    """
    from_date = date_filters.get("from_date")
    to_date = date_filters.get("to_date")
    date_question_id = date_filters.get("date_question_id")

    if date_question_id:
        matching = Answers.objects.filter(
            data__form_id=monitoring_form_id,
            question_id=date_question_id,
            name__isnull=False,
        )
        if from_date:
            matching = matching.filter(name__gte=from_date)
        if to_date:
            matching = matching.filter(
                name__lte=_to_date_upper_bound(to_date)
            )
        return qs.filter(id__in=matching.values("data_id"))

    if from_date:
        qs = qs.filter(created__date__gte=from_date)
    if to_date:
        qs = qs.filter(created__date__lte=to_date)
    return qs


def get_base_monitoring_qs(form, monitoring_form_id, params):
    """Build the base queryset for monitoring-data visualizations.

    Args:
        form: Form being visualized; treated as a monitoring form when
            ``form.parent`` is set.
        monitoring_form_id: ID of the monitoring form whose submissions
            are aggregated.
        params: Mapping of request parameters. Reads ``monitoring``
            (default ``"latest"``), ``from_date``, ``to_date``,
            ``date_question_id``, ``administration_id``, ``criteria`` and
            ``parent_criteria``.

    Returns:
        Tuple of ``(queryset, is_latest_mode, date_filters)``.

        * ``is_latest_mode`` is True only when ``form`` is a monitoring
          form and ``monitoring == "latest"``. The queryset then holds
          parent (registration) FormData annotated with ``latest_id``,
          their newest matching monitoring submission, and parents
          without one are excluded.
        * Otherwise ``is_latest_mode`` is False, even for monitoring
          forms, and the queryset holds the monitoring FormData rows
          themselves.
        * ``date_filters`` is the dict produced by ``build_date_filters``.
    """
    monitoring = params.get("monitoring", "latest")
    administration_id = params.get("administration_id")
    date_filters = build_date_filters(params)

    if form.parent is not None and monitoring == "latest":
        qs = FormData.objects.filter(
            form=form.parent,
            parent__isnull=True,
            is_pending=False,
            is_draft=False,
        ).annotate(
            latest_id=latest_monitoring_subquery(
                monitoring_form_id,
                date_filters or None,
            ),
        ).filter(latest_id__isnull=False)
        if administration_id:
            qs = apply_administration_filter(qs, administration_id)
        is_latest = True
    else:
        qs = FormData.objects.filter(
            form_id=monitoring_form_id,
            is_pending=False,
            is_draft=False,
        )
        if administration_id:
            qs = apply_administration_filter(qs, administration_id)
        if date_filters:
            qs = _apply_monitoring_date_filters(
                qs, monitoring_form_id, date_filters,
            )
        is_latest = False

    qs = apply_criteria_to_monitoring_qs(
        qs, is_latest, params.get("criteria"),
    )
    qs = apply_parent_criteria_to_qs(
        qs, is_latest, params.get("parent_criteria"),
    )
    return qs, is_latest, date_filters
458

459

460
def get_monitoring_data_ids(qs, is_latest_mode):
    """Return the monitoring FormData IDs contained in ``qs`` as a list.

    In latest mode the rows are parents annotated with ``latest_id``, so
    that column is read. Otherwise the rows are the monitoring
    submissions and their own ``id`` is read.
    """
    id_column = "latest_id" if is_latest_mode else "id"
    return list(qs.values_list(id_column, flat=True))
467

468

469
def format_month_label(dt):
    """Render ``dt`` as a 'Mon YYYY' label (``%b %Y``).

    Objects with ``strftime`` (date/datetime) are formatted directly.
    Anything else is stringified and its first seven characters parsed
    as 'YYYY-MM'. If that parse fails, ``str(dt)`` is returned unchanged.
    """
    if not hasattr(dt, 'strftime'):
        try:
            dt = dt_datetime.strptime(str(dt)[:7], "%Y-%m")
        except (ValueError, TypeError):
            return str(dt)
    return dt.strftime("%b %Y")
478

479

480
def format_month_group(dt):
    """Return the 'YYYY-MM' grouping key for ``dt``.

    Date-like values are formatted with ``strftime``. Any other value
    falls back to the first seven characters of its string form.
    """
    return dt.strftime("%Y-%m") if hasattr(dt, 'strftime') else str(dt)[:7]
485

486

487
def format_date_group(dt):
    """Return the 'YYYY-MM-DD' grouping key for ``dt``.

    Date-like values are formatted with ``strftime``. Any other value
    falls back to the first ten characters of its string form.
    """
    return dt.strftime("%Y-%m-%d") if hasattr(dt, 'strftime') else str(dt)[:10]
492

493

494
def _parse_iso_date(value):
1✔
495
    """Parse YYYY-MM-DD string or pass through date/datetime."""
496
    if isinstance(value, (dt_datetime, date)):
1!
497
        return value if isinstance(value, date) else value.date()
1✔
498
    return dt_datetime.strptime(str(value)[:10], "%Y-%m-%d").date()
×
499

500

501
def fill_month_gaps(data, from_date, to_date):
    """Return monthly rows covering [from_date, to_date], zero-filling gaps.

    Rows in ``data`` are looked up by their ``group`` key ('YYYY-MM').
    For every month from ``from_date``'s month through ``to_date``'s
    month, the existing row is reused, or a ``{"value": 0, "label":
    'Mon YYYY', "group": 'YYYY-MM'}`` row is inserted. The output is in
    chronological order. Rows whose month lies outside the range are not
    included.
    """
    first = _parse_iso_date(from_date).replace(day=1)
    last = _parse_iso_date(to_date).replace(day=1)
    by_group = {row["group"]: row for row in data}

    result = []
    month = first
    while month <= last:
        group = month.strftime("%Y-%m")
        row = by_group.get(group)
        if row is None:
            row = {
                "value": 0,
                "label": month.strftime("%b %Y"),
                "group": group,
            }
        result.append(row)
        # Step to the 1st of the following month; December rolls over
        # into January of the next year.
        month = month.replace(
            year=month.year + month.month // 12,
            month=month.month % 12 + 1,
        )
    return result
530

531

532
def fill_date_gaps(data, from_date, to_date):
    """Return daily rows covering [from_date, to_date], zero-filling gaps.

    Rows in ``data`` are looked up by their ``group`` key ('YYYY-MM-DD').
    For every day in the inclusive range, the existing row is reused, or
    a ``{"value": 0, "label": day, "group": day}`` row is inserted. The
    output is in chronological order. Rows outside the range are not
    included.
    """
    first_day = _parse_iso_date(from_date)
    last_day = _parse_iso_date(to_date)
    by_group = {row["group"]: row for row in data}

    one_day = timedelta(days=1)
    result = []
    day = first_day
    while day <= last_day:
        group = day.strftime("%Y-%m-%d")
        row = by_group.get(group)
        if row is None:
            row = {"value": 0, "label": group, "group": group}
        result.append(row)
        day += one_day
    return result
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc