• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / rtmis / #1059

09 Apr 2024 10:11AM UTC coverage: 78.87% (+0.02%) from 78.85%
#1059

push

coveralls-python

web-flow
Merge pull request #1323 from akvo/feature/1322-bug-filter-params-lost-after-saving-data

[#1322] Fix manage user page filter lost after saving data

2375 of 3172 branches covered (74.87%)

Branch coverage included in aggregate %.

5721 of 7093 relevant lines covered (80.66%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.33
backend/utils/custom_generator.py
1
import os
1✔
2
import sqlite3
1✔
3
import pandas as pd
1✔
4
import logging
1✔
5
from rtmis.settings import MASTER_DATA, STORAGE_PATH
1✔
6
from api.v1.v1_profile.models import Administration
1✔
7

8
# Module-level logger named after this module; the functions in this file
# use it to report failures instead of raising.
logger = logging.getLogger(__name__)
1✔
9

10

11
def generate_sqlite(model):
    """Dump every row of ``model`` into a fresh SQLite file.

    The file is written to ``{MASTER_DATA}/{db_table}.sqlite`` and holds a
    single table called ``nodes`` with one column per model field plus a
    ``parent`` column:

    * if the model has a ``parent`` field, its values are used (null -> 0);
    * otherwise, if it has an ``administration`` field, those values are
      copied into ``parent`` (null -> 0);
    * otherwise ``parent`` is 0 for every row.

    Any existing file with the same name is removed first, even when the
    model turns out to have no rows.

    Args:
        model: A Django model class (``_meta`` and ``objects`` are used).

    Returns:
        The path of the generated file, or ``None`` when the model has no
        rows (in which case no file is written).
    """
    table_name = model._meta.db_table
    field_names = [f.name for f in model._meta.fields]
    file_name = f"{MASTER_DATA}/{table_name}.sqlite"
    if os.path.exists(file_name):
        os.remove(file_name)

    rows = model.objects.all().values(*field_names)
    data = pd.DataFrame(list(rows))
    if data.shape[0] < 1:
        return None

    def _as_parent_id(value):
        # pd.notna treats both NaN and None as missing. The previous
        # ``x == x`` test only caught NaN, so an all-null (object dtype)
        # column reached int(None) and raised TypeError.
        return int(value) if pd.notna(value) else 0

    if "parent" in field_names:
        data["parent"] = data["parent"].apply(_as_parent_id)
    elif "administration" in field_names:
        data["parent"] = data["administration"].apply(_as_parent_id)
    else:
        data["parent"] = 0

    conn = sqlite3.connect(file_name)
    try:
        data.to_sql("nodes", conn, if_exists="replace", index=False)
    finally:
        # Always release the file handle, even if the export fails.
        conn.close()
    return file_name
1✔
36

37

38
def update_sqlite(model, data, id=None):
    """Insert or update one row of the ``nodes`` table in a model's SQLite file.

    The file is ``{MASTER_DATA}/{db_table}.sqlite``.

    * With a truthy ``id``: the row with that id is updated with ``data``
      if it exists; if it does not exist nothing is written.
    * Without ``id`` (``None`` or any falsy value): ``data`` is inserted as
      a new row.

    Errors are logged (with traceback) and swallowed; nothing is raised to
    the caller for failures inside the transaction.

    Args:
        model: A Django model class; only ``_meta.db_table`` is used.
        data: Mapping of column name -> value. Column names are interpolated
            directly into the SQL text, so they must come from trusted code;
            values are passed as bound parameters.
        id: Primary key of the row to update, or ``None`` to insert.
    """
    table_name = model._meta.db_table
    columns = list(data.keys())
    params = list(data.values())
    file_name = f"{MASTER_DATA}/{table_name}.sqlite"
    conn = sqlite3.connect(file_name)
    try:
        # ``with conn`` commits on success and rolls back on any exception,
        # so no explicit rollback is needed in the handler below.
        with conn:
            c = conn.cursor()
            if id:
                # Only probe for existence; the row contents are not needed.
                c.execute("SELECT 1 FROM nodes WHERE id = ?", (id,))
                if c.fetchone():
                    assignments = ', '.join(f"{col} = ?" for col in columns)
                    c.execute(
                        f"UPDATE nodes SET {assignments} WHERE id = ?",
                        params + [id],
                    )
            else:
                field_names = ', '.join(columns)
                placeholders = ', '.join('?' for _ in columns)
                c.execute(
                    f"INSERT INTO nodes({field_names}) "
                    f"VALUES ({placeholders})",
                    params,
                )
    except Exception as error:
        logger.error({
            'context': 'update_sqlite',
            'error': error,
            'table_name': table_name,
            'data': data,
            'id': id
        }, exc_info=True)
    finally:
        conn.close()
1✔
73

74

75
def administration_csv_add(data: "Administration", test: bool = False):
    """Append a row describing ``data`` to the administration CSV.

    The CSV lives at ``{STORAGE_PATH}/master_data/kenya-administration.csv``
    (``kenya-administration_test.csv`` when ``test`` is true). The new row
    gets, for every ancestor listed in ``data.path`` whose level id is
    greater than 1, a ``<level name>`` column holding the ancestor's name
    and a ``<level name>_id`` column holding its id, followed by the same
    pair of columns for ``data`` itself. Level names are lower-cased.

    Args:
        data: The administration to add. Must expose ``path`` (dot-separated
            ancestor ids, may be empty), ``level.name``, ``name`` and ``id``.
        test: Use the ``_test`` CSV instead of the real one.

    Returns:
        The CSV path on success, or ``None`` (after logging an error) when
        the CSV file does not exist.
    """
    filename = "kenya-administration{0}.csv".format("_test" if test else "")
    filepath = f"{STORAGE_PATH}/master_data/{filename}"
    if not os.path.exists(filepath):
        # Report the file that is actually missing; the message used to
        # name the test file regardless of ``test``.
        logger.error({
            'context': 'insert_administration_row_csv',
            'message': f"{filename} doesn't exist"
        })
        return None

    df = pd.read_csv(filepath)
    new_data = {}
    if data.path:
        # Drop the empty segments produced by leading/trailing dots.
        parent_ids = [part for part in data.path.split(".") if part]
        # select_related avoids one extra query per parent for ``p.level``.
        parents = Administration.objects.filter(
            pk__in=parent_ids,
            level__id__gt=1
        ).select_related("level")
        for p in parents:
            level_name = p.level.name.lower()
            new_data[level_name] = p.name
            new_data[f"{level_name}_id"] = p.id
    own_level = data.level.name.lower()
    new_data[own_level] = data.name
    new_data[f"{own_level}_id"] = data.id

    df = pd.concat([df, pd.DataFrame([new_data])], ignore_index=True)
    df.to_csv(filepath, index=False)
    return filepath
×
104

105

106
def find_index_by_id(df, id):
    """Return the index label of the first row whose last non-null value is ``id``.

    Each row is scanned left to right and only its right-most non-null cell
    is compared with ``id``. Rows that are entirely null are skipped (they
    previously raised ``KeyError`` because there is no valid column to
    read).

    Args:
        df: The DataFrame to search.
        id: The value to look for in each row's last non-null cell.

    Returns:
        The index label of the first matching row, or ``None`` if no row
        matches.
    """
    for idx, row in df.iterrows():
        last_non_null_col = row.last_valid_index()
        if last_non_null_col is None:
            # Completely empty row: nothing to compare against.
            continue
        if row[last_non_null_col] == id:
            return idx
    return None
1✔
113

114

115
def administration_csv_update(data: "Administration", test: bool = False):
    """Rewrite the CSV row that belongs to administration ``data``.

    The CSV lives at ``{STORAGE_PATH}/master_data/kenya-administration.csv``
    (``kenya-administration_test.csv`` when ``test`` is true). The row is
    located with ``find_index_by_id`` using ``data.pk``. When found, the
    ``<level name>`` / ``<level name>_id`` columns are overwritten for every
    ancestor in ``data.path`` whose level id is greater than 1, and then
    for ``data`` itself. Level names are lower-cased. The CSV is written
    back even when no matching row was found.

    Args:
        data: The administration to update. Must expose ``pk``, ``id``,
            ``path`` (dot-separated ancestor ids, may be empty),
            ``level.name`` and ``name``.
        test: Use the ``_test`` CSV instead of the real one.

    Returns:
        The CSV path when the file exists, or ``None`` (after logging an
        error) when it does not.
    """
    filename = "kenya-administration{0}.csv".format("_test" if test else "")
    filepath = f"{STORAGE_PATH}/master_data/{filename}"
    if not os.path.exists(filepath):
        # Report the file that is actually missing; the message used to
        # name the test file regardless of ``test``.
        logger.error({
            'context': 'update_administration_row_csv',
            'message': f"{filename} doesn't exist"
        })
        return None

    df = pd.read_csv(filepath)
    index = find_index_by_id(df=df, id=data.pk)
    if index is not None:
        if data.path:
            # Drop the empty segments produced by leading/trailing dots.
            parent_ids = [part for part in data.path.split(".") if part]
            # select_related avoids one extra query per parent for
            # ``p.level``.
            parents = Administration.objects.filter(
                pk__in=parent_ids,
                level__id__gt=1
            ).select_related("level")
            for p in parents:
                level_name = p.level.name.lower()
                df.loc[index, level_name] = p.name
                df.loc[index, f"{level_name}_id"] = p.id
        own_level = data.level.name.lower()
        df.loc[index, own_level] = data.name
        df.loc[index, f"{own_level}_id"] = data.id
    df.to_csv(filepath, index=False)
    return filepath
×
143

144

145
def administration_csv_delete(id: int, test: bool = False):
    """Remove the row for administration ``id`` from the administration CSV.

    The CSV lives at ``{STORAGE_PATH}/master_data/kenya-administration.csv``
    (``kenya-administration_test.csv`` when ``test`` is true). The row is
    located with ``find_index_by_id``; if no row matches, the CSV is
    written back without removing anything.

    Args:
        id: Id of the administration whose row should be removed.
        test: Use the ``_test`` CSV instead of the real one.

    Returns:
        The CSV path when the file exists, or ``None`` (after logging an
        error) when it does not.
    """
    filename = "kenya-administration{0}.csv".format("_test" if test else "")
    filepath = f"{STORAGE_PATH}/master_data/{filename}"
    if not os.path.exists(filepath):
        # Report the file that is actually missing; the message used to
        # name the test file regardless of ``test``.
        logger.error({
            'context': 'delete_administration_row_csv',
            'message': f"{filename} doesn't exist"
        })
        return None

    df = pd.read_csv(filepath)
    ix = find_index_by_id(df=df, id=id)
    if ix is not None:
        df = df.drop(index=ix)
    df.to_csv(filepath, index=False)
    return filepath
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc