• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / rtmis / #1842

27 Jun 2024 11:05AM UTC coverage: 84.738% (-0.1%) from 84.867%
#1842

push

coveralls-python

ifirmawan
[#1050] Update mobile version to 4.0.10

3062 of 3747 branches covered (81.72%)

Branch coverage included in aggregate %.

6849 of 7949 relevant lines covered (86.16%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.87
backend/api/v1/v1_data/functions.py
1
import re
1✔
2
import pandas as pd
1✔
3
from operator import or_
1✔
4
from functools import reduce
1✔
5
from django.core.cache import cache
1✔
6
from datetime import datetime
1✔
7
from django.db.models import Q, Count, Sum
1✔
8
from django.db import transaction, connection
1✔
9
from api.v1.v1_profile.functions import get_administration_ids_by_path
1✔
10
from api.v1.v1_data.models import ViewOptions, ViewDataOptions, Answers
1✔
11
from api.v1.v1_forms.models import QuestionOptions, QuestionTypes
1✔
12

13

14
@transaction.atomic
1✔
15
def refresh_materialized_data():
1✔
16
    with connection.cursor() as cursor:
1✔
17
        cursor.execute("""
1✔
18
            REFRESH MATERIALIZED VIEW view_data_options;
19
            REFRESH MATERIALIZED VIEW view_options;
20
            REFRESH MATERIALIZED VIEW view_jmp_criteria;
21
            REFRESH MATERIALIZED VIEW view_jmp_data;
22
            REFRESH MATERIALIZED VIEW view_jmp_count;
23
            """)
24

25

26
def get_cache(name):
1✔
27
    name = re.sub(r'[\W_]+', '_', name)
1✔
28
    today = datetime.now().strftime("%Y%m%d")
1✔
29
    cache_name = f"{today}-{name}"
1✔
30
    data = cache.get(cache_name)
1✔
31
    if data:
1!
32
        return data
×
33
    return None
1✔
34

35

36
def create_cache(name, resp, timeout=None):
1✔
37
    name = re.sub(r'[\W_]+', '_', name)
1✔
38
    today = datetime.now().strftime("%Y%m%d")
1✔
39
    cache_name = f"{today}-{name}"
1✔
40
    cache.add(cache_name, resp, timeout=timeout)
1✔
41

42

43
def get_questions_options_from_params(params):
1✔
44
    question_ids = [
×
45
        o.get('question').id for p in params
46
        for o in p.get("options")]
47
    options = [
×
48
        b for p in params for o in p.get("options")
49
        for b in o.get('option')]
50
    return question_ids, options
×
51

52

53
def filter_by_criteria(params, question_ids, options,
1✔
54
                       administration_ids, filter,
55
                       is_map=False, data_ids=[]):
56
    result = []
×
57
    data_views = ViewOptions.objects.filter(
×
58
        question_id__in=question_ids,
59
        options__in=options,
60
        administration_id__in=administration_ids)
61
    if filter:
×
62
        data_views = data_views.filter(data_id__in=data_ids)
×
63
    data_views = data_views.values_list(
×
64
        'data_id', 'question_id', 'options')
65

66
    df = pd.DataFrame(
×
67
        list(data_views),
68
        columns=['data_id', 'question_id', 'options'])
69
    for param in params:
×
70
        filter_criteria = []
×
71
        for index, option in enumerate(param.get('options')):
×
72
            question = option.get('question').id
×
73
            opts = [o for o in option.get('option')]
×
74
            if df.shape[0]:
×
75
                filter_df = df[
×
76
                    (df['question_id'] == question) &
77
                    (df['options'].isin(opts))]
78
                if filter_criteria:
×
79
                    filter_df = filter_df[
×
80
                        (filter_df['data_id'].isin(filter_criteria))]
81
                if filter_criteria and index > 0:
×
82
                    # reset filter_criteria for next question
83
                    # start from second question criteria
84
                    # support and filter
85
                    filter_criteria = []
×
86
                if filter_df.shape[0]:
×
87
                    filter_df = filter_df[~filter_df['data_id'].isin(
×
88
                        filter_criteria)]
89
                    filter_criteria += list(
×
90
                        filter_df['data_id'].unique())
91
        if is_map:
×
92
            result.append(len(filter_criteria))
×
93
        if not is_map:
×
94
            result.append({
×
95
                "name": param.get('name'),
96
                "value": len(filter_criteria)})
97
    del df
×
98
    del question_ids
×
99
    del options
×
100
    return result
×
101

102

103
def get_advance_filter_data_ids(form_id, administration_id,
1✔
104
                                options, is_glaas=False):
105
    data = ViewDataOptions.objects.filter(form_id=form_id)
1✔
106
    if administration_id and not is_glaas:
1✔
107
        administration_ids = get_administration_ids_by_path(
1✔
108
            administration_id=administration_id)
109
        data = data.filter(administration_id__in=administration_ids)
1✔
110
    if administration_id and is_glaas:
1!
111
        data = data.filter(administration_id__in=administration_id)
×
112
    if options:
1!
113
        # create unique list for question id in options
114
        qids = []
1✔
115
        for opt in options:
1✔
116
            val = opt.split("||")[0]
1✔
117
            if val in qids:
1!
118
                continue
×
119
            qids.append(val)
1✔
120
        # filter options and_ when different question id
121
        for qid in qids:
1✔
122
            # filter options or_ when same question id
123
            or_filter_value = filter(lambda x: qid in x, options)
1✔
124
            data = data.filter(reduce(
1✔
125
                or_, [Q(options__contains=op) for op in or_filter_value]))
126
    data = data.values_list('data_id', flat=True)
1✔
127
    return data
1✔
128

129

130
def transform_glass_answer(temp, questions, data_ids):
1✔
131
    for q in questions:
×
132
        value = None
×
133
        answers = Answers.objects.filter(
×
134
            question_id=q.id, data_id__in=data_ids)
135
        if q.type == QuestionTypes.option:
×
136
            options = QuestionOptions.objects.filter(
×
137
                question_id=q.id).values_list('name', flat=True)
138
            value = {opt: 0 for opt in options}
×
139
            answers = answers.values(
×
140
                'options').annotate(count=Count('options'))
141
            for a in answers:
×
142
                key = a.get('options')[0]
×
143
                prev = value.get(key)
×
144
                value[key] = prev + a.get('count')
×
145
        if q.type == QuestionTypes.number:
×
146
            value = 0
×
147
            answers = answers.values(
×
148
                'value').annotate(sum=Sum('value'))
149
            for a in answers:
×
150
                value += a.get('sum')
×
151
        temp.update({q.id: value})
×
152
    return temp
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc