• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / akvo-mis / #457

24 Dec 2025 10:13PM UTC coverage: 88.647% (-0.008%) from 88.655%
#457

push

coveralls-python

web-flow
Merge pull request #145 from akvo/feature/144-akvo-flow-administration-mapping

Feature/144 akvo flow administration mapping

3506 of 4069 branches covered (86.16%)

Branch coverage included in aggregate %.

7238 of 8051 relevant lines covered (89.9%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.11
backend/utils/custom_generator.py
1
import os
1✔
2
import sqlite3
1✔
3
import pandas as pd
1✔
4
import logging
1✔
5
from django.conf import settings
1✔
6
from mis.settings import MASTER_DATA, STORAGE_PATH, COUNTRY_NAME
1✔
7
from api.v1.v1_profile.models import Administration
1✔
8

9
logger = logging.getLogger(__name__)
1✔
10

11

12
def generate_sqlite(model, test: bool = False):
1✔
13
    if not test:
1✔
14
        test = settings.TEST_ENV
1✔
15
    table_name = model._meta.db_table
1✔
16
    field_names = [f.name for f in model._meta.fields]
1✔
17
    objects = model.objects.all()
1✔
18
    file_name = "{0}/{1}{2}.sqlite".format(
1✔
19
        MASTER_DATA,
20
        "test_" if test else "",
21
        table_name,
22
    )
23
    if os.path.exists(file_name):
1✔
24
        os.remove(file_name)
1✔
25
    data = pd.DataFrame(list(objects.values(*field_names)))
1✔
26
    no_rows = data.shape[0]
1✔
27
    if no_rows < 1:
1✔
28
        return
1✔
29
    # Add full_path_name for Administration model
30
    if model.__name__ == "Administration":
1✔
31
        # Get all Administration objects with their full_path_name property
32
        full_path_names = {
1✔
33
            obj.id: obj.full_path_name.replace("|", " - ") for obj in objects
34
        }
35
        # Add the full_path_name column to the DataFrame
36
        data["full_path_name"] = data["id"].apply(
1✔
37
            lambda id_: full_path_names.get(id_, "")
38
        )
39

40
    if "parent" in field_names:
1✔
41
        data["parent"] = data["parent"].apply(
1✔
42
            lambda x: int(x) if x == x else 0
43
        )
44
    elif "administration" in field_names:
1✔
45
        data["parent"] = data["administration"].apply(
1✔
46
            lambda x: int(x) if x == x else 0
47
        )
48
    else:
49
        data["parent"] = 0
1✔
50
    conn = sqlite3.connect(file_name)
1✔
51
    data.to_sql("nodes", conn, if_exists="replace", index=False)
1✔
52
    conn.close()
1✔
53
    return file_name
1✔
54

55

56
def update_sqlite(model, data, id=None):
1✔
57
    test = settings.TEST_ENV
1✔
58
    table_name = model._meta.db_table
1✔
59
    fields = data.keys()
1✔
60
    field_names = ", ".join([f for f in fields])
1✔
61
    placeholders = ", ".join(["?" for _ in range(len(fields))])
1✔
62
    update_placeholders = ", ".join([f"{f} = ?" for f in fields])
1✔
63
    params = list(data.values())
1✔
64
    if id:
1✔
65
        params += [id]
1✔
66
    file_name = "{0}/{1}{2}.sqlite".format(
1✔
67
        MASTER_DATA,
68
        "test_" if test else "",
69
        table_name,
70
    )
71
    conn = sqlite3.connect(file_name)
1✔
72
    try:
1✔
73
        with conn:
1✔
74
            c = conn.cursor()
1✔
75
            if id:
1✔
76
                c.execute("SELECT * FROM nodes WHERE id = ?", (id,))
1✔
77
                if c.fetchone():
1✔
78
                    query = f"UPDATE nodes \
1✔
79
                        SET {update_placeholders} WHERE id = ?"
80
                    c.execute(query, params)
1✔
81
            if not id:
1✔
82
                query = f"INSERT INTO nodes({field_names}) \
1✔
83
                    VALUES ({placeholders})"
84
                c.execute(query, params)
1✔
85
    except sqlite3.OperationalError:
1✔
86
        generate_sqlite(model=model, test=test)
1✔
87
    finally:
88
        conn.close()
1✔
89

90

91
def administration_csv_add(data: dict):
1✔
92
    test = settings.TEST_ENV
1✔
93
    filename = "{0}-administration.csv".format(
1✔
94
        "test" if test else COUNTRY_NAME
95
    )
96
    filepath = f"{STORAGE_PATH}/master_data/{filename}"
1✔
97
    if os.path.exists(filepath):
1✔
98
        df = pd.read_csv(filepath)
1✔
99
        new_data = {}
1✔
100
        if data.path:
1!
101
            parent_ids = list(filter(lambda path: path, data.path.split(".")))
1✔
102
            parents = Administration.objects.filter(
1✔
103
                pk__in=parent_ids, level__id__gt=1
104
            ).all()
105
            for p in parents:
1✔
106
                new_data[p.level.name.lower()] = p.name
1✔
107
                new_data[f"{p.level.name.lower()}_id"] = p.id
1✔
108
        new_data[data.level.name.lower()] = data.name
1✔
109
        new_data[f"{data.level.name.lower()}_id"] = data.id
1✔
110
        new_df = pd.DataFrame([new_data])
1✔
111
        df = pd.concat([df, new_df], ignore_index=True)
1✔
112
        df.to_csv(filepath, index=False)
1✔
113
        return filepath
1✔
114
    else:
115
        logger.error(
116
            {
117
                "context": "insert_administration_row_csv",
118
                "message": (
119
                    f"{('test' if test else COUNTRY_NAME)}-administration.csv"
120
                    " doesn't exist"
121
                ),
122
            }
123
        )  # pragma: no cover
124
    return None
×
125

126

127
def find_index_by_id(df, id):
1✔
128
    for idx, row in df.iterrows():
1✔
129
        last_non_null_col = row.last_valid_index()
1✔
130
        last_non_null_value = row[last_non_null_col]
1✔
131
        if last_non_null_value == id:
1✔
132
            return idx
1✔
133
    return None
1✔
134

135

136
def administration_csv_update(data: dict):
1✔
137
    test = settings.TEST_ENV
1✔
138
    filename = "{0}-administration.csv".format(
1✔
139
        "test" if test else COUNTRY_NAME
140
    )
141
    filepath = f"{STORAGE_PATH}/master_data/{filename}"
1✔
142
    if os.path.exists(filepath):
1✔
143
        df = pd.read_csv(filepath)
1✔
144
        index = find_index_by_id(df=df, id=data.pk)
1✔
145
        if index is not None:
1✔
146
            if data.path:
1!
147
                parent_ids = list(
1✔
148
                    filter(lambda path: path, data.path.split("."))
149
                )
150
                parents = Administration.objects.filter(
1✔
151
                    pk__in=parent_ids, level__id__gt=1
152
                ).all()
153
                for p in parents:
1✔
154
                    df.loc[index, p.level.name.lower()] = p.name
1✔
155
                    df.loc[index, f"{p.level.name.lower()}_id"] = p.id
1✔
156
            df.loc[index, data.level.name.lower()] = data.name
1✔
157
            df.loc[index, f"{data.level.name.lower()}_id"] = data.id
1✔
158
        df.to_csv(filepath, index=False)
1✔
159
        return filepath
1✔
160
    else:
161
        logger.error(
162
            {
163
                "context": "update_administration_row_csv",
164
                "message": f"{filename} doesn't exist",
165
            }
166
        )  # pragma: no cover
167
    return None
×
168

169

170
def administration_csv_delete(id: int):
1✔
171
    test = settings.TEST_ENV
1✔
172
    filename = "{0}-administration.csv".format(
1✔
173
        "test" if test else COUNTRY_NAME
174
    )
175
    filepath = f"{STORAGE_PATH}/master_data/{filename}"
1✔
176
    if os.path.exists(filepath):
1✔
177
        df = pd.read_csv(filepath)
1✔
178
        ix = find_index_by_id(df=df, id=id)
1✔
179
        if ix is not None:
1✔
180
            df.drop(index=ix, inplace=True)
1✔
181
        df.to_csv(filepath, index=False)
1✔
182
        return filepath
1✔
183
    else:
184
        logger.error(
185
            {
186
                "context": "delete_administration_row_csv",
187
                "message": f"{filename} doesn't exist",
188
            }
189
        )  # pragma: no cover
190
    return None
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc