• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / akvo-mis / #659

09 Jun 2026 05:00AM UTC coverage: 88.618% (+0.6%) from 88.049%
#659

push

coveralls-python

web-flow
Merge 01a05f330 into 08ec1c106

5354 of 6205 branches covered (86.29%)

Branch coverage included in aggregate %.

10226 of 11376 relevant lines covered (89.89%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.68
backend/utils/seeder_data_processor.py
1
"""
2
Seeder Data Processing Module
3

4
This module provides data processing functionality for Flow Complete Seeder.
5
"""
6

7
import logging
1✔
8
from typing import Dict, Optional, Any, List, Tuple
1✔
9

10
import pandas as pd
1✔
11

12
from api.v1.v1_data.models import FormData
1✔
13
from api.v1.v1_forms.models import QuestionTypes, Forms
1✔
14

15
from .seeder_config import (
1✔
16
    CsvColumns,
17
    SeederConfig,
18
    FLOW_PREFIX,
19
)
20
from .seeder_answer_processor import AnswerProcessor
1✔
21

22
logger = logging.getLogger(__name__)
1✔
23

24

25
# =============================================================================
26
# Data Processing - UNIFIED GENERIC METHODS
27
# =============================================================================
28

29

30
def process_data_rows(
    df: pd.DataFrame,
    config: SeederConfig,
    questions: Dict[int, Any],
    administration_id: int,
    parent: Optional[FormData] = None,
    is_parent: bool = True,
    existing_records: Optional[List[FormData]] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Generic method to process data rows (parent or child).

    This unified method eliminates code duplication by handling both parent
    and child record processing with parameterization.

    For every row the answers are prepared, the FormData is created (or an
    already seeded one is updated) and the answers are written. A row that
    raises is logged and skipped so one bad row does not abort the batch.

    Args:
        df: DataFrame containing rows to process
        config: SeederConfig instance (``config.user`` is used as the author)
        questions: Dictionary mapping question ID to Question object
        administration_id: Administration ID for all rows
        parent: Parent FormData (for child records only)
        is_parent: Whether processing parent records (only used to label
            log messages)
        existing_records: Previously seeded records that may be updated
            instead of re-created. Each item must expose ``name`` and
            ``parent_id``. ``None`` (the default) means "no candidates".

    Returns:
        Tuple of (seeded_records, invalid_answers):
            seeded_records: one dict per processed row with the keys
                ``flow_data_id``, ``mis_data_id``, ``is_new`` and
                ``is_incomplete``.
            invalid_answers: the invalid-answer dicts collected from
                prepare_answer_data for all processed rows.
    """
    answer_processor = AnswerProcessor()
    seeded_records = []
    invalid_answers = []

    # Loop invariants: the candidate list, the parent key and the log label
    # do not depend on the row.
    candidates = existing_records or []
    parent_pk = parent.pk if parent else None
    row_kind = "parent" if is_parent else "child"

    for _, row in df.iterrows():
        try:
            # Prepare and create answers
            answers, row_invalid_answers = prepare_answer_data(
                row=row,
                questions=questions,
                administration_id=administration_id,
                answer_processor=answer_processor,
            )

            # Find matching existing record.
            # create_form_data names records
            # "{FLOW_PREFIX}{int(datapoint_id)} - {name}", so match on that
            # exact prefix. A plain substring test would let datapoint 12
            # match "...123 - ..." or any name that merely contains "12".
            flow_data_id = int(row[CsvColumns.DATAPOINT_ID])
            name_prefix = f"{FLOW_PREFIX}{flow_data_id} - "
            existing_record = next(
                (
                    er for er in candidates
                    if er.parent_id == parent_pk
                    and (er.name or "").startswith(name_prefix)
                ),
                None,
            )

            form_data = create_form_data(
                row=row,
                user=config.user,
                administration_id=administration_id,
                parent=parent,
                existing_record=existing_record,
            )

            # create_form_data already logged the failure
            if not form_data:
                continue

            # A record without any valid answer is flagged as incomplete
            is_incomplete = not answers
            if answers:
                # Pass row_invalid_answers to
                # filter out records with existing answers
                bulk_create_answers(
                    form_data, answers, config.user, row_invalid_answers
                )

            # Extend invalid_answers after filtering by bulk_create_answers
            invalid_answers.extend(row_invalid_answers)

            seeded_records.append(
                {
                    "flow_data_id": row[CsvColumns.DATAPOINT_ID],
                    "mis_data_id": form_data.pk,
                    "is_new": existing_record is None,
                    "is_incomplete": is_incomplete,
                }
            )

        except Exception:
            # logger.exception logs at ERROR level and appends the
            # traceback, so a single call is enough. row.get() keeps the
            # handler itself from raising when the id column is missing.
            logger.exception(
                f"Error processing {row_kind} "
                f"row {row.get(CsvColumns.DATAPOINT_ID)}"
            )
            continue

    return seeded_records, invalid_answers
1✔
125

126

127
def process_child_data_for_parent(
    parent_row: pd.Series,
    config: SeederConfig,
    parent_form_data: FormData,
    child_data_groups_dict: Dict[int, pd.core.groupby.DataFrameGroupBy],
    child_questions_dict: Dict[int, Dict[int, Any]],
    existing_records: Optional[List[FormData]] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Seed the child rows of one parent for every child form.

    Each child form contributes its own grouped DataFrame and its own
    question mapping. The group keyed by the parent's datapoint id is
    looked up in every grouped DataFrame and handed to process_data_rows.

    Args:
        parent_row: Parent row; its datapoint id is the group key
        config: SeederConfig instance
        parent_form_data: FormData instance the children are attached to
        child_data_groups_dict: Dict mapping form_id to grouped child dataframe
        child_questions_dict: Dict mapping form_id to questions
        existing_records: Optional list of existing child records

    Returns:
        Tuple of (results_list, invalid_answers_list); every result dict
        additionally carries the ``form_id`` it belongs to.
    """
    # Children point to their parent through the parent's datapoint_id
    # (the 'parent' column in Akvo Flow monitoring data), not the identifier.
    group_key = parent_row[CsvColumns.DATAPOINT_ID]
    collected_results = []
    collected_invalid = []

    for form_id in child_data_groups_dict:
        grouped = child_data_groups_dict[form_id]
        form_questions = child_questions_dict.get(form_id, {})

        try:
            rows_for_parent = grouped.get_group(group_key)
        except KeyError:
            # This form holds no child rows for the parent
            continue

        # Narrow the existing records down to the current form; keep None
        # when no existing records were supplied at all.
        if existing_records:
            records_of_form = [
                record for record in existing_records
                if record.form_id == form_id
            ]
        else:
            records_of_form = None

        form_results, form_invalid = process_data_rows(
            df=rows_for_parent,
            config=config,
            questions=form_questions,
            administration_id=parent_form_data.administration_id,
            parent=parent_form_data,
            is_parent=False,
            existing_records=records_of_form,
        )

        # Tag each result with the form it was produced for
        for entry in form_results:
            entry['form_id'] = form_id

        collected_results.extend(form_results)
        collected_invalid.extend(form_invalid)

    return collected_results, collected_invalid
1✔
192

193

194
# =============================================================================
195
# Form Data Creation - GENERIC METHOD
196
# =============================================================================
197

198

199
def create_form_data(
    row: pd.Series,
    user,
    administration_id: int,
    parent: Optional[FormData] = None,
    existing_record: Optional[FormData] = None,
) -> Optional[FormData]:
    """Generic method to create FormData instance (parent or child).

    Args:
        row: Pandas Series containing row data
        user: User creating the record
        administration_id: Administration ID
        parent: Parent FormData (for child records only)
        existing_record: Previously seeded FormData for this row. When
            given, it is updated in place and returned instead of creating
            a new record.

    Returns:
        Created or updated FormData instance or None if failed. Failures
        are logged with their traceback and never propagate to the caller.
    """
    try:
        geo_value = None
        uuid_value = row[CsvColumns.IDENTIFIER]
        # Geo column is a "|" separated list of numbers
        if CsvColumns.GEO in row and pd.notna(row[CsvColumns.GEO]):
            geo_value = [
                float(g) for g in
                str(row[CsvColumns.GEO]).split("|")
            ]
        # A child without its own geo takes both geo and uuid from the parent
        if parent and not geo_value:
            geo_value = parent.geo
            uuid_value = parent.uuid

        flow_data_id = int(row[CsvColumns.DATAPOINT_ID])

        # Sanitize name by replacing pipe characters
        dp_name = row[CsvColumns.NAME].replace("|", " - ")
        # Add FLOW-{flow_data_id} prefix to name
        dp_name = f"{FLOW_PREFIX}{flow_data_id} - {dp_name}"

        # Check if record already exists
        if existing_record:
            # Update existing record
            existing_record.name = dp_name
            existing_record.administration_id = administration_id
            existing_record.geo = geo_value
            existing_record.submitter = row.get(CsvColumns.SUBMITTER, None)
            if parent:
                existing_record.parent = parent
            existing_record.save()
            logger.info(
                f"Updated existing FormData {existing_record.pk} "
                f"for flow_data_id {flow_data_id}"
            )
            return existing_record

        # Create new record.
        # Only parent-less (registration) records reuse the flow id as
        # primary key; children get an id assigned on creation.
        new_data_id = None
        if not parent and flow_data_id:
            new_data_id = flow_data_id
        data = FormData.objects.create(
            id=new_data_id,
            form_id=row[CsvColumns.FORM_ID],
            uuid=uuid_value,
            name=dp_name,
            administration_id=administration_id,
            geo=geo_value,
            created_by=user,
            parent=parent,
            submitter=row.get(CsvColumns.SUBMITTER, None),
        )
        # Set created timestamp from source data
        data.created = row[CsvColumns.CREATED_AT]
        data.save()
        logger.info(
            f"Created new FormData {data.pk} "
            f"for flow_data_id {flow_data_id}"
        )
        # Save to datapoint json file if parent is None (Registration)
        if data.parent is None:
            # NOTE(review): accessed without a call - presumably a property
            # on FormData that writes the file as a side effect; confirm.
            data.save_to_file
        return data
    except Exception as e:
        # row.get() instead of row[...]: a missing id column must not raise
        # from inside the handler, otherwise the "return None" contract is
        # broken. logger.exception keeps the traceback.
        logger.exception(
            f"Error creating/updating FormData for row "
            f"{row.get(CsvColumns.DATAPOINT_ID)}: {e}"
        )
        return None
×
284

285

286
# =============================================================================
287
# Form Data Deletion (Reverting) - GENERIC METHODS
288
# =============================================================================
289

290
def revert_form_data(
    form: Forms
) -> int:
    """Generic method to revert all seeded FormData for a given form.

    Hard-deletes every FormData of ``form`` whose name starts with
    FLOW_PREFIX, together with its children.

    Args:
        form: Forms instance

    Returns:
        Number of hard-deleted records: the matching FormData plus all of
        their children.
    """
    form_data = form.form_form_data.filter(
        name__startswith=FLOW_PREFIX,
    )
    total_deleted = 0
    for data in form_data.all():
        # Count the children BEFORE deleting. Counting afterwards would
        # re-run the query against already deleted rows and always add 0.
        total_deleted += 1 + data.children.count()
        data.children.all().delete(hard=True)
        data.delete(hard=True)
    return total_deleted
1✔
306

307
# =============================================================================
308
# Answer Processing - GENERIC METHODS
309
# =============================================================================
310

311

312
def prepare_answer_data(
    row: pd.Series,
    questions: Dict[int, Any],
    administration_id: Optional[int],
    answer_processor: AnswerProcessor,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Convert one data row into answer records.

    The same routine serves parent and child rows. For every question the
    column named after the question id is read from the row and passed to
    the answer processor.

    Args:
        row: Pandas Series containing row data
        questions: Dictionary mapping question ID to Question object
        administration_id: Administration ID for admin-type questions
        answer_processor: AnswerProcessor instance

    Returns:
        Tuple of (answer_records, invalid_answers):
            answer_records: dicts with ``question_id``, ``name``, ``value``
                and ``options`` for every answer that could be processed.
            invalid_answers: dicts describing values for which the
                processor returned nothing at all.
    """
    option_types = (
        QuestionTypes.option,
        QuestionTypes.multiple_option,
    )
    valid_records = []
    rejected = []

    for question_id, question in questions.items():
        # Columns are named after the question id
        cell = row.get(str(question_id))

        # Nothing to do for an absent or NaN cell
        if pd.isna(cell):
            continue

        # Only option-like questions need their allowed values
        if question.type in option_types:
            allowed_values = list(
                question.options.values_list("value", flat=True)
            )
        else:
            allowed_values = []

        name, value, options = answer_processor.process(
            question_type=question.type,
            row_value=cell,
            administration_id=administration_id,
            opt_list=allowed_values,
            extra=question.extra,
        )

        # An all-None result marks a value the processor could not handle
        if all(part is None for part in (name, value, options)):
            rejected.append({
                "mis_form_id": question.form_id,
                "mis_question_id": question.pk,
                "mis_question_type": QuestionTypes.FieldStr[question.type],
                "flow_data_id": row[CsvColumns.DATAPOINT_ID],
                "value": cell,
            })
            continue

        valid_records.append(
            {
                "question_id": question.pk,
                "name": name,
                "value": value,
                "options": options,
            }
        )

    return valid_records, rejected
1✔
381

382

383
def bulk_create_answers(
    data: FormData,
    answer_records: List[Dict[str, Any]],
    user,
    invalid_records: Optional[List[Dict[str, Any]]] = None,
):
    """Write answer records for a FormData in bulk (insert or update).

    Usable for parent and child FormData alike. Answers that already exist
    for a question are updated rather than deleted, so manually entered
    answers that are absent from the seeder data are kept.

    Args:
        data: FormData instance (parent or child)
        answer_records: List of answer data dictionaries
        user: User creating the answers
        invalid_records: Optional list of invalid answers to filter.
            The list is modified in place: entries whose question already
            has an existing answer are removed.
            Expected format: {"mis_question_id": int, ...}
    """
    # Index the stored answers by their question id
    stored_by_question = {}
    for stored in data.data_answer.all():
        stored_by_question[stored.question_id] = stored

    # Drop invalid entries for questions that are already answered.
    # Slice assignment keeps the caller's list object.
    if invalid_records is not None:
        still_invalid = []
        for record in invalid_records:
            if record.get("mis_question_id") not in stored_by_question:
                still_invalid.append(record)
        invalid_records[:] = still_invalid

    if not answer_records:
        return

    AnswerModel = data.data_answer.model

    pending_updates = []
    pending_inserts = []

    for record in answer_records:
        question_id = record["question_id"]
        stored = stored_by_question.get(question_id)

        if stored is None:
            # No answer yet for this question: queue a new one
            pending_inserts.append(
                AnswerModel(
                    data=data,
                    question_id=question_id,
                    value=record["value"],
                    options=record["options"],
                    name=record["name"],
                    created_by=user,
                )
            )
            continue

        # Overwrite the stored answer with the seeded values
        stored.value = record["value"]
        stored.options = record["options"]
        stored.name = record["name"]
        stored.created_by = user
        pending_updates.append(stored)

    # Updates are written first, inserts afterwards
    if pending_updates:
        AnswerModel.objects.bulk_update(
            pending_updates,
            fields=["value", "options", "name", "created_by"]
        )

    if pending_inserts:
        data.data_answer.bulk_create(pending_inserts)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc