• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / rtmis / #1787

31 May 2024 03:58AM UTC coverage: 84.921% (-0.06%) from 84.982%
#1787

push

coveralls-python

ifirmawan
[#1519] Replace get adm w/ filter by pk in DownloadListSerializer

3025 of 3694 branches covered (81.89%)

Branch coverage included in aggregate %.

6814 of 7892 relevant lines covered (86.34%)

0.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.53
backend/utils/custom_generator.py
1
import os
1✔
2
import sqlite3
1✔
3
import pandas as pd
1✔
4
import logging
1✔
5
from rtmis.settings import MASTER_DATA, STORAGE_PATH
1✔
6
from api.v1.v1_profile.models import Administration
1✔
7

8
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
1✔
9

10

11
def generate_sqlite(model):
    """Export every row of ``model`` into a standalone SQLite file.

    The file is written to ``{MASTER_DATA}/{db_table}.sqlite`` and contains a
    single table named ``nodes``. Any existing file at that path is removed
    first, so when the model has no rows the old file is gone and no new one
    is created.

    The output always has a ``parent`` column: converted from the model's
    ``parent`` field when it has one, otherwise copied from its
    ``administration`` field, otherwise filled with ``0``. Missing values
    are written as ``0``.

    Args:
        model: Django model class whose rows are exported.

    Returns:
        The path of the generated file, or ``None`` when the model has no
        rows.
    """
    table_name = model._meta.db_table
    field_names = [f.name for f in model._meta.fields]
    file_name = f"{MASTER_DATA}/{table_name}.sqlite"
    if os.path.exists(file_name):
        os.remove(file_name)

    data = pd.DataFrame(list(model.objects.all().values(*field_names)))
    if data.shape[0] < 1:
        return None

    def to_parent_id(value):
        # pd.notna covers NaN, None and pd.NA alike; a plain ``x == x`` test
        # lets None through and then fails on int(None).
        return int(value) if pd.notna(value) else 0

    if "parent" in field_names:
        data["parent"] = data["parent"].apply(to_parent_id)
    elif "administration" in field_names:
        data["parent"] = data["administration"].apply(to_parent_id)
    else:
        data["parent"] = 0

    conn = sqlite3.connect(file_name)
    try:
        data.to_sql("nodes", conn, if_exists="replace", index=False)
    finally:
        # Release the file handle even when the export fails.
        conn.close()
    return file_name
1✔
36

37

38
def update_sqlite(model, data, id=None):
    """Insert or update one row of the ``nodes`` table in a model's SQLite file.

    The file is ``{MASTER_DATA}/{db_table}.sqlite``. With a truthy ``id`` the
    row with that id is updated, and nothing is written when no such row
    exists. With a falsy ``id`` (``None``, ``0``) a new row is inserted.

    When SQLite raises ``OperationalError`` the whole file is rebuilt through
    ``generate_sqlite``. Any other exception is logged and swallowed.

    Args:
        model: Django model class; only ``_meta.db_table`` is read here.
        data: Mapping of column name to value. Column names are interpolated
            into the SQL text, values are bound as parameters.
        id: Id of the row to update, or a falsy value to insert.
    """
    table_name = model._meta.db_table
    columns = list(data.keys())
    params = list(data.values())
    if id:
        # The trailing parameter feeds the "WHERE id = ?" of the UPDATE.
        params.append(id)
    file_name = f"{MASTER_DATA}/{table_name}.sqlite"

    needs_rebuild = False
    conn = sqlite3.connect(file_name)
    try:
        # ``with conn`` commits on success and rolls back on any exception,
        # so the handlers below need no explicit rollback.
        with conn:
            cursor = conn.cursor()
            if id:
                cursor.execute("SELECT * FROM nodes WHERE id = ?", (id,))
                if cursor.fetchone():
                    assignments = ", ".join(f"{c} = ?" for c in columns)
                    cursor.execute(
                        f"UPDATE nodes SET {assignments} WHERE id = ?",
                        params,
                    )
            else:
                column_list = ", ".join(columns)
                placeholders = ", ".join("?" for _ in columns)
                cursor.execute(
                    f"INSERT INTO nodes({column_list}) "
                    f"VALUES ({placeholders})",
                    params,
                )
    except sqlite3.OperationalError:
        # Rebuild only after the connection is closed: generate_sqlite
        # deletes and recreates the very file this connection has open.
        needs_rebuild = True
    except Exception as error:
        logger.error(
            {
                "context": "update_sqlite",
                "error": error,
                "table_name": table_name,
                "data": data,
                "id": id,
            }
        )
    finally:
        conn.close()

    if needs_rebuild:
        generate_sqlite(model=model)
1✔
77

78

79
def administration_csv_add(data: dict, test: bool = False):
    """Append one administration as a new row of the administration CSV.

    The CSV is ``{STORAGE_PATH}/master_data/kenya-administration.csv``, or
    the ``_test`` variant when ``test`` is true. The new row gets a
    ``<level>`` and ``<level>_id`` column pair for ``data`` itself and for
    each of its ancestors whose level id is greater than 1.

    Args:
        data: The administration to add. Despite the ``dict`` annotation it
            is read through attributes (``path``, ``level``, ``name``,
            ``id``) -- presumably an ``Administration`` instance; confirm
            against callers.
        test: Write to the test CSV instead of the real one.

    Returns:
        The CSV path, or ``None`` when the file does not exist (an error is
        logged in that case).
    """
    filename = "kenya-administration{0}.csv".format("_test" if test else "")
    filepath = f"{STORAGE_PATH}/master_data/{filename}"
    if not os.path.exists(filepath):
        logger.error(
            {
                "context": "insert_administration_row_csv",
                # Name the file that was actually looked up, not always
                # the test one.
                "message": f"{filename} doesn't exist",
            }
        )
        return None

    df = pd.read_csv(filepath)
    new_data = {}
    if data.path:
        # ``path`` is dot-separated ancestor ids; drop the empty pieces
        # produced by a leading or trailing dot.
        parent_ids = [pk for pk in data.path.split(".") if pk]
        # select_related avoids one extra query per ancestor for p.level.
        parents = Administration.objects.filter(
            pk__in=parent_ids, level__id__gt=1
        ).select_related("level")
        for p in parents:
            level_name = p.level.name.lower()
            new_data[level_name] = p.name
            new_data[f"{level_name}_id"] = p.id
    own_level = data.level.name.lower()
    new_data[own_level] = data.name
    new_data[f"{own_level}_id"] = data.id

    df = pd.concat([df, pd.DataFrame([new_data])], ignore_index=True)
    df.to_csv(filepath, index=False)
    return filepath
×
107

108

109
def find_index_by_id(df, id):
    """Return the index label of the first row whose last non-null cell is ``id``.

    Each row is scanned for its last non-null column and that cell is
    compared with ``id``. Rows that hold no non-null value at all are
    skipped.

    Args:
        df: DataFrame to search.
        id: Value to look for in each row's last non-null cell.

    Returns:
        The index label of the first matching row, or ``None`` when no row
        matches.
    """
    for idx, row in df.iterrows():
        last_non_null_col = row.last_valid_index()
        if last_non_null_col is None:
            # An entirely empty row has no last valid column; indexing the
            # row with None would raise KeyError.
            continue
        if row[last_non_null_col] == id:
            return idx
    return None
1✔
116

117

118
def administration_csv_update(data: dict, test: bool = False):
    """Rewrite the administration CSV row that belongs to ``data``.

    The CSV is ``{STORAGE_PATH}/master_data/kenya-administration.csv``, or
    the ``_test`` variant when ``test`` is true. The row is located with
    ``find_index_by_id`` using ``data.pk``. When found, its ``<level>`` and
    ``<level>_id`` cells are overwritten for ``data`` itself and for each of
    its ancestors whose level id is greater than 1. The file is written back
    whether or not a row was found.

    Args:
        data: The administration to update. Despite the ``dict`` annotation
            it is read through attributes (``pk``, ``path``, ``level``,
            ``name``, ``id``) -- presumably an ``Administration`` instance;
            confirm against callers.
        test: Write to the test CSV instead of the real one.

    Returns:
        The CSV path, or ``None`` when the file does not exist (an error is
        logged in that case).
    """
    filename = "kenya-administration{0}.csv".format("_test" if test else "")
    filepath = f"{STORAGE_PATH}/master_data/{filename}"
    if not os.path.exists(filepath):
        logger.error(
            {
                "context": "update_administration_row_csv",
                # Name the file that was actually looked up, not always
                # the test one.
                "message": f"{filename} doesn't exist",
            }
        )
        return None

    df = pd.read_csv(filepath)
    index = find_index_by_id(df=df, id=data.pk)
    if index is not None:
        if data.path:
            # ``path`` is dot-separated ancestor ids; drop the empty pieces
            # produced by a leading or trailing dot.
            parent_ids = [pk for pk in data.path.split(".") if pk]
            # select_related avoids one extra query per ancestor for p.level.
            parents = Administration.objects.filter(
                pk__in=parent_ids, level__id__gt=1
            ).select_related("level")
            for p in parents:
                level_name = p.level.name.lower()
                df.loc[index, level_name] = p.name
                df.loc[index, f"{level_name}_id"] = p.id
        own_level = data.level.name.lower()
        df.loc[index, own_level] = data.name
        df.loc[index, f"{own_level}_id"] = data.id
    df.to_csv(filepath, index=False)
    return filepath
×
147

148

149
def administration_csv_delete(id: int, test: bool = False):
    """Remove the administration CSV row whose last non-null cell is ``id``.

    The CSV is ``{STORAGE_PATH}/master_data/kenya-administration.csv``, or
    the ``_test`` variant when ``test`` is true. The row is located with
    ``find_index_by_id``. The file is written back whether or not a row was
    found.

    Args:
        id: Id of the administration whose row is removed.
        test: Write to the test CSV instead of the real one.

    Returns:
        The CSV path, or ``None`` when the file does not exist (an error is
        logged in that case).
    """
    filename = "kenya-administration{0}.csv".format("_test" if test else "")
    filepath = f"{STORAGE_PATH}/master_data/{filename}"
    if not os.path.exists(filepath):
        logger.error(
            {
                "context": "delete_administration_row_csv",
                # Name the file that was actually looked up, not always
                # the test one.
                "message": f"{filename} doesn't exist",
            }
        )
        return None

    df = pd.read_csv(filepath)
    ix = find_index_by_id(df=df, id=id)
    if ix is not None:
        df.drop(index=ix, inplace=True)
    df.to_csv(filepath, index=False)
    return filepath
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc