• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / rtmis / #1397

29 Apr 2024 09:46PM UTC coverage: 80.729% (-0.05%) from 80.783%
#1397

push

coveralls-python

web-flow
Merge pull request #1428 from akvo/feature/1424-administration-id-as-datapoint-name-when-submitting-via-webform

Feature/1424 administration id as datapoint name when submitting via webform

2688 of 3492 branches covered (76.98%)

Branch coverage included in aggregate %.

6298 of 7639 relevant lines covered (82.45%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.33
backend/utils/custom_generator.py
1
import os
1✔
2
import sqlite3
1✔
3
import pandas as pd
1✔
4
import logging
1✔
5
from rtmis.settings import MASTER_DATA, STORAGE_PATH
1✔
6
from api.v1.v1_profile.models import Administration
1✔
7

8
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
1✔
9

10

11
def generate_sqlite(model):
    """Dump every row of a Django model into a fresh SQLite file.

    The file is written to ``{MASTER_DATA}/{db_table}.sqlite`` and contains
    a single table called ``nodes`` holding the model's fields, with a
    ``parent`` column that is always an integer (``0`` meaning "no parent").

    Args:
        model: Django model class to export.

    Returns:
        The path of the generated file, or ``None`` when the model has no
        rows. Any previously generated file is removed in both cases.
    """
    table_name = model._meta.db_table
    field_names = [f.name for f in model._meta.fields]
    file_name = f"{MASTER_DATA}/{table_name}.sqlite"

    # Always start from scratch so a stale export never survives.
    if os.path.exists(file_name):
        os.remove(file_name)

    data = pd.DataFrame(list(model.objects.all().values(*field_names)))
    if data.shape[0] < 1:
        return None

    def to_parent_id(value):
        # pd.isna covers both NaN (float columns) and None (object columns,
        # e.g. when every row has a NULL parent); int(None) would raise.
        return 0 if pd.isna(value) else int(value)

    if "parent" in field_names:
        data["parent"] = data["parent"].apply(to_parent_id)
    elif "administration" in field_names:
        # No own parent field: the linked administration acts as the parent.
        data["parent"] = data["administration"].apply(to_parent_id)
    else:
        data["parent"] = 0

    conn = sqlite3.connect(file_name)
    try:
        data.to_sql("nodes", conn, if_exists="replace", index=False)
    finally:
        # Release the file handle even if the export fails.
        conn.close()
    return file_name
1✔
36

37

38
def update_sqlite(model, data, id=None):
    """Insert or update one row in the model's generated SQLite file.

    The target is the ``nodes`` table of
    ``{MASTER_DATA}/{db_table}.sqlite``.

    Args:
        model: Django model class; its ``db_table`` selects the file.
        data: Mapping of column name to the value to store.
        id: When given (truthy), the row with this id is updated if it
            exists; if no such row exists nothing is written. When
            omitted, ``data`` is inserted as a new row.

    Returns:
        None. Failures are logged (with traceback) and swallowed so that a
        broken SQLite mirror never interrupts the caller.
    """
    table_name = model._meta.db_table
    # Quote identifiers so column names that collide with SQL keywords or
    # contain unusual characters still produce a valid statement.
    columns = ['"{}"'.format(str(name).replace('"', '""')) for name in data]
    values = list(data.values())
    file_name = f"{MASTER_DATA}/{table_name}.sqlite"
    conn = sqlite3.connect(file_name)
    try:
        # The connection context manager commits on success and rolls back
        # when the body raises, so no explicit rollback is needed.
        with conn:
            cursor = conn.cursor()
            if id:
                cursor.execute("SELECT 1 FROM nodes WHERE id = ?", (id,))
                if cursor.fetchone():
                    assignments = ", ".join(f"{col} = ?" for col in columns)
                    cursor.execute(
                        f"UPDATE nodes SET {assignments} WHERE id = ?",
                        values + [id],
                    )
            else:
                column_list = ", ".join(columns)
                placeholders = ", ".join("?" for _ in columns)
                cursor.execute(
                    f"INSERT INTO nodes({column_list}) "
                    f"VALUES ({placeholders})",
                    values,
                )
    except Exception as error:
        # logger.exception logs at ERROR level and keeps the traceback.
        logger.exception(
            {
                "context": "update_sqlite",
                "error": error,
                "table_name": table_name,
                "data": data,
                "id": id,
            }
        )
    finally:
        conn.close()
1✔
75

76

77
def administration_csv_add(data: "Administration", test: bool = False):
    """Append a row describing ``data`` to the administration CSV.

    The row gets a ``<level>`` / ``<level>_id`` column pair for ``data``
    itself and for every ancestor listed in ``data.path`` whose level id is
    greater than 1.

    Args:
        data: Administration-like object exposing ``path``, ``level.name``,
            ``name`` and ``id``.
        test: Use ``kenya-administration_test.csv`` instead of
            ``kenya-administration.csv``.

    Returns:
        The CSV path on success, or ``None`` (after logging an error) when
        the CSV file does not exist.
    """
    filename = "kenya-administration{0}.csv".format("_test" if test else "")
    filepath = f"{STORAGE_PATH}/master_data/{filename}"
    if not os.path.exists(filepath):
        logger.error(
            {
                "context": "insert_administration_row_csv",
                # Report the file that was actually looked up.
                "message": f"{filename} doesn't exist",
            }
        )
        return None

    df = pd.read_csv(filepath)
    new_data = {}
    if data.path:
        # ``path`` is a dot-separated list of ancestor ids; drop the empty
        # pieces produced by leading/trailing dots.
        parent_ids = [pk for pk in data.path.split(".") if pk]
        parents = Administration.objects.filter(
            pk__in=parent_ids, level__id__gt=1
        )
        for parent in parents:
            level_name = parent.level.name.lower()
            new_data[level_name] = parent.name
            new_data[f"{level_name}_id"] = parent.id
    own_level = data.level.name.lower()
    new_data[own_level] = data.name
    new_data[f"{own_level}_id"] = data.id

    df = pd.concat([df, pd.DataFrame([new_data])], ignore_index=True)
    df.to_csv(filepath, index=False)
    return filepath
×
105

106

107
def find_index_by_id(df, id):
    """Return the index label of the first row whose last non-null cell is ``id``.

    Args:
        df: DataFrame to scan row by row.
        id: Value compared (with ``==``) against each row's last non-null
            cell.

    Returns:
        The index label of the first matching row, or ``None`` when no row
        matches.
    """
    for idx, row in df.iterrows():
        last_non_null_col = row.last_valid_index()
        if last_non_null_col is None:
            # Entirely empty row: there is no cell to compare, and
            # row[None] would raise KeyError.
            continue
        if row[last_non_null_col] == id:
            return idx
    return None
1✔
114

115

116
def administration_csv_update(data: "Administration", test: bool = False):
    """Rewrite the CSV row that belongs to ``data`` with its current values.

    The row is located with ``find_index_by_id`` using ``data.pk``. When it
    is found, the ``<level>`` / ``<level>_id`` columns are refreshed for
    ``data`` and for every ancestor in ``data.path`` whose level id is
    greater than 1. The CSV is written back even when no row matched.

    Args:
        data: Administration-like object exposing ``pk``, ``path``,
            ``level.name``, ``name`` and ``id``.
        test: Use ``kenya-administration_test.csv`` instead of
            ``kenya-administration.csv``.

    Returns:
        The CSV path on success, or ``None`` (after logging an error) when
        the CSV file does not exist.
    """
    filename = "kenya-administration{0}.csv".format("_test" if test else "")
    filepath = f"{STORAGE_PATH}/master_data/{filename}"
    if not os.path.exists(filepath):
        logger.error(
            {
                "context": "update_administration_row_csv",
                # Report the file that was actually looked up.
                "message": f"{filename} doesn't exist",
            }
        )
        return None

    df = pd.read_csv(filepath)
    index = find_index_by_id(df=df, id=data.pk)
    if index is not None:
        if data.path:
            # ``path`` is a dot-separated list of ancestor ids; drop the
            # empty pieces produced by leading/trailing dots.
            parent_ids = [pk for pk in data.path.split(".") if pk]
            parents = Administration.objects.filter(
                pk__in=parent_ids, level__id__gt=1
            )
            for parent in parents:
                level_name = parent.level.name.lower()
                df.loc[index, level_name] = parent.name
                df.loc[index, f"{level_name}_id"] = parent.id
        own_level = data.level.name.lower()
        df.loc[index, own_level] = data.name
        df.loc[index, f"{own_level}_id"] = data.id
    df.to_csv(filepath, index=False)
    return filepath
×
145

146

147
def administration_csv_delete(id: int, test: bool = False):
    """Remove the row for administration ``id`` from the administration CSV.

    The row is located with ``find_index_by_id``. The CSV is written back
    even when no row matched.

    Args:
        id: Identifier of the administration whose row should be dropped.
        test: Use ``kenya-administration_test.csv`` instead of
            ``kenya-administration.csv``.

    Returns:
        The CSV path on success, or ``None`` (after logging an error) when
        the CSV file does not exist.
    """
    filename = "kenya-administration{0}.csv".format("_test" if test else "")
    filepath = f"{STORAGE_PATH}/master_data/{filename}"
    if not os.path.exists(filepath):
        logger.error(
            {
                "context": "delete_administration_row_csv",
                # Report the file that was actually looked up.
                "message": f"{filename} doesn't exist",
            }
        )
        return None

    df = pd.read_csv(filepath)
    row_index = find_index_by_id(df=df, id=id)
    if row_index is not None:
        df = df.drop(index=row_index)
    df.to_csv(filepath, index=False)
    return filepath
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc