• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / akvo-mis / #536

26 Jan 2026 09:37AM UTC coverage: 88.32% (+0.03%) from 88.291%
#536

push

coveralls-python

ifirmawan
[#150] feat: Filter invalid_records when question has existing answer

- Add invalid_records parameter to bulk_create_answers function
- Filter out invalid records if question already has existing answer
- Update flow_data_seeder to pass invalid records for filtering
- Update process_data_rows to pass invalid records for filtering
- Add test cases for invalid_records filtering functionality

This ensures that when manual answers are entered, re-running the seeder
will remove those questions from the invalid values CSV.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

3877 of 4511 branches covered (85.95%)

Branch coverage included in aggregate %.

7995 of 8931 relevant lines covered (89.52%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.25
backend/utils/seeder_data_processor.py
1
"""
2
Seeder Data Processing Module
3

4
This module provides data processing functionality for Flow Complete Seeder.
5
"""
6

7
import logging
1✔
8
from typing import Dict, Optional, Any, List, Tuple
1✔
9

10
import pandas as pd
1✔
11

12
from api.v1.v1_data.models import FormData
1✔
13
from api.v1.v1_forms.models import QuestionTypes, Forms
1✔
14

15
from .seeder_config import (
1✔
16
    CsvColumns,
17
    SeederConfig,
18
    FLOW_PREFIX,
19
)
20
from .seeder_answer_processor import AnswerProcessor
1✔
21

22
# Module-level logger; used below for seeding progress and per-row errors.
logger = logging.getLogger(__name__)
1✔
23

24

25
# =============================================================================
26
# Data Processing - UNIFIED GENERIC METHODS
27
# =============================================================================
28

29

30
def _name_matches_datapoint(name: Optional[str], datapoint_id: Any) -> bool:
    """Return True if ``name`` starts with exactly ``FLOW_PREFIX<id>``.

    Seeded names are built as ``f"{FLOW_PREFIX}{flow_data_id} - ..."``
    (see create_form_data). Matching on that prefix, and requiring that no
    further digit follows the ID, avoids the false positives of a plain
    substring test (e.g. datapoint ``12`` matching ``FLOW-123 - ...``).
    """
    try:
        flow_data_id = int(datapoint_id)
    except (TypeError, ValueError, OverflowError):
        return False
    prefix = f"{FLOW_PREFIX}{flow_data_id}"
    if not isinstance(name, str) or not name.startswith(prefix):
        return False
    # The next character must not extend the ID (FLOW-123 vs FLOW-1234)
    return not name[len(prefix):len(prefix) + 1].isdigit()


def process_data_rows(
    df: pd.DataFrame,
    config: SeederConfig,
    questions: Dict[int, Any],
    administration_id: int,
    parent: Optional[FormData] = None,
    is_parent: bool = True,
    existing_records: Optional[List[FormData]] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Generic method to process data rows (parent or child).

    This unified method eliminates code duplication by handling both parent
    and child record processing with parameterization. For each row it
    prepares answers, creates (or updates) the FormData, and upserts the
    answers. Rows that fail are logged and skipped.

    Args:
        df: DataFrame containing rows to process
        config: SeederConfig instance (``config.user`` is used as creator)
        questions: Dictionary mapping question ID to Question object
        administration_id: Administration ID for all rows
        parent: Parent FormData (for child records only)
        is_parent: Whether processing parent records (used in log messages)
        existing_records: Previously seeded FormData records. A row reuses
            the record whose name carries its ``FLOW_PREFIX<id>`` prefix and
            whose parent matches ``parent``.

    Returns:
        Tuple ``(seeded_records, invalid_answers)``. ``seeded_records`` holds
        dicts with ``flow_data_id``, ``mis_data_id``, ``is_new`` and
        ``is_incomplete``. ``invalid_answers`` holds the invalid answers
        whose question has no existing answer on the record.
    """
    answer_processor = AnswerProcessor()
    seeded_records = []
    invalid_answers = []
    # Materialize once rather than re-evaluating for every row
    candidates = list(existing_records or [])
    parent_pk = parent.pk if parent else None
    row_kind = "parent" if is_parent else "child"

    for _, row in df.iterrows():
        try:
            # Prepare answers (valid ones plus rejected values)
            answers, row_invalid_answers = prepare_answer_data(
                row=row,
                questions=questions,
                administration_id=administration_id,
                answer_processor=answer_processor,
            )

            # Find the matching previously-seeded record, if any
            datapoint_id = row[CsvColumns.DATAPOINT_ID]
            existing_record = next(
                (
                    er for er in candidates
                    if er.parent_id == parent_pk
                    and _name_matches_datapoint(er.name, datapoint_id)
                ),
                None,
            )

            form_data = create_form_data(
                row=row,
                user=config.user,
                administration_id=administration_id,
                parent=parent,
                existing_record=existing_record,
            )
            if not form_data:
                continue

            # Always call it: besides upserting answers, it drops invalid
            # answers whose question already has an answer (e.g. entered
            # manually). It returns early when ``answers`` is empty.
            bulk_create_answers(
                form_data, answers, config.user, row_invalid_answers
            )
            # Extend only after the in-place filtering above
            invalid_answers.extend(row_invalid_answers)

            seeded_records.append(
                {
                    "flow_data_id": datapoint_id,
                    "mis_data_id": form_data.pk,
                    "is_new": existing_record is None,
                    "is_incomplete": not answers,
                }
            )
        except Exception as e:
            # Per-row boundary: log with traceback and move on to next row
            logger.exception(
                f"Error processing {row_kind} row "
                f"{row.get(CsvColumns.DATAPOINT_ID)}: {e}"
            )

    return seeded_records, invalid_answers
125

126

127
def process_child_data_for_parent(
    parent_row: pd.Series,
    config: SeederConfig,
    parent_form_data: FormData,
    child_data_groups_dict: Dict[int, pd.core.groupby.DataFrameGroupBy],
    child_questions_dict: Dict[int, Dict[int, Any]],
    existing_records: Optional[List[FormData]] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Seed every child row that belongs to one parent, across child forms.

    Each child form has its own grouped DataFrame and question mapping.
    Child rows are looked up by the parent's datapoint ID, because in Akvo
    Flow monitoring submissions reference their parent via the 'parent'
    column (the parent's datapoint_id), not via the identifier.

    Args:
        parent_row: Parent row containing datapoint_id
        config: SeederConfig instance
        parent_form_data: Parent FormData instance
        child_data_groups_dict: Dict mapping form_id to grouped child dataframe
        child_questions_dict: Dict mapping form_id to questions
        existing_records: Optional list of existing child records

    Returns:
        Tuple of (results_list, invalid_answers_list), aggregated over all
        child forms. Every result dict additionally carries its ``form_id``.
    """
    parent_key = parent_row[CsvColumns.DATAPOINT_ID]
    collected_results = []
    collected_invalid = []

    for form_id, grouped_rows in child_data_groups_dict.items():
        questions_for_form = child_questions_dict.get(form_id, {})

        try:
            rows_for_parent = grouped_rows.get_group(parent_key)
        except KeyError:
            # This parent has no submissions in this child form
            continue

        # Restrict existing records to the current child form
        if existing_records:
            records_for_form = [
                rec for rec in existing_records if rec.form_id == form_id
            ]
        else:
            records_for_form = None

        form_results, form_invalid = process_data_rows(
            df=rows_for_parent,
            config=config,
            questions=questions_for_form,
            administration_id=parent_form_data.administration_id,
            parent=parent_form_data,
            is_parent=False,
            existing_records=records_for_form,
        )

        # Tag each result with the child form it came from
        for item in form_results:
            item['form_id'] = form_id

        collected_results += form_results
        collected_invalid += form_invalid

    return collected_results, collected_invalid
192

193

194
# =============================================================================
195
# Form Data Creation - GENERIC METHOD
196
# =============================================================================
197

198

199
def create_form_data(
    row: pd.Series,
    user,
    administration_id: int,
    parent: Optional[FormData] = None,
    existing_record: Optional[FormData] = None,
) -> Optional[FormData]:
    """Generic method to create or update a FormData instance.

    Works for both parent (registration) and child (monitoring) rows.

    Args:
        row: Pandas Series containing row data
        user: User creating the record
        administration_id: Administration ID
        parent: Parent FormData (for child records only)
        existing_record: Previously seeded FormData for this row. When
            given, it is updated in place instead of creating a new record.

    Returns:
        Created or updated FormData instance, or None if any error occurred
        (the error is logged together with its traceback).
    """
    try:
        geo_value = None
        uuid_value = row[CsvColumns.IDENTIFIER]
        if CsvColumns.GEO in row and pd.notna(row[CsvColumns.GEO]):
            # Geo is stored in the CSV as pipe-separated numbers
            geo_value = [
                float(g) for g in
                str(row[CsvColumns.GEO]).split("|")
            ]
        if parent and not geo_value:
            # Child rows without their own geo inherit the parent's geo
            # and uuid.
            # NOTE(review): uuid inheritance is tied to the geo fallback;
            # confirm children with their own geo should keep the row uuid.
            geo_value = parent.geo
            uuid_value = parent.uuid

        flow_data_id = int(row[CsvColumns.DATAPOINT_ID])

        # Sanitize name by replacing pipe characters
        dp_name = row[CsvColumns.NAME].replace("|", " - ")
        # Add FLOW-{flow_data_id} prefix to name
        dp_name = f"{FLOW_PREFIX}{flow_data_id} - {dp_name}"

        if existing_record:
            # Update existing record
            existing_record.name = dp_name
            existing_record.administration_id = administration_id
            existing_record.geo = geo_value
            existing_record.created_by = user
            existing_record.submitter = row.get(CsvColumns.SUBMITTER, None)
            if parent:
                existing_record.parent = parent
            existing_record.save()
            logger.info(
                f"Updated existing FormData {existing_record.pk} "
                f"for flow_data_id {flow_data_id}"
            )
            return existing_record

        # Create new record. Registration (parent) records reuse the Flow
        # datapoint ID as primary key; children get an auto-generated one.
        new_data_id = None
        if not parent and flow_data_id:
            new_data_id = flow_data_id
        data = FormData.objects.create(
            id=new_data_id,
            form_id=row[CsvColumns.FORM_ID],
            uuid=uuid_value,
            name=dp_name,
            administration_id=administration_id,
            geo=geo_value,
            created_by=user,
            parent=parent,
            submitter=row.get(CsvColumns.SUBMITTER, None),
        )
        # Set created timestamp from source data (assigned after creation,
        # then saved again)
        data.created = row[CsvColumns.CREATED_AT]
        data.save()
        logger.info(
            f"Created new FormData {data.pk} "
            f"for flow_data_id {flow_data_id}"
        )
        # Save to datapoint json file if parent is None (Registration).
        # NOTE(review): save_to_file is accessed without a call, so it is
        # presumably a property that performs the write; confirm on FormData.
        if data.parent is None:
            data.save_to_file
        return data
    except Exception as e:
        # logger.exception keeps the traceback; row.get avoids a second
        # KeyError here if the DATAPOINT_ID column itself is missing.
        logger.exception(
            f"Error creating/updating FormData for row "
            f"{row.get(CsvColumns.DATAPOINT_ID)}: {e}"
        )
        return None
285

286

287
# =============================================================================
288
# Form Data Deletion (Reverting) - GENERIC METHODS
289
# =============================================================================
290

291
def revert_form_data(
    form: Forms
) -> int:
    """Generic method to revert all FormData for a given form.

    Deletes (with ``hard=True``) every FormData of ``form`` whose name starts
    with FLOW_PREFIX, i.e. records created by the seeder, together with
    their children.

    Args:
        form: Forms instance

    Returns:
        Number of deleted records: the seeded records of ``form`` plus all
        of their children.
    """
    form_data = form.form_form_data.filter(
        name__startswith=FLOW_PREFIX,
    )
    total_data = form_data.count()
    total_children = 0
    for data in form_data.all():
        children = data.children.all()
        # Count before deleting: once rows are hard-deleted, re-querying the
        # parents (or their children) finds nothing and would add 0.
        total_children += children.count()
        children.delete(hard=True)
        data.delete(hard=True)
    return total_data + total_children
307

308
# =============================================================================
309
# Answer Processing - GENERIC METHODS
310
# =============================================================================
311

312

313
def prepare_answer_data(
    row: pd.Series,
    questions: Dict[int, Any],
    administration_id: Optional[int],
    answer_processor: AnswerProcessor,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Turn one data row into answer payloads for the given questions.

    Works unchanged for parent and child rows. Each question's value is read
    from the column named ``str(question_id)``. Missing or NaN cells are
    skipped. A value is rejected when the answer processor returns ``None``
    for name, value and options alike.

    Args:
        row: Pandas Series containing row data
        questions: Dictionary mapping question ID to Question object
        administration_id: Administration ID for admin-type questions
        answer_processor: AnswerProcessor instance

    Returns:
        Tuple ``(answer_records, invalid_answers)``:
        ``answer_records`` holds dicts with question_id/name/value/options;
        ``invalid_answers`` holds one dict per rejected value (form,
        question, question type, Flow datapoint ID and raw value).
    """
    valid = []
    rejected = []

    for question_id, question in questions.items():
        column = str(question_id)
        if pd.isna(row.get(column)):
            continue
        raw_value = row[column]

        # Option-type questions are validated against their option values
        if question.type in [
            QuestionTypes.option,
            QuestionTypes.multiple_option,
        ]:
            choices = list(question.options.values_list("value", flat=True))
        else:
            choices = []

        name, value, options = answer_processor.process(
            question_type=question.type,
            row_value=raw_value,
            administration_id=administration_id,
            opt_list=choices,
        )

        if all(part is None for part in (name, value, options)):
            # Processor could not interpret the value: report it
            rejected.append({
                "mis_form_id": question.form_id,
                "mis_question_id": question.pk,
                "mis_question_type": QuestionTypes.FieldStr[question.type],
                "flow_data_id": row[CsvColumns.DATAPOINT_ID],
                "value": raw_value,
            })
        else:
            valid.append({
                "question_id": question.pk,
                "name": name,
                "value": value,
                "options": options,
            })

    return valid, rejected
381

382

383
def bulk_create_answers(
    data: FormData,
    answer_records: List[Dict[str, Any]],
    user,
    invalid_records: Optional[List[Dict[str, Any]]] = None,
):
    """Upsert the answers of ``data`` in bulk.

    Works for both parent and child FormData instances. Answers that already
    exist for a question are updated rather than deleted, so manually
    entered answers for questions absent from the seeder data are kept.

    Args:
        data: FormData instance (parent or child)
        answer_records: List of answer data dictionaries
        user: User creating the answers
        invalid_records: Optional list of invalid answers, filtered IN PLACE.
            An entry is removed when its question already has an answer on
            ``data``. Expected format: {"mis_question_id": int, ...}
    """
    answered = {ans.question_id: ans for ans in data.data_answer.all()}

    if invalid_records is not None:
        # Slice assignment mutates the caller's list instead of rebinding
        invalid_records[:] = [
            rec for rec in invalid_records
            if rec.get("mis_question_id") not in answered
        ]

    if not answer_records:
        return

    answer_model = data.data_answer.model
    to_update = []
    to_create = []

    for record in answer_records:
        qid = record["question_id"]
        if qid in answered:
            current = answered[qid]
            current.value = record["value"]
            current.options = record["options"]
            current.name = record["name"]
            current.created_by = user
            to_update.append(current)
            continue
        to_create.append(
            answer_model(
                data=data,
                question_id=qid,
                value=record["value"],
                options=record["options"],
                name=record["name"],
                created_by=user,
            )
        )

    if to_update:
        answer_model.objects.bulk_update(
            to_update,
            fields=["value", "options", "name", "created_by"]
        )

    if to_create:
        data.data_answer.bulk_create(to_create)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc