• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / akvo-mis / #500

14 Jan 2026 07:26AM UTC coverage: 88.136% (-0.05%) from 88.186%
#500

push

coveralls-python

ifirmawan
[#150] feat: Temporarily show the Add button in AdministrationFilters

3680 of 4284 branches covered (85.9%)

Branch coverage included in aggregate %.

7612 of 8528 relevant lines covered (89.26%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.66
backend/utils/seeder_data_loader.py
1
"""
2
Seeder Data Loader Module
3

4
This module provides data loading functionality for Flow Complete Seeder.
5
"""
6

7
import logging
1✔
8
import os
1✔
9
from typing import Dict, Optional, Tuple
1✔
10

11
import pandas as pd
1✔
12
from django.db import transaction
1✔
13

14
from api.v1.v1_forms.models import Questions
1✔
15
from api.v1.v1_profile.models import Administration
1✔
16

17
from .seeder_config import (
1✔
18
    FilePaths,
19
    CsvColumns,
20
    NON_QUESTION_COLUMNS,
21
    DataLoadError,
22
    AdministrationMappingError,
23
    SeederConfig,
24
)
25

26
logger = logging.getLogger(__name__)
1✔
27

28

29
# =============================================================================
30
# Data Loading
31
# =============================================================================
32

33

34
def load_and_prepare_data(
    config: SeederConfig,
) -> Tuple[Optional[pd.DataFrame], Optional[pd.DataFrame]]:
    """Read the parent and child CSV exports and apply the row limit.

    Both files are loaded through ``load_data_file`` using
    ``config.flow_form_id``. When ``config.limit`` is truthy the parent
    frame is cut to its first ``limit`` rows and the child frame is
    reduced to the rows whose datapoint ID appears in that cut.

    Args:
        config: SeederConfig instance

    Returns:
        Tuple of (parent_df, child_df) DataFrames; either element is
        None when ``load_data_file`` returned None for it
    """
    form_id = config.flow_form_id
    parents = load_data_file(form_id, is_parent=True, config=config)
    children = load_data_file(form_id, is_parent=False, config=config)

    # Nothing to trim without a limit, and children are only ever
    # filtered relative to a parent frame.
    if not config.limit or parents is None:
        return parents, children

    parents = parents.head(config.limit)
    if children is not None:
        # Keep every child row that belongs to a retained parent
        # datapoint instead of blindly taking the first N child rows.
        kept_ids = parents[CsvColumns.DATAPOINT_ID].unique()
        in_scope = children[CsvColumns.DATAPOINT_ID].isin(kept_ids)
        children = children[in_scope]

    return parents, children
69

70

71
def load_data_file(
    flow_id: int,
    is_parent: bool,
    config: SeederConfig,
) -> Optional[pd.DataFrame]:
    """Load a parent or child data file from CSV.

    The file is looked up as ``<flow_id>_parent_data.csv`` or
    ``<flow_id>_child_data.csv`` under
    ``config.source_dir / FilePaths.OUTPUT_DIR`` and decoded with
    ``config.encoding``.

    Args:
        flow_id: Flow form ID
        is_parent: Whether loading parent or child data
        config: SeederConfig instance

    Returns:
        DataFrame with loaded data, or None if the file is missing or
        contains no data (a warning is logged in both cases)

    Raises:
        DataLoadError: If the file cannot be parsed or decoded; the
            underlying pandas/Unicode error is attached as the cause
    """
    kind = "parent" if is_parent else "child"
    csv_file = f"{flow_id}_{kind}_data.csv"
    csv_path = os.path.join(
        config.source_dir,
        FilePaths.OUTPUT_DIR,
        csv_file,
    )

    # Only the read itself is guarded so unrelated failures are not
    # mistaken for file problems.
    try:
        df = pd.read_csv(
            csv_path,
            encoding=config.encoding,
            low_memory=False,
        )
    except FileNotFoundError:
        logger.warning(f"File not found: {csv_path}")
        return None
    except pd.errors.EmptyDataError:
        logger.warning(f"File is empty: {csv_path}")
        return None
    except pd.errors.ParserError as e:
        # Chain explicitly so the original parser error stays visible
        # as the direct cause in tracebacks.
        raise DataLoadError(
            f"CSV parsing error in {csv_path}: {e}"
        ) from e
    except UnicodeDecodeError as e:
        raise DataLoadError(
            f"Encoding error in file: {csv_path}"
        ) from e

    logger.info(f"Loaded {len(df)} rows from {csv_file}")
    return df
118

119

120
# =============================================================================
121
# Question Loading
122
# =============================================================================
123

124

125
def load_questions(df: Optional[pd.DataFrame]) -> Dict[int, Questions]:
    """Fetch the Questions referenced by a dataframe's column headers.

    Every column that is not listed in ``NON_QUESTION_COLUMNS`` is
    treated as a question ID and looked up by primary key.

    Args:
        df: DataFrame to extract question IDs from

    Returns:
        Dictionary mapping question ID to Question object; empty when
        the frame is None/empty or has no question columns
    """
    if df is None or df.empty:
        return {}

    question_ids = []
    for column in df.columns:
        if column in NON_QUESTION_COLUMNS:
            continue
        # Going through float() first accepts float-formatted headers;
        # presumably these show up as e.g. "123.0" - TODO confirm.
        question_ids.append(int(float(column)))

    if not question_ids:
        return {}

    found = {}
    for question in Questions.objects.filter(pk__in=question_ids).all():
        found[question.pk] = question
    return found
149

150

151
# =============================================================================
152
# Administration Mappings
153
# =============================================================================
154

155

156
def load_administration_mappings(
    config: SeederConfig,
) -> Dict[int, str]:
    """Load administration mapping values from CSV file.

    Reads ``FilePaths.ADMINISTRATION_MAPPING`` under
    ``config.source_dir`` and keeps only the rows that carry a
    non-empty ``mis_value``.

    Args:
        config: SeederConfig instance

    Returns:
        Dictionary mapping flow_datapoint_id to mis_value. Empty when
        the mapping file is missing or has no data (a warning is
        logged in both cases).

    Raises:
        AdministrationMappingError: If a required column
            (``mis_value`` or ``flow_datapoint_id``) is missing
    """
    csv_path = os.path.join(
        config.source_dir,
        FilePaths.ADMINISTRATION_MAPPING,
    )

    try:
        df = pd.read_csv(
            csv_path,
            encoding=config.encoding,
            dtype={
                "flow_question_id": str,
                "mis_question_id": str,
            },
        )
        # Filter to only include rows with valid 'mis_value'
        has_value = df["mis_value"].notna() & (df["mis_value"] != "")
        df = df[has_value]
        # Pair the two columns directly rather than via iterrows():
        # iterrows() builds a Series per row and does not preserve
        # dtypes across the row, which is slower and can alter the IDs.
        adm_mappings = {
            int(datapoint_id): mis_value
            for datapoint_id, mis_value in zip(
                df["flow_datapoint_id"], df["mis_value"]
            )
        }
    except FileNotFoundError:
        logger.warning(
            f"Administration mapping file not found: {csv_path}"
        )
        return {}
    except pd.errors.EmptyDataError:
        logger.warning(
            f"Administration mapping file is empty: {csv_path}"
        )
        return {}
    except KeyError as e:
        raise AdministrationMappingError(
            f"CSV structure error: {e}"
        ) from e

    logger.info(
        f"Loaded {len(adm_mappings)} "
        f"administration mappings"
    )
    return adm_mappings
208

209

210
def load_administration_db_mappings() -> Dict[str, str]:
    """Load administration mappings from database.

    Only administrations that have a parent are included. Because the
    result is keyed by name, administrations sharing a name collapse
    into a single entry (the last one iterated wins).

    Returns:
        Dictionary mapping administration name to its ID as a string

    Raises:
        AdministrationMappingError: If mappings cannot be loaded; the
            original exception is attached as the cause
    """
    try:
        with_parent = Administration.objects.filter(
            parent__isnull=False
        ).only("id", "name")
        adm_db_mappings = {
            adm.name: str(adm.id)
            for adm in with_parent
        }
    except Exception as e:
        # Chain explicitly so the database error is reported as the
        # direct cause rather than as an incidental context.
        raise AdministrationMappingError(
            f"Error loading DB mappings: {e}"
        ) from e
    else:
        logger.info(
            f"Loaded {len(adm_db_mappings)} "
            f"administration DB mappings"
        )
        return adm_db_mappings
235

236

237
def get_administration_id(
    row: pd.Series,
    adm_mappings: Dict[int, str],
    adm_db_mappings: Dict[str, str],
) -> Optional[int]:
    """Resolve the administration ID for one data row.

    The row's datapoint ID is looked up in ``adm_mappings`` first; only
    when that yields nothing is the row's administration name looked up
    in ``adm_db_mappings``.

    Args:
        row: Pandas Series containing row data
        adm_mappings: Flow datapoint to MIS administration mapping
        adm_db_mappings: Administration name to ID mapping

    Returns:
        Administration ID or None if not found
    """
    datapoint_key = int(row[CsvColumns.DATAPOINT_ID])
    # `or` short-circuits, so the name column is only read when the
    # datapoint mapping has no usable entry.
    match = adm_mappings.get(datapoint_key) or adm_db_mappings.get(
        str(row[CsvColumns.ADMINISTRATION])
    )
    return int(match) if match else None
267

268

269
# =============================================================================
270
# Revert Operations
271
# =============================================================================
272

273

274
def revert_seeded_file(
    flow_form_id: int,
    is_parent: bool,
    source_dir: str,
) -> bool:
    """Delete the records listed in a seeded CSV and blank the file.

    The seeded file is read from ``source_dir / FilePaths.SEEDED_DIR``.
    Every ``mis_data_id`` it lists is hard-deleted from FormData inside
    one transaction, after which the CSV is rewritten with its header
    only.

    Args:
        flow_form_id: Form ID
        is_parent: Whether reverting parent or child data
        source_dir: Source directory path

    Returns:
        True if data was reverted, False otherwise
    """
    record_type = "parent" if is_parent else "child"
    csv_file = f"{flow_form_id}_{record_type}_data.csv"
    seeded_csv_path = os.path.join(
        source_dir,
        FilePaths.SEEDED_DIR,
        csv_file,
    )

    try:
        seeded_df = pd.read_csv(seeded_csv_path, encoding="utf-8")
    except (FileNotFoundError, pd.errors.EmptyDataError) as err:
        reason = (
            "not found" if isinstance(err, FileNotFoundError) else "is empty"
        )
        logger.warning(f"Seeded file {reason}: {seeded_csv_path}")
        return False

    if seeded_df.empty:
        logger.warning(f"No seeded data to revert in {csv_file}")
        return False

    # Imported here rather than at module level; presumably to avoid a
    # circular import - TODO confirm.
    from api.v1.v1_data.models import FormData

    # Bulk delete records
    mis_data_ids = seeded_df["mis_data_id"].tolist()
    with transaction.atomic():
        FormData.objects.filter(pk__in=mis_data_ids).delete(hard=True)

    logger.info(
        f"Successfully reverted {len(mis_data_ids)} "
        f"{record_type} records from {csv_file}"
    )

    # Leave a header-only CSV behind so a second revert finds nothing
    # to delete.
    header_only = pd.DataFrame(columns=seeded_df.columns)
    header_only.to_csv(seeded_csv_path, index=False, encoding="utf-8")

    return True
331

332

333
def load_seeded_records(
    flow_form_id: int,
    is_parent: bool,
    source_dir: str,
) -> Dict[int, int]:
    """Load existing seeded records from CSV file.

    The seeded file is read from ``source_dir / FilePaths.SEEDED_DIR``.
    This loader is best-effort: every failure is logged and results in
    an empty mapping rather than an exception.

    Args:
        flow_form_id: Flow form ID
        is_parent: Whether loading parent or child data
        source_dir: Source directory path

    Returns:
        Dictionary mapping flow_data_id to mis_data_id; empty when the
        file is missing, empty, malformed or otherwise unreadable
    """
    record_type = "parent" if is_parent else "child"
    csv_file = f"{flow_form_id}_{record_type}_data.csv"
    seeded_csv_path = os.path.join(
        source_dir,
        FilePaths.SEEDED_DIR,
        csv_file,
    )

    try:
        seeded_df = pd.read_csv(seeded_csv_path, encoding="utf-8")
        if seeded_df.empty:
            logger.info(
                f"No existing seeded records in {csv_file}"
            )
            return {}

        # Pair the two ID columns directly. iterrows() would coerce
        # each row to one dtype, so integer IDs become float64 whenever
        # another column is float and large IDs can lose precision
        # before int() is applied.
        seeded_records = {
            int(flow_id): int(mis_id)
            for flow_id, mis_id in zip(
                seeded_df["flow_data_id"], seeded_df["mis_data_id"]
            )
        }
        logger.info(
            f"Loaded {len(seeded_records)} existing "
            f"{record_type} records "
            f"from {csv_file}"
        )
        return seeded_records
    except FileNotFoundError:
        logger.info(
            f"Seeded file not found: {seeded_csv_path}"
        )
        return {}
    except pd.errors.EmptyDataError:
        logger.info(
            f"Seeded file is empty: {seeded_csv_path}"
        )
        return {}
    except KeyError as e:
        logger.error(f"CSV structure error: {e}")
        return {}
    except Exception as e:
        # Deliberate catch-all: an unreadable seeded file is reported
        # and treated as "nothing seeded yet".
        logger.error(f"Error loading seeded records: {e}")
        return {}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc