• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / iwsims / #59

18 Jun 2026 07:20AM UTC coverage: 88.033% (-0.1%) from 88.13%
#59

push

coveralls-python

web-flow
Merge 5dfcb298b into a6f6761c9

5183 of 6053 branches covered (85.63%)

Branch coverage included in aggregate %.

9979 of 11170 relevant lines covered (89.34%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.63
backend/api/v1/v1_visualization/progress_functions.py
1
from api.v1.v1_data.models import FormData
1✔
2
from api.v1.v1_visualization.functions import (
1✔
3
    apply_administration_filter,
4
    apply_administration_filter_mv,
5
    apply_criteria_to_monitoring_qs,
6
    apply_parent_criteria_to_qs,
7
    build_date_filters,
8
    get_latest_monitoring_subquery,
9
    narrow_data_ids_by_criteria,
10
)
11
from api.v1.v1_visualization.models import (
1✔
12
    MVAnswerDenormalized,
13
    MVLatestMonitoring,
14
)
15

16

17
def compute_any_yes(latest_data_id, question_names, answers_map, **kwargs):
    """Score 100.0 when at least one listed question has "yes" selected.

    Args:
        latest_data_id: Submission id used as the first half of the
            ``answers_map`` key.
        question_names: Question names to inspect, in order.
        answers_map: Dict keyed by ``(data_id, question_name)`` whose
            values expose an ``"options"`` entry.
        **kwargs: Accepted so every formula handler shares one call
            signature; not read here.

    Returns:
        100.0 if any inspected answer contains ``"yes"`` among its
        options, otherwise 0.0.
    """
    # Missing rows and rows without options collapse to a falsy value.
    option_sets = (
        (answers_map.get((latest_data_id, name)) or {}).get("options")
        for name in question_names
    )
    if any(opts and "yes" in opts for opts in option_sets):
        return 100.0
    return 0.0
×
24

25

26
def compute_completed_binary(
    latest_data_id, question_names, answers_map, **kwargs
):
    """Score 100.0 when a listed question has "completed" selected.

    Args:
        latest_data_id: Submission id used as the first half of the
            ``answers_map`` key.
        question_names: Question names to inspect, in order.
        answers_map: Dict keyed by ``(data_id, question_name)`` whose
            values expose an ``"options"`` entry.
        **kwargs: Accepted for signature parity with the other formula
            handlers; not read here.

    Returns:
        100.0 on the first answer whose options contain
        ``"completed"``, otherwise 0.0.
    """
    for name in question_names:
        answer = answers_map.get((latest_data_id, name))
        if not answer:
            continue
        options = answer.get("options")
        if not options:
            continue
        if "completed" in options:
            return 100.0
    return 0.0
×
35

36

37
def compute_ratio(
    latest_data_id, question_names, answers_map, **kwargs
):
    """Return (implemented / planned) * 100, clamped to [0, 100].

    Args:
        latest_data_id: Submission id used as the first half of the
            ``answers_map`` key.
        question_names: ``[implemented_qname, planned_qname]``; any
            further entries are ignored.
        answers_map: Dict keyed by ``(data_id, question_name)`` whose
            values expose a ``"value"`` entry.
        **kwargs: Accepted for signature parity with the other formula
            handlers; not read here.

    Returns:
        The percentage rounded to 2 decimals. 0.0 when fewer than two
        question names are given, when either value is missing or not
        convertible to float, when planned <= 0, or when the ratio is
        not a number.
    """
    import math

    if len(question_names) < 2:
        return 0.0
    implemented_qname, planned_qname = question_names[0], question_names[1]
    impl_row = answers_map.get((latest_data_id, implemented_qname))
    plan_row = answers_map.get((latest_data_id, planned_qname))
    implemented = impl_row.get("value") if impl_row else None
    planned = plan_row.get("value") if plan_row else None
    if implemented is None or planned is None:
        return 0.0
    try:
        planned = float(planned)
        implemented = float(implemented)
    except (TypeError, ValueError):
        return 0.0
    if planned <= 0:
        return 0.0
    pct = (implemented / planned) * 100
    # NaN compares false against everything, so it would slip through
    # min()/max() unchanged; treat it like any other unusable value.
    if math.isnan(pct):
        return 0.0
    # Lower bound matters when "implemented" is negative.
    return round(max(0.0, min(pct, 100.0)), 2)
1✔
62

63

64
def compute_multi_select_proportion(
    latest_data_id, question_names, answers_map,
    total_items=1, **kwargs
):
    """Score the share of options selected out of ``total_items``.

    The first listed question that has a non-empty options list is the
    one that gets scored; later questions are not consulted.

    Args:
        latest_data_id: Submission id used as the first half of the
            ``answers_map`` key.
        question_names: Question names to inspect, in order.
        answers_map: Dict keyed by ``(data_id, question_name)`` whose
            values expose an ``"options"`` entry.
        total_items: Denominator for the proportion.
        **kwargs: Accepted for signature parity with the other formula
            handlers; not read here.

    Returns:
        ``len(options) / total_items * 100`` capped at 100.0 and rounded
        to 2 decimals, or 0.0 when nothing is selected or
        ``total_items`` is falsy or not positive.
    """
    chosen = None
    for name in question_names:
        answer = answers_map.get((latest_data_id, name)) or {}
        chosen = answer.get("options")
        if chosen:
            break
    if not chosen or not total_items or total_items <= 0:
        return 0.0
    share = (len(chosen) / total_items) * 100
    capped = share if share < 100.0 else 100.0
    return round(capped, 2)
1✔
81

82

83
# Dispatch table: a component's "formula" string -> the scoring function
# that implements it. All handlers share the call signature
# (latest_data_id, question_names, answers_map, **kwargs).
FORMULA_HANDLERS = dict(
    any_yes=compute_any_yes,
    completed_binary=compute_completed_binary,
    ratio=compute_ratio,
    multi_select_proportion=compute_multi_select_proportion,
)
91

92

93
def filter_components_by_scope(components, scope_value):
    """Select the components that apply to ``scope_value``.

    Args:
        components: List of component dicts; each may carry an
            ``"applicable_types"`` collection.
        scope_value: Scope to match. Any falsy value disables filtering.

    Returns:
        ``components`` itself when ``scope_value`` is falsy. Otherwise a
        new list holding every component that either has no (or empty)
        ``applicable_types`` or lists ``scope_value`` in it.
    """
    if not scope_value:
        return components

    def _applies(component):
        allowed = component.get("applicable_types")
        return not allowed or scope_value in allowed

    return [component for component in components if _applies(component)]
106

107

108
def compute_component_scores(
    latest_id, components, answers_map, scope_value=None,
):
    """Score every component that applies to ``scope_value``.

    Args:
        latest_id: Submission id whose answers are scored.
        components: Component dicts providing ``"key"``, ``"formula"``,
            ``"question_names"`` and optionally ``"total_items"``.
        answers_map: Dict keyed by ``(data_id, question_name)``.
        scope_value: Passed to ``filter_components_by_scope`` to pick
            the applicable components.

    Returns:
        Dict mapping each applicable component's ``"key"`` to the value
        returned by its formula handler.

    Raises:
        KeyError: If a component names a formula that is not in
            ``FORMULA_HANDLERS``.
    """
    results = {}
    for component in filter_components_by_scope(components, scope_value):
        score_fn = FORMULA_HANDLERS[component["formula"]]
        # Only forward total_items when it is set, so the handler's own
        # default applies otherwise.
        total_items = component.get("total_items")
        extra = {"total_items": total_items} if total_items else {}
        results[component["key"]] = score_fn(
            latest_id, component["question_names"], answers_map, **extra
        )
    return results
1✔
126

127

128
def build_progress_answers_map(latest_ids, components):
    """Fetch, in one query, the answers needed to score all components.

    Args:
        latest_ids: Submission ids to load answers for.
        components: Component dicts; their ``"question_names"`` lists
            decide which questions are fetched.

    Returns:
        Dict keyed by ``(data_id, question_name)`` whose values are
        ``{"options": answer_options, "value": answer_value}``. Empty
        when there are no question names or no ids, in which case no
        query is issued.
    """
    wanted_names = set()
    for component in components:
        wanted_names.update(component.get("question_names", []))
    if not (wanted_names and latest_ids):
        return {}

    answer_rows = MVAnswerDenormalized.objects.filter(
        data_id__in=latest_ids,
        question_name__in=wanted_names,
    ).values(
        "data_id", "question_name",
        "answer_options", "answer_value",
    )

    answers = {}
    for row in answer_rows:
        lookup_key = (row["data_id"], row["question_name"])
        answers[lookup_key] = {
            "options": row["answer_options"],
            "value": row["answer_value"],
        }
    return answers
153

154

155
def build_histogram(eps_results):
    """Count results per 10-point band of their overall progress.

    Values at or below 0 land in the first band and values above 100 in
    the last. A band's upper edge is inclusive (10 counts as "0-10%",
    11 as "11-20%").

    Args:
        eps_results: Iterable of dicts carrying a numeric ``"overall"``.

    Returns:
        List of 10 dicts ``{"progress": label, "count": n}``, ordered
        from "0-10%" to "91-100%".
    """
    labels = (
        "0-10%", "11-20%", "21-30%", "31-40%", "41-50%",
        "51-60%", "61-70%", "71-80%", "81-90%", "91-100%",
    )
    tally = [0] * len(labels)
    for entry in eps_results:
        value = entry["overall"]
        # Shift by one before dividing so exact multiples of 10 stay in
        # the lower band.
        slot = 0 if value <= 0 else min(max(0, int(value - 1) // 10), 9)
        tally[slot] += 1
    return [
        {"progress": label, "count": count}
        for label, count in zip(labels, tally)
    ]
177

178

179
def _restrict_to_option(qs, id_field, question_name, option_value):
    """Keep rows of ``qs`` whose ``id_field`` submission chose the option."""
    matching = MVAnswerDenormalized.objects.filter(
        data_id__in=qs.values(id_field),
        question_name=question_name,
        answer_options__contains=[option_value],
    ).values_list("data_id", flat=True)
    return qs.filter(**{id_field + "__in": matching})


def _latest_parents_from_mv(monitoring_form_id, params):
    """List parent/latest-submission dicts from MVLatestMonitoring."""
    administration_id = params.get("administration_id")
    criteria = params.get("criteria")
    parent_criteria = params.get("parent_criteria")
    filter_qname = params.get("filter_question_name")
    filter_value = params.get("filter_option_value")

    # MV path: single indexed lookup, no FormData join needed.
    # The mixin always refreshes MVs in setUp, so this path also
    # works correctly inside TestCase transactions.
    mv_qs = MVLatestMonitoring.objects.filter(
        form_id=monitoring_form_id
    )

    if administration_id:
        mv_qs = apply_administration_filter_mv(
            mv_qs, administration_id,
            'parent_administration_id',
        )

    if criteria:
        ids = list(
            mv_qs.values_list("latest_data_id", flat=True)
        )
        narrowed = narrow_data_ids_by_criteria(ids, criteria)
        mv_qs = mv_qs.filter(latest_data_id__in=narrowed)

    if parent_criteria:
        pids = list(
            mv_qs.values_list("parent_id", flat=True)
        )
        narrowed = narrow_data_ids_by_criteria(
            pids, parent_criteria
        )
        mv_qs = mv_qs.filter(parent_id__in=narrowed)

    if filter_qname and filter_value:
        mv_qs = _restrict_to_option(
            mv_qs, "latest_data_id", filter_qname, filter_value
        )

    return list(
        mv_qs.values("parent_id", "parent_name", "latest_data_id")
    )


def _latest_parents_from_form_data(
    parent_form, monitoring_form_id, date_filters, params,
):
    """List parent/latest-submission dicts via FormData and a subquery."""
    administration_id = params.get("administration_id")
    criteria = params.get("criteria")
    parent_criteria = params.get("parent_criteria")
    filter_qname = params.get("filter_question_name")
    filter_value = params.get("filter_option_value")

    # The correlated subquery finds the most recent submission
    # WITHIN the date range — semantics the MV cannot provide.
    fd_qs = FormData.objects.filter(
        form=parent_form,
        parent__isnull=True,
        is_pending=False,
        is_draft=False,
    ).annotate(
        latest_id=get_latest_monitoring_subquery(
            monitoring_form_id, date_filters
        ),
    ).filter(latest_id__isnull=False)

    if administration_id:
        fd_qs = apply_administration_filter(
            fd_qs, administration_id
        )

    fd_qs = apply_criteria_to_monitoring_qs(
        fd_qs, True, criteria,
    )
    fd_qs = apply_parent_criteria_to_qs(
        fd_qs, True, parent_criteria,
    )

    if filter_qname and filter_value:
        fd_qs = _restrict_to_option(
            fd_qs, "latest_id", filter_qname, filter_value
        )

    return [
        {
            "parent_id": p.id,
            "parent_name": p.name,
            "latest_data_id": p.latest_id,
        }
        for p in fd_qs.only("id", "name")
    ]


def _build_scope_map(latest_ids, scope_qname):
    """Map each submission id to the first option of its scope answer."""
    scope_map = {}
    if not scope_qname:
        return scope_map
    scope_rows = MVAnswerDenormalized.objects.filter(
        data_id__in=latest_ids,
        question_name=scope_qname,
    ).values("data_id", "answer_options")
    for row in scope_rows:
        opts = row.get("answer_options") or []
        if opts:
            scope_map[row["data_id"]] = opts[0]
    return scope_map


def handle_progress(
    parent_form, monitoring_form_id,
    components, params,
):
    """Handle progress query.

    Uses mv_latest_monitoring as the primary queryset when no date
    filters are set — the MV has parent_id, parent_name and
    latest_data_id pre-joined so no FormData outer query is needed.

    Falls back to FormData + correlated subquery when date_filters is
    set: the MV stores the absolute latest monitoring, not the most
    recent within a date range.

    Args:
        parent_form: Registration form instance. Only used on the
            date-filtered (FormData) path.
        monitoring_form_id: Monitoring form ID.
        components: Parsed component list; each entry provides
            key/formula/question_names and optionally
            total_items/applicable_types.
        params: Dict read for administration_id,
            filter_question_name, filter_option_value, criteria,
            parent_criteria and scope_question_name. It is also passed
            whole to build_date_filters, which picks out the date keys.

    Returns:
        Dict with "histogram" (10 buckets from build_histogram) and
        "details": one entry per parent with label (parent name),
        group (parent id as a string), components (score per component
        key) and overall (mean of the component scores rounded to 2
        decimals, 0.0 when no component applies).
    """
    date_filters = build_date_filters(params)

    if not date_filters:
        parents_data = _latest_parents_from_mv(
            monitoring_form_id, params
        )
    else:
        # FormData fallback for date-filtered queries.
        parents_data = _latest_parents_from_form_data(
            parent_form, monitoring_form_id, date_filters, params,
        )
    latest_ids = [p["latest_data_id"] for p in parents_data]

    answers_map = build_progress_answers_map(latest_ids, components)
    scope_map = _build_scope_map(
        latest_ids, params.get("scope_question_name")
    )

    eps_results = []
    for parent in parents_data:
        scope_value = scope_map.get(parent["latest_data_id"])
        scores = compute_component_scores(
            parent["latest_data_id"], components, answers_map,
            scope_value=scope_value,
        )
        overall = (
            round(sum(scores.values()) / len(scores), 2)
            if scores else 0.0
        )
        eps_results.append({
            "label": parent["parent_name"],
            "group": str(parent["parent_id"]),
            "components": scores,
            "overall": overall,
        })

    histogram = build_histogram(eps_results)
    return {"histogram": histogram, "details": eps_results}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc