• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / iwsims / #59

18 Jun 2026 07:20AM UTC coverage: 88.033% (-0.1%) from 88.13%
#59

push

coveralls-python

web-flow
Merge 5dfcb298b into a6f6761c9

5183 of 6053 branches covered (85.63%)

Branch coverage included in aggregate %.

9979 of 11170 relevant lines covered (89.34%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.6
backend/api/v1/v1_visualization/escalation_functions.py
1
from datetime import date
1✔
2
from urllib.parse import urlencode
1✔
3

4
from django.db.models import Q
1✔
5

6
from api.v1.v1_data.models import FormData
1✔
7
from api.v1.v1_visualization.functions import (
1✔
8
    apply_administration_filter,
9
    apply_criteria_to_monitoring_qs,
10
    apply_parent_criteria_to_qs,
11
    build_date_filters,
12
    get_latest_monitoring_subquery,
13
    format_date_group,
14
    split_criteria_by_form,
15
)
16
from api.v1.v1_visualization.models import MVAnswerDenormalized
1✔
17

18

19
def build_escalation_criteria_filter(criteria, latest_ids):
    """Combine parsed escalation criteria into a single OR-ed Q object.

    All lookups go through mv_answer_denormalized (GIN index on
    answer_options, indexed answer_value/answer_name) rather than the
    Answers table. Tests must call _refresh_all_mvs() in setUp so the
    materialized view holds the test data before this runs.

    Supported criterion types:
        option_equals: parts = [question_name, option value].
        threshold_gt / threshold_lt: parts = [question_name, number].
        overdue: parts = [completion question_name,
            deadline question_name]; matches records whose completion
            answer contains "no" and whose deadline answer_name is
            lexically before today's ISO date.
    Any other type is ignored.

    Args:
        criteria: List of parsed criteria dicts with
            'type' and 'parts' keys.
        latest_ids: List of latest monitoring data IDs.

    Returns:
        Q object combining all criteria with OR logic (an empty Q
        when nothing contributed a condition).
    """
    # Criterion type -> lookup used for numeric comparisons.
    threshold_lookups = {
        "threshold_gt": "answer_value__gt",
        "threshold_lt": "answer_value__lt",
    }

    def _data_ids(question_name, **lookup):
        # data_id values of MV rows for one question, restricted to
        # the latest monitoring records and one extra lookup.
        return MVAnswerDenormalized.objects.filter(
            data_id__in=latest_ids,
            question_name=question_name,
            **lookup,
        ).values_list("data_id", flat=True)

    combined = Q()

    for entry in criteria:
        kind = entry["type"]
        args = entry["parts"]

        if kind == "option_equals":
            combined |= Q(
                latest_id__in=_data_ids(
                    args[0], answer_options__contains=[args[1]],
                )
            )
        elif kind in threshold_lookups:
            lookup = {threshold_lookups[kind]: float(args[1])}
            combined |= Q(latest_id__in=_data_ids(args[0], **lookup))
        elif kind == "overdue":
            completion_name = args[0]
            deadline_name = args[1]
            iso_today = date.today().isoformat()
            not_completed = set(
                _data_ids(
                    completion_name, answer_options__contains=["no"],
                )
            )
            past_deadline = set(
                _data_ids(deadline_name, answer_name__lt=iso_today)
            )
            # Both conditions must hold for the same record.
            combined |= Q(latest_id__in=not_completed & past_deadline)

    return combined
94

95

96
def _answer_cell_value(answer):
    """Reduce one cached answer mapping to the value shown in a cell.

    Preference order: the first entry of "options" when that list is
    non-empty, then "value" when it is not None, then "name".
    Returns None when ``answer`` itself is missing or empty.
    """
    if not answer:
        return None

    options = answer.get("options")
    if options:
        return options[0]

    value = answer.get("value")
    # ``is not None`` keeps legitimate zero values.
    return value if value is not None else answer.get("name")
105

106

107
def build_column_caches(paginated, columns):
    """Pre-fetch answers and FormData needed for rendering one page.

    Reduces the per-row work in extract_column_value from O(columns)
    queries per row to a handful of bulk queries per page.

    Args:
        paginated: Page of parent rows; each exposes ``id`` and the
            annotated ``latest_id``.
        columns: Parsed column dicts with a 'source' key and an
            optional 'question_name' key.

    Returns:
        Dict with three lookup tables:
            "answer": {(latest_id, question_name): answer dict},
            "parent_answer": {(parent id, question_name): answer dict},
            "created": {latest_id: FormData.created}.
        Each answer dict has "name", "value" and "options" keys.
    """
    latest_ids = [row.latest_id for row in paginated]
    parent_ids = [row.id for row in paginated]

    answer_qnames = set()
    parent_answer_qnames = set()
    latest_date_qnames = set()
    created_for_every_row = False

    for col in columns:
        source = col["source"]
        qname = col.get("question_name")
        if source == "answer" and qname:
            answer_qnames.add(qname)
        elif source == "parent_answer" and qname:
            parent_answer_qnames.add(qname)
        elif source == "latest_date":
            if qname:
                latest_date_qnames.add(qname)
            else:
                # No date question configured: every row renders the
                # record's created timestamp.
                created_for_every_row = True

    def _fetch_answers(data_ids, qnames):
        # One bulk MV query keyed by (data_id, question_name); skipped
        # entirely when there is nothing to look up.
        if not qnames or not data_ids:
            return {}
        rows = MVAnswerDenormalized.objects.filter(
            data_id__in=data_ids,
            question_name__in=qnames,
        ).values(
            "data_id", "question_name",
            "answer_name", "answer_value", "answer_options",
        )
        return {
            (r["data_id"], r["question_name"]): {
                "name": r["answer_name"],
                "value": r["answer_value"],
                "options": r["answer_options"],
            }
            for r in rows
        }

    answer_map = _fetch_answers(
        latest_ids, answer_qnames | latest_date_qnames,
    )
    parent_answer_map = _fetch_answers(parent_ids, parent_answer_qnames)

    # extract_column_value falls back to the created timestamp not only
    # when a latest_date column has no question_name, but also when the
    # configured date question has no usable answer_name for a row.
    # Load "created" for exactly the rows that will need it; otherwise
    # that fallback always misses and the cell renders as None.
    if created_for_every_row:
        created_ids = set(latest_ids)
    else:
        created_ids = {
            latest_id
            for latest_id in latest_ids
            for qname in latest_date_qnames
            if not (answer_map.get((latest_id, qname)) or {}).get("name")
        }

    created_map = {}
    if created_ids:
        fd_rows = FormData.objects.filter(
            id__in=created_ids,
        ).values("id", "created")
        created_map = {r["id"]: r["created"] for r in fd_rows}

    return {
        "answer": answer_map,
        "parent_answer": parent_answer_map,
        "created": created_map,
    }
178

179

180
def extract_column_value(parent, latest_id, col, caches):
    """Resolve one column's cell value for a parent row.

    Args:
        parent: Parent row; ``name``, ``id`` and ``administration``
            are read depending on the column source.
        latest_id: ID of the row's latest monitoring record.
        col: Column dict with a 'source' key and an optional
            'question_name' key.
        caches: Dict with "answer", "parent_answer" and "created"
            lookup tables, as returned by build_column_caches.

    Returns:
        The cell value, or None when the source is unknown or no
        value is available.
    """
    source = col["source"]

    if source == "parent_name":
        return parent.name

    if source == "administration":
        administration = parent.administration
        if not administration:
            return None
        if not administration.path:
            return administration.name
        # Ancestor names followed by the administration's own name.
        names = list(
            administration.ancestors.values_list("name", flat=True)
        )
        names.append(administration.name)
        return " > ".join(names)

    if source in ("answer", "parent_answer"):
        qname = col.get("question_name")
        if not qname:
            return None
        cache = caches[source]
        # Monitoring answers are keyed by the latest record, parent
        # answers by the parent row itself.
        owner_id = latest_id if source == "answer" else parent.id
        return _answer_cell_value(cache.get((owner_id, qname)))

    if source == "latest_date":
        qname = col.get("question_name")
        if qname:
            answer = caches["answer"].get((latest_id, qname))
            if answer and answer.get("name"):
                return format_date_group(answer["name"])
        # No usable date answer: fall back to the created timestamp.
        created = caches["created"].get(latest_id)
        return format_date_group(created) if created else None

    return None
225

226

227
def handle_escalation(
    parent_form, monitoring_form_id,
    criteria, columns, params,
):
    """Run the escalation query and return one page of results.

    Selects registration records of ``parent_form`` that have a latest
    monitoring record, narrows them by administration and optional
    AND filter criteria, keeps those matching any escalation
    criterion, and renders the requested columns for the page.

    Args:
        parent_form: Registration form instance.
        monitoring_form_id: Monitoring form ID.
        criteria: Parsed criteria list.
        columns: Parsed columns list.
        params: Dict with page, page_size,
            administration_id, from_date, to_date; optionally
            filter_criteria and query_string.

    Returns:
        Dict with count, next, previous, results.
    """
    page = params.get("page", 1)
    page_size = params.get("page_size", 20)
    administration_id = params.get("administration_id")

    date_filters = build_date_filters(params)

    parents = (
        FormData.objects
        .filter(
            form=parent_form,
            parent__isnull=True,
            is_pending=False,
            is_draft=False,
        )
        .annotate(
            latest_id=get_latest_monitoring_subquery(
                monitoring_form_id, date_filters or None
            ),
        )
        .filter(latest_id__isnull=False)
    )

    if administration_id:
        parents = apply_administration_filter(
            parents, administration_id
        )

    # Optional AND-narrowing criteria (same grammar as /values) sit on
    # top of the OR-escalation criteria. They are split by form:
    # parent-form filters (e.g. implementing_agency) are applied with
    # apply_parent_criteria_to_qs, monitoring-form filters with
    # apply_criteria_to_monitoring_qs.
    filter_criteria = params.get("filter_criteria")
    if filter_criteria:
        monitoring_filters, parent_filters = split_criteria_by_form(
            filter_criteria, monitoring_form_id, parent_form.id,
        )
        parents = apply_criteria_to_monitoring_qs(
            parents, True, monitoring_filters,
        )
        parents = apply_parent_criteria_to_qs(
            parents, True, parent_filters,
        )

    latest_ids = list(parents.values_list("latest_id", flat=True))
    escalated = parents.filter(
        build_escalation_criteria_filter(criteria, latest_ids)
    ).order_by("id")

    total = escalated.count()
    offset = (page - 1) * page_size
    stop = offset + page_size
    page_rows = list(
        escalated.select_related("administration")[offset:stop]
    )

    caches = build_column_caches(page_rows, columns)

    results = [
        {
            "id": row.id,
            **{
                col["key"]: extract_column_value(
                    row, row.latest_id, col, caches,
                )
                for col in columns
            },
        }
        for row in page_rows
    ]

    # Pagination links reuse the incoming query (minus "page") when it
    # is available, otherwise they are rebuilt from known params.
    incoming_query = params.get("query_string")
    if incoming_query:
        link_base = [
            (key, value) for key, value in incoming_query
            if key != "page"
        ]
    else:
        link_base = [
            ("monitoring_form_id", monitoring_form_id),
            ("page_size", page_size),
        ]
        if administration_id:
            link_base.append(("administration_id", administration_id))
        link_base.extend(
            (key, params[key])
            for key in ("from_date", "to_date", "date_question_name")
            if params.get(key)
        )

    def _page_link(number):
        return "?" + urlencode(
            [*link_base, ("page", number)], doseq=True,
        )

    next_link = _page_link(page + 1) if stop < total else None
    previous_link = _page_link(page - 1) if page > 1 else None

    return {
        "count": total,
        "next": next_link,
        "previous": previous_link,
        "results": results,
    }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc