• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / akvo-mis / #543

27 Jan 2026 02:58AM UTC coverage: 88.342% (+0.007%) from 88.335%
#543

push

coveralls-python

ifirmawan
[#151] fix: Remove unnecessary assignment of created_by

in create_form_data function for existing data

3878 of 4511 branches covered (85.97%)

Branch coverage included in aggregate %.

7996 of 8930 relevant lines covered (89.54%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.68
backend/utils/seeder_data_processor.py
1
"""
2
Seeder Data Processing Module
3

4
This module provides data processing functionality for Flow Complete Seeder.
5
"""
6

7
import logging
1✔
8
from typing import Dict, Optional, Any, List, Tuple
1✔
9

10
import pandas as pd
1✔
11

12
from api.v1.v1_data.models import FormData
1✔
13
from api.v1.v1_forms.models import QuestionTypes, Forms
1✔
14

15
from .seeder_config import (
1✔
16
    CsvColumns,
17
    SeederConfig,
18
    FLOW_PREFIX,
19
)
20
from .seeder_answer_processor import AnswerProcessor
1✔
21

22
logger = logging.getLogger(__name__)
1✔
23

24

25
# =============================================================================
26
# Data Processing - UNIFIED GENERIC METHODS
27
# =============================================================================
28

29

30
def _is_flow_record_name(name: Optional[str], flow_token: str) -> bool:
    """Return True when ``name`` starts with exactly ``flow_token``.

    ``flow_token`` is ``FLOW_PREFIX`` followed by a datapoint id. The
    character right after the token must not be a digit, so the token for
    id ``12`` does not match a record named for id ``123``.
    """
    if not name or not name.startswith(flow_token):
        return False
    return not name[len(flow_token):len(flow_token) + 1].isdigit()


def process_data_rows(
    df: pd.DataFrame,
    config: SeederConfig,
    questions: Dict[int, Any],
    administration_id: int,
    parent: Optional[FormData] = None,
    is_parent: bool = True,
    existing_records: Optional[List[FormData]] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Process data rows (parent or child) into FormData and answers.

    Each row is handled independently: a failure is logged and the row is
    skipped, so one bad row does not abort the whole batch.

    Args:
        df: DataFrame containing rows to process
        config: SeederConfig instance (``config.user`` is used as the
            creator of records and answers)
        questions: Dictionary mapping question ID to Question object
        administration_id: Administration ID for all rows
        parent: Parent FormData (for child records only)
        is_parent: Whether processing parent records (only used to label
            log messages)
        existing_records: Previously seeded records to update instead of
            creating duplicates. A record is reused when its ``parent_id``
            equals the parent's pk and its ``name`` starts with
            ``FLOW_PREFIX`` followed by the row's datapoint id.

    Returns:
        Tuple of (seeded_records, invalid_answers). Each seeded record is
        a dict with the keys ``flow_data_id``, ``mis_data_id``, ``is_new``
        and ``is_incomplete``.
    """
    answer_processor = AnswerProcessor()
    candidates = existing_records or []
    parent_pk = parent.pk if parent else None
    seeded_records = []
    invalid_answers = []

    for _, row in df.iterrows():
        try:
            # Prepare answers first; they are only stored once the
            # FormData row exists.
            answers, row_invalid_answers = prepare_answer_data(
                row=row,
                questions=questions,
                administration_id=administration_id,
                answer_processor=answer_processor,
            )

            # Find a matching existing record. The match is anchored to
            # the generated name prefix because a plain substring test
            # would let id "12" match a record seeded for id "123".
            datapoint_id = str(row[CsvColumns.DATAPOINT_ID])
            flow_token = f"{FLOW_PREFIX}{datapoint_id}"
            matching = [
                er for er in candidates
                if er.parent_id == parent_pk
                and _is_flow_record_name(er.name, flow_token)
            ]
            existing_record = matching[0] if matching else None

            # Create (or update) the parent/child FormData
            form_data = create_form_data(
                row=row,
                user=config.user,
                administration_id=administration_id,
                parent=parent,
                existing_record=existing_record,
            )

            if not form_data:
                # create_form_data already logged the failure
                continue

            if answers:
                # Pass row_invalid_answers so that entries for questions
                # that already have a stored answer are filtered out.
                bulk_create_answers(
                    form_data, answers, config.user, row_invalid_answers
                )

            # Extend invalid_answers after filtering by bulk_create_answers
            invalid_answers.extend(row_invalid_answers)

            seeded_records.append(
                {
                    "flow_data_id": row[CsvColumns.DATAPOINT_ID],
                    "mis_data_id": form_data.pk,
                    "is_new": existing_record is None,
                    # A record without any valid answer is incomplete
                    "is_incomplete": not answers,
                }
            )

        except Exception as e:
            # logger.exception logs at ERROR level and appends the
            # traceback. row.get() is used so a missing id column cannot
            # raise from inside this handler.
            logger.exception(
                f"Error processing {'parent' if is_parent else 'child'} "
                f"row {row.get(CsvColumns.DATAPOINT_ID)}: {e}"
            )
            continue

    return seeded_records, invalid_answers
1✔
125

126

127
def process_child_data_for_parent(
    parent_row: pd.Series,
    config: SeederConfig,
    parent_form_data: FormData,
    child_data_groups_dict: Dict[int, pd.core.groupby.DataFrameGroupBy],
    child_questions_dict: Dict[int, Dict[int, Any]],
    existing_records: Optional[List[FormData]] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Seed every child row that belongs to one parent, for all child forms.

    Each child form contributes its own grouped DataFrame and its own
    question mapping; forms without rows for this parent are skipped.

    Args:
        parent_row: Parent row; its datapoint id is the group key used to
            look up the children
        config: SeederConfig instance
        parent_form_data: Parent FormData instance
        child_data_groups_dict: Dict mapping form_id to grouped child
            dataframe
        child_questions_dict: Dict mapping form_id to questions
        existing_records: Optional list of existing child records

    Returns:
        Tuple of (results_list, invalid_answers_list). Every result dict
        additionally carries the ``form_id`` it was produced for.
    """
    # Children point at their parent through the parent's datapoint id,
    # so that value is the key into each grouped child DataFrame.
    group_key = parent_row[CsvColumns.DATAPOINT_ID]
    collected_results = []
    collected_invalid = []

    for form_id, grouped_children in child_data_groups_dict.items():
        questions_for_form = child_questions_dict.get(form_id, {})

        try:
            rows_for_parent = grouped_children.get_group(group_key)
        except KeyError:
            # This parent has no children in this form
            continue

        # Narrow the known records down to the current child form
        if existing_records:
            known_for_form = [
                record for record in existing_records
                if record.form_id == form_id
            ]
        else:
            known_for_form = None

        form_results, form_invalid = process_data_rows(
            df=rows_for_parent,
            config=config,
            questions=questions_for_form,
            administration_id=parent_form_data.administration_id,
            parent=parent_form_data,
            is_parent=False,
            existing_records=known_for_form,
        )

        # Tag each result with the form it came from
        for entry in form_results:
            entry["form_id"] = form_id

        collected_results += form_results
        collected_invalid += form_invalid

    return collected_results, collected_invalid
1✔
192

193

194
# =============================================================================
195
# Form Data Creation - GENERIC METHOD
196
# =============================================================================
197

198

199
def create_form_data(
    row: pd.Series,
    user,
    administration_id: int,
    parent: Optional[FormData] = None,
    existing_record: Optional[FormData] = None,
) -> Optional[FormData]:
    """Create a FormData from one row, or update ``existing_record``.

    Args:
        row: Pandas Series containing row data
        user: User creating the record
        administration_id: Administration ID
        parent: Parent FormData (for child records only)
        existing_record: Previously seeded FormData for this row. When
            given, it is updated in place and no new record is created.

    Returns:
        Created or updated FormData instance or None if failed
    """
    try:
        geo_value = None
        uuid_value = row[CsvColumns.IDENTIFIER]
        if CsvColumns.GEO in row and pd.notna(row[CsvColumns.GEO]):
            # Geo arrives as pipe-separated numbers
            geo_value = [
                float(g) for g in
                str(row[CsvColumns.GEO]).split("|")
            ]
        if parent and not geo_value:
            # NOTE(review): the parent's uuid is only inherited when the
            # child row has no geo of its own — confirm this coupling is
            # intended.
            geo_value = parent.geo
            uuid_value = parent.uuid

        flow_data_id = int(row[CsvColumns.DATAPOINT_ID])

        # Sanitize name by replacing pipe characters
        dp_name = row[CsvColumns.NAME].replace("|", " - ")
        # Add FLOW-{flow_data_id} prefix to name
        dp_name = f"{FLOW_PREFIX}{flow_data_id} - {dp_name}"

        # Check if record already exists
        if existing_record:
            # Update existing record (uuid, created and created_by are
            # left untouched)
            existing_record.name = dp_name
            existing_record.administration_id = administration_id
            existing_record.geo = geo_value
            existing_record.submitter = row.get(CsvColumns.SUBMITTER, None)
            if parent:
                existing_record.parent = parent
            existing_record.save()
            logger.info(
                f"Updated existing FormData {existing_record.pk} "
                f"for flow_data_id {flow_data_id}"
            )
            return existing_record

        # Create new record. Records without a parent reuse the flow
        # datapoint id as their primary key; child records get a
        # generated one.
        new_data_id = None
        if not parent and flow_data_id:
            new_data_id = flow_data_id
        data = FormData.objects.create(
            id=new_data_id,
            form_id=row[CsvColumns.FORM_ID],
            uuid=uuid_value,
            name=dp_name,
            administration_id=administration_id,
            geo=geo_value,
            created_by=user,
            parent=parent,
            submitter=row.get(CsvColumns.SUBMITTER, None),
        )
        # Set created timestamp from source data
        data.created = row[CsvColumns.CREATED_AT]
        data.save()
        logger.info(
            f"Created new FormData {data.pk} "
            f"for flow_data_id {flow_data_id}"
        )
        # Save to datapoint json file if parent is None (Registration)
        if data.parent is None:
            # NOTE(review): accessed without call parentheses; this only
            # has an effect if ``save_to_file`` is a property on FormData
            # — confirm against the model.
            data.save_to_file
        return data
    except Exception as e:
        # row.get() keeps this handler from raising when the id column
        # itself is missing, preserving the "return None on failure"
        # contract; logger.exception keeps the traceback.
        logger.exception(
            f"Error creating/updating FormData for row "
            f"{row.get(CsvColumns.DATAPOINT_ID)}: {e}"
        )
        return None
×
284

285

286
# =============================================================================
287
# Form Data Deletion (Reverting) - GENERIC METHODS
288
# =============================================================================
289

290
def revert_form_data(
    form: Forms
) -> int:
    """Hard-delete all seeded FormData of a form, including children.

    Only records whose name starts with ``FLOW_PREFIX`` are removed.

    Args:
        form: Forms instance

    Returns:
        Number of removed records: the matched FormData plus their
        children.
    """
    form_data = form.form_form_data.filter(
        name__startswith=FLOW_PREFIX,
    )
    total_deleted = 0
    for data in form_data.all():
        # Count the children before deleting them; once the rows are
        # hard-deleted they can no longer be counted.
        total_deleted += 1 + data.children.count()
        data.children.all().delete(hard=True)
        data.delete(hard=True)
    return total_deleted
1✔
306

307
# =============================================================================
308
# Answer Processing - GENERIC METHODS
309
# =============================================================================
310

311

312
def prepare_answer_data(
    row: pd.Series,
    questions: Dict[int, Any],
    administration_id: Optional[int],
    answer_processor: AnswerProcessor,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Convert one data row into answer dictionaries.

    The same routine serves parent and child rows. The row is expected to
    have one column per question, named after the question ID.

    Args:
        row: Pandas Series containing row data
        questions: Dictionary mapping question ID to Question object
        administration_id: Administration ID for admin-type questions
        answer_processor: AnswerProcessor instance

    Returns:
        Tuple of (answer_records, invalid_answers). Answer records carry
        ``question_id``, ``name``, ``value`` and ``options``; invalid
        answers describe values the processor could not convert.
    """
    option_types = (QuestionTypes.option, QuestionTypes.multiple_option)
    accepted = []
    rejected = []

    for question_id, question in questions.items():
        raw_value = row.get(str(question_id))

        # Nothing to do for missing columns or empty cells
        if pd.isna(raw_value):
            continue

        # Option-based questions need their allowed values for processing
        allowed_values = (
            list(question.options.values_list("value", flat=True))
            if question.type in option_types
            else []
        )

        name, value, options = answer_processor.process(
            question_type=question.type,
            row_value=raw_value,
            administration_id=administration_id,
            opt_list=allowed_values,
        )

        # The processor signals an unusable value by returning all None
        if all(part is None for part in (name, value, options)):
            rejected.append({
                "mis_form_id": question.form_id,
                "mis_question_id": question.pk,
                "mis_question_type": QuestionTypes.FieldStr[question.type],
                "flow_data_id": row[CsvColumns.DATAPOINT_ID],
                "value": raw_value,
            })
            continue

        accepted.append(
            {
                "question_id": question.pk,
                "name": name,
                "value": value,
                "options": options,
            }
        )

    return accepted, rejected
1✔
380

381

382
def bulk_create_answers(
    data: FormData,
    answer_records: List[Dict[str, Any]],
    user,
    invalid_records: Optional[List[Dict[str, Any]]] = None,
):
    """Store answer records for a FormData, updating where one exists.

    Works for both parent and child FormData instances. Answers that are
    already stored are overwritten rather than deleted, so answers that
    are not part of the seeder data stay untouched.

    Args:
        data: FormData instance (parent or child)
        answer_records: List of answer data dictionaries
        user: User recorded as ``created_by`` on created and updated
            answers
        invalid_records: Optional list of invalid answers, filtered in
            place: entries whose ``mis_question_id`` already has a stored
            answer are removed.
            Expected format: {"mis_question_id": int, ...}
    """
    # Index the stored answers by question for constant-time lookups
    stored_by_question = {}
    for stored in data.data_answer.all():
        stored_by_question[stored.question_id] = stored

    # Slice assignment keeps the caller's list object and edits it in place
    if invalid_records is not None:
        invalid_records[:] = [
            record for record in invalid_records
            if record.get("mis_question_id") not in stored_by_question
        ]

    if not answer_records:
        return

    AnswerModel = data.data_answer.model
    pending_updates = []
    pending_inserts = []

    for record in answer_records:
        question_id = record["question_id"]
        stored = stored_by_question.get(question_id)

        if stored is None:
            # No answer yet for this question: queue a new one
            pending_inserts.append(
                AnswerModel(
                    data=data,
                    question_id=question_id,
                    value=record["value"],
                    options=record["options"],
                    name=record["name"],
                    created_by=user,
                )
            )
            continue

        # Overwrite the stored answer with the seeded content
        for field in ("value", "options", "name"):
            setattr(stored, field, record[field])
        stored.created_by = user
        pending_updates.append(stored)

    # Write updates first, then inserts
    if pending_updates:
        AnswerModel.objects.bulk_update(
            pending_updates,
            fields=["value", "options", "name", "created_by"]
        )

    if pending_inserts:
        data.data_answer.bulk_create(pending_inserts)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc