• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / akvo-mis / #535

22 Jan 2026 04:55AM UTC coverage: 88.291% (-0.03%) from 88.321%
#535

push

coveralls-python

ifirmawan
[#151] fix: Update flow data processing to use parent datapoint_id for child grouping and success status

3869 of 4503 branches covered (85.92%)

Branch coverage included in aggregate %.

7977 of 8914 relevant lines covered (89.49%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.01
backend/utils/seeder_data_processor.py
1
"""
2
Seeder Data Processing Module
3

4
This module provides data processing functionality for Flow Complete Seeder.
5
"""
6

7
import logging
1✔
8
from typing import Dict, Optional, Any, List, Tuple
1✔
9

10
import pandas as pd
1✔
11

12
from api.v1.v1_data.models import FormData
1✔
13
from api.v1.v1_forms.models import QuestionTypes, Forms
1✔
14

15
from .seeder_config import (
1✔
16
    CsvColumns,
17
    SeederConfig,
18
    FLOW_PREFIX,
19
)
20
from .seeder_answer_processor import AnswerProcessor
1✔
21

22
# Module-level logger named after this module; used for per-row seeding logs.
logger = logging.getLogger(__name__)
1✔
23

24

25
# =============================================================================
26
# Data Processing - UNIFIED GENERIC METHODS
27
# =============================================================================
28

29

30
def _find_existing_record(
    existing_records: List[FormData],
    flow_data_id: Any,
    parent_pk: Optional[int],
) -> Optional[FormData]:
    """Return the previously seeded record for ``flow_data_id``, if any.

    Seeded records are named ``f"{FLOW_PREFIX}{flow_data_id} - <name>"`` by
    ``create_form_data``, so the match is on that exact prefix. A plain
    substring test would let datapoint ``12`` match ``FLOW-123 - ...``.
    The record must also hang under the same parent (``None`` for parents).
    """
    if not existing_records:
        return None
    # int() mirrors create_form_data, so float ids like 123.0 still match.
    name_prefix = f"{FLOW_PREFIX}{int(flow_data_id)} - "
    return next(
        (
            record
            for record in existing_records
            if (record.name or "").startswith(name_prefix)
            and record.parent_id == parent_pk
        ),
        None,
    )


def process_data_rows(
    df: pd.DataFrame,
    config: SeederConfig,
    questions: Dict[int, Any],
    administration_id: int,
    parent: Optional[FormData] = None,
    is_parent: bool = True,
    existing_records: Optional[List[FormData]] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Seed FormData records (and their answers) for every row in ``df``.

    Handles both parent (registration) and child (monitoring) rows. For
    child rows, pass ``parent`` and ``is_parent=False``. A row that matches
    an entry of ``existing_records`` updates that record instead of creating
    a new one. A row that fails is logged and skipped, and the remaining
    rows are still processed.

    Args:
        df: DataFrame containing rows to process
        config: SeederConfig instance (``config.user`` is used as creator)
        questions: Dictionary mapping question ID to Question object
        administration_id: Administration ID for all rows
        parent: Parent FormData (for child records only)
        is_parent: Whether processing parent records (only used in logs)
        existing_records: Previously seeded FormData records that may be
            updated; matched by flow datapoint id and parent

    Returns:
        Tuple of (seeded_records, invalid_answers). Each seeded record is a
        dict with ``flow_data_id``, ``mis_data_id``, ``is_new`` and
        ``is_incomplete`` (True when the row produced no valid answers).
        ``invalid_answers`` collects rejected answers across all rows.
    """
    answer_processor = AnswerProcessor()
    seeded_records = []
    invalid_answers = []
    record_kind = "parent" if is_parent else "child"
    parent_pk = parent.pk if parent else None

    for _, row in df.iterrows():
        # Read with .get so the error log below can never raise itself.
        datapoint_id = row.get(CsvColumns.DATAPOINT_ID)
        try:
            answers, row_invalid_answers = prepare_answer_data(
                row=row,
                questions=questions,
                administration_id=administration_id,
                answer_processor=answer_processor,
            )
            invalid_answers.extend(row_invalid_answers)

            existing_record = _find_existing_record(
                existing_records or [], datapoint_id, parent_pk
            )
            # Create (or update) the parent/child FormData for this row.
            form_data = create_form_data(
                row=row,
                user=config.user,
                administration_id=administration_id,
                parent=parent,
                existing_record=existing_record,
            )
            if not form_data:
                # create_form_data already logged the failure.
                continue

            # Rows with answers get them written. A row with no valid answer
            # keeps its record but is flagged as incomplete.
            if answers:
                bulk_create_answers(form_data, answers, config.user)

            seeded_records.append(
                {
                    "flow_data_id": row[CsvColumns.DATAPOINT_ID],
                    "mis_data_id": form_data.pk,
                    "is_new": existing_record is None,
                    "is_incomplete": not answers,
                }
            )
        except Exception as e:
            # Best-effort seeding: log once (with traceback) and move on.
            logger.exception(
                f"Error processing {record_kind} row {datapoint_id}: {e}"
            )

    return seeded_records, invalid_answers
119

120

121
def process_child_data_for_parent(
    parent_row: pd.Series,
    config: SeederConfig,
    parent_form_data: FormData,
    child_data_groups_dict: Dict[int, pd.core.groupby.DataFrameGroupBy],
    child_questions_dict: Dict[int, Dict[int, Any]],
    existing_records: Optional[List[FormData]] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Seed the child rows of a single parent, across every child form.

    Each child form has its own grouped DataFrame and its own questions.
    The rows belonging to this parent are pulled from each group and
    handed to ``process_data_rows``.

    Args:
        parent_row: Parent row; its datapoint_id is the group lookup key
        config: SeederConfig instance
        parent_form_data: Parent FormData the children are attached to
        child_data_groups_dict: Dict mapping form_id to grouped child dataframe
        child_questions_dict: Dict mapping form_id to questions
        existing_records: Optional list of existing child records

    Returns:
        Tuple of (results_list, invalid_answers_list). Every result dict is
        tagged with the ``form_id`` of the child form it came from.
    """
    # In Akvo Flow, monitoring submissions point to their parent through the
    # parent's datapoint_id ('parent' column), not the identifier.
    group_key = parent_row[CsvColumns.DATAPOINT_ID]
    collected_results = []
    collected_invalid = []

    for child_form_id, grouped_rows in child_data_groups_dict.items():
        questions_for_form = child_questions_dict.get(child_form_id, {})

        try:
            rows_for_parent = grouped_rows.get_group(group_key)
        except KeyError:
            # This child form has no submissions for the current parent.
            continue

        # Only existing records of this child form are candidates.
        if existing_records:
            records_for_form = [
                record
                for record in existing_records
                if record.form_id == child_form_id
            ]
        else:
            records_for_form = None

        form_results, form_invalid = process_data_rows(
            df=rows_for_parent,
            config=config,
            questions=questions_for_form,
            administration_id=parent_form_data.administration_id,
            parent=parent_form_data,
            is_parent=False,
            existing_records=records_for_form,
        )

        for entry in form_results:
            entry["form_id"] = child_form_id

        collected_results += form_results
        collected_invalid += form_invalid

    return collected_results, collected_invalid
186

187

188
# =============================================================================
189
# Form Data Creation - GENERIC METHOD
190
# =============================================================================
191

192

193
def create_form_data(
    row: pd.Series,
    user,
    administration_id: int,
    parent: Optional[FormData] = None,
    existing_record: Optional[FormData] = None,
) -> Optional[FormData]:
    """Create a FormData from a CSV row, or update ``existing_record``.

    The record is named ``f"{FLOW_PREFIX}{flow_data_id} - {name}"`` so that
    seeded data can later be recognised (and reverted) by its prefix.

    Args:
        row: Pandas Series containing row data
        user: User stored as ``created_by``
        administration_id: Administration ID
        parent: Parent FormData (for child records only). When the row has
            no geolocation, the parent's ``geo`` and ``uuid`` are reused.
        existing_record: Previously seeded FormData to update in place
            instead of creating a new record

    Returns:
        The created or updated FormData instance, or None if anything
        failed (the error is logged with its traceback).
    """
    try:
        geo_value = None
        uuid_value = row[CsvColumns.IDENTIFIER]
        if CsvColumns.GEO in row and pd.notna(row[CsvColumns.GEO]):
            # Geolocation arrives as pipe-separated numbers.
            geo_value = [
                float(coord) for coord in str(row[CsvColumns.GEO]).split("|")
            ]
        if parent and not geo_value:
            # NOTE(review): the parent's uuid is only inherited when the
            # child row has no geolocation of its own — confirm intended.
            geo_value = parent.geo
            uuid_value = parent.uuid

        flow_data_id = int(row[CsvColumns.DATAPOINT_ID])

        # A missing (NaN) or non-string name used to crash on ``.replace``
        # and drop the whole datapoint; treat it as empty text instead.
        raw_name = row[CsvColumns.NAME]
        name_text = "" if pd.isna(raw_name) else str(raw_name)
        # Sanitize name by replacing pipe characters, then add the
        # FLOW-{flow_data_id} prefix used to identify seeded records.
        dp_name = (
            f"{FLOW_PREFIX}{flow_data_id} - "
            f"{name_text.replace('|', ' - ')}"
        )

        if existing_record:
            # Update existing record in place.
            existing_record.name = dp_name
            existing_record.administration_id = administration_id
            existing_record.geo = geo_value
            existing_record.created_by = user
            existing_record.submitter = row.get(CsvColumns.SUBMITTER, None)
            if parent:
                existing_record.parent = parent
            existing_record.save()
            logger.info(
                f"Updated existing FormData {existing_record.pk} "
                f"for flow_data_id {flow_data_id}"
            )
            return existing_record

        # Parent records keep their Flow datapoint id as primary key.
        new_data_id = None
        if not parent and flow_data_id:
            new_data_id = flow_data_id
        data = FormData.objects.create(
            id=new_data_id,
            form_id=row[CsvColumns.FORM_ID],
            uuid=uuid_value,
            name=dp_name,
            administration_id=administration_id,
            geo=geo_value,
            created_by=user,
            parent=parent,
            submitter=row.get(CsvColumns.SUBMITTER, None),
        )
        # Set created timestamp from source data (assigned after create and
        # saved separately, as the original code does).
        data.created = row[CsvColumns.CREATED_AT]
        data.save()
        logger.info(
            f"Created new FormData {data.pk} "
            f"for flow_data_id {flow_data_id}"
        )
        # Save to datapoint json file if parent is None (Registration).
        if data.parent is None:
            # NOTE(review): accessed without a call, so this relies on
            # ``save_to_file`` being a property on FormData — confirm.
            data.save_to_file
        return data
    except Exception as e:
        # .get keeps the handler from raising if the id column is missing.
        logger.exception(
            f"Error creating/updating FormData for row "
            f"{row.get(CsvColumns.DATAPOINT_ID)}: {e}"
        )
        return None
279

280

281
# =============================================================================
282
# Form Data Deletion (Reverting) - GENERIC METHODS
283
# =============================================================================
284

285
def revert_form_data(
    form: Forms
) -> int:
    """Hard-delete every seeded FormData of ``form`` together with its children.

    Only records whose name starts with ``FLOW_PREFIX`` (the names written
    by the seeder) are removed. Each record's children are deleted first.

    Args:
        form: Forms instance whose seeded data should be removed

    Returns:
        Total number of deleted records: seeded records plus their children.
    """
    seeded_data = form.form_form_data.filter(
        name__startswith=FLOW_PREFIX,
    )
    deleted_total = 0
    for data in seeded_data.all():
        children = data.children.all()
        # Count before deleting. The previous version counted children after
        # the hard delete, when they were already gone, so it always added 0.
        deleted_total += children.count() + 1
        children.delete(hard=True)
        data.delete(hard=True)
    return deleted_total
301

302
# =============================================================================
303
# Answer Processing - GENERIC METHODS
304
# =============================================================================
305

306

307
def prepare_answer_data(
    row: pd.Series,
    questions: Dict[int, Any],
    administration_id: Optional[int],
    answer_processor: AnswerProcessor,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Convert one data row into answer payloads, separating invalid values.

    Works the same for parent and child rows. The answer to each question
    is read from the column named after the question's ID; empty (NaN or
    missing) cells are skipped.

    Args:
        row: Pandas Series containing row data
        questions: Dictionary mapping question ID to Question object
        administration_id: Administration ID forwarded to the answer
            processor (for admin-type questions)
        answer_processor: AnswerProcessor used to convert raw cell values

    Returns:
        Tuple of (answer_records, invalid_answers). ``answer_records`` holds
        dicts with ``question_id``, ``name``, ``value`` and ``options``.
        ``invalid_answers`` describes every value for which the processor
        returned None for name, value and options.
    """
    accepted = []
    rejected = []

    for question_id, question in questions.items():
        cell = row.get(str(question_id))
        if pd.isna(cell):
            # Unanswered question: nothing to record.
            continue

        # Choice questions are validated against their configured options.
        is_choice = question.type in (
            QuestionTypes.option,
            QuestionTypes.multiple_option,
        )
        choices = (
            list(question.options.values_list("value", flat=True))
            if is_choice
            else []
        )

        name, value, options = answer_processor.process(
            question_type=question.type,
            row_value=cell,
            administration_id=administration_id,
            opt_list=choices,
        )

        if all(part is None for part in (name, value, options)):
            # The processor rejected the value; report it and skip it.
            rejected.append({
                "mis_form_id": question.form_id,
                "mis_question_id": question.pk,
                "mis_question_type": QuestionTypes.FieldStr[question.type],
                "flow_data_id": row[CsvColumns.DATAPOINT_ID],
                "value": cell,
            })
            continue

        accepted.append({
            "question_id": question.pk,
            "name": name,
            "value": value,
            "options": options,
        })

    return accepted, rejected
375

376

377
def bulk_create_answers(
    data: FormData,
    answer_records: List[Dict[str, Any]],
    user,
):
    """Replace the answers of ``data`` with ``answer_records``.

    Usable for parent and child FormData alike. When ``answer_records`` is
    empty nothing happens, and any existing answers are left untouched.

    Args:
        data: FormData instance (parent or child)
        answer_records: Answer dicts with question_id/value/options/name
        user: User recorded as the answers' creator
    """
    if not answer_records:
        return

    answers_manager = data.data_answer
    # Drop any previous answers first so a re-seed does not duplicate them.
    answers_manager.all().delete()

    answer_cls = answers_manager.model
    new_answers = [
        answer_cls(
            data=data,
            question_id=record["question_id"],
            value=record["value"],
            options=record["options"],
            name=record["name"],
            created_by=user,
        )
        for record in answer_records
    ]
    answers_manager.bulk_create(new_answers)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc