
akvo / akvo-mis / #531

19 Jan 2026 12:46PM UTC · coverage: 88.346% (-0.1%) from 88.482%
Build #531 · push · coveralls-python · web-flow
Merge a117ae04c into e15dc2c3e

3831 of 4456 branches covered (85.97%); branch coverage is included in the aggregate %.
7881 of 8801 relevant lines covered (89.55%)
0.9 hits per line
Source File: backend/utils/seeder_data_processor.py (77.99% covered)
"""
Seeder Data Processing Module

This module provides data processing helpers for the Flow Complete Seeder.
"""

import logging
from typing import Dict, Optional, Any, List, Tuple

import pandas as pd

from api.v1.v1_data.models import FormData
from api.v1.v1_forms.models import QuestionTypes, Forms

from .seeder_config import (
    CsvColumns,
    SeederConfig,
    FLOW_PREFIX,
)
from .seeder_answer_processor import AnswerProcessor

logger = logging.getLogger(__name__)


# =============================================================================
# Data Processing - UNIFIED GENERIC METHODS
# =============================================================================


def process_data_rows(
    df: pd.DataFrame,
    config: SeederConfig,
    questions: Dict[int, Any],
    administration_id: int,
    parent: Optional[FormData] = None,
    is_parent: bool = True,
    existing_records: Optional[List[FormData]] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Generic method to process data rows (parent or child).

    This unified method eliminates code duplication by handling both parent
    and child record processing with parameterization.

    Args:
        df: DataFrame containing rows to process
        config: SeederConfig instance
        questions: Dictionary mapping question ID to Question object
        administration_id: Administration ID for all rows
        parent: Parent FormData (for child records only)
        is_parent: Whether processing parent records
        existing_records: Optional list of existing FormData records to
            update instead of creating new ones

    Returns:
        Tuple of (seeded_records, invalid_answers); each seeded record maps
        flow_data_id to mis_data_id and flags whether the record is new
    """
    answer_processor = AnswerProcessor()
    seeded_records = []
    invalid_answers = []

    for _, row in df.iterrows():
        try:
            # Prepare answers for this row
            answers, row_invalid_answers = prepare_answer_data(
                row=row,
                questions=questions,
                administration_id=administration_id,
                answer_processor=answer_processor,
            )
            invalid_answers.extend(row_invalid_answers)

            if len(answers) == 0:
                continue

            # Create or update the FormData (parent or child)
            datapoint_id = str(row[CsvColumns.DATAPOINT_ID])
            parent_pk = parent.pk if parent else None

            # Find matching existing record
            matching = [
                er for er in (existing_records or [])
                if datapoint_id in er.name and er.parent_id == parent_pk
            ]

            existing_record = matching[0] if matching else None
            form_data = create_form_data(
                row=row,
                user=config.user,
                administration_id=administration_id,
                parent=parent,
                existing_record=existing_record,
            )

            if not form_data:
                continue

            bulk_create_answers(form_data, answers, config.user)

            seeded_records.append(
                {
                    "flow_data_id": row[CsvColumns.DATAPOINT_ID],
                    "mis_data_id": form_data.pk,
                    "is_new": existing_record is None,
                }
            )

        except Exception:
            logger.exception(
                f"Error processing {'parent' if is_parent else 'child'} "
                f"row {row[CsvColumns.DATAPOINT_ID]}"
            )
            continue

    return seeded_records, invalid_answers
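

# Usage sketch (illustrative only, not part of the original module). Shows how
# process_data_rows might be called for registration (parent) rows; parent_df,
# parent_questions and administration_id are hypothetical names assumed to be
# prepared elsewhere by the seeder command.
def _example_seed_parent_rows(parent_df, config, parent_questions,
                              administration_id):
    # Parent rows need no parent FormData, so parent/is_parent keep defaults.
    seeded, invalid = process_data_rows(
        df=parent_df,
        config=config,
        questions=parent_questions,
        administration_id=administration_id,
    )
    # Each seeded record links the Flow datapoint id to the new MIS record.
    new_ids = [r["mis_data_id"] for r in seeded if r["is_new"]]
    return new_ids, invalid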


def process_child_data_for_parent(
    parent_row: pd.Series,
    config: SeederConfig,
    parent_form_data: FormData,
    child_data_groups_dict: Dict[int, pd.core.groupby.DataFrameGroupBy],
    child_questions_dict: Dict[int, Dict[int, Any]],
    existing_records: Optional[List[FormData]] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Process all child rows for a given parent across multiple child forms.

    Supports multiple child forms where each child form has its own grouped
    DataFrame and questions.

    Args:
        parent_row: Parent row containing identifier (uuid)
        config: SeederConfig instance
        parent_form_data: Parent FormData instance
        child_data_groups_dict: Dict mapping form_id to grouped child dataframe
        child_questions_dict: Dict mapping form_id to questions
        existing_records: Optional list of existing child records

    Returns:
        Tuple of (results_list, invalid_answers_list)
    """
    # Use identifier (uuid) to match children to parent.
    # In Akvo Flow, monitoring submissions share the same identifier as
    # their parent registration.
    parent_identifier = parent_row[CsvColumns.IDENTIFIER]
    all_results = []
    all_invalid = []

    for form_id, child_data_groups in child_data_groups_dict.items():
        child_questions = child_questions_dict.get(form_id, {})

        try:
            child_rows = child_data_groups.get_group(parent_identifier)
        except KeyError:
            # No child rows for this parent in this form
            continue

        # Filter existing records for this form
        form_existing_records = [
            r for r in existing_records
            if r.form_id == form_id
        ] if existing_records else None

        # Use the generic process_data_rows function
        results, invalid = process_data_rows(
            df=child_rows,
            config=config,
            questions=child_questions,
            administration_id=parent_form_data.administration_id,
            parent=parent_form_data,
            is_parent=False,
            existing_records=form_existing_records,
        )

        # Add form_id to results for tracking
        for result in results:
            result["form_id"] = form_id

        all_results.extend(results)
        all_invalid.extend(invalid)

    return all_results, all_invalid
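

# Usage sketch (illustrative, hypothetical names). After a registration
# (parent) record is seeded, its monitoring rows from several child forms can
# be attached in one call; child_groups_by_form is assumed to hold
# {form_id: child_df.groupby(CsvColumns.IDENTIFIER)} and questions_by_form
# {form_id: {question_id: Question}}.
def _example_seed_children(parent_row, config, parent_form_data,
                           child_groups_by_form, questions_by_form):
    results, invalid = process_child_data_for_parent(
        parent_row=parent_row,
        config=config,
        parent_form_data=parent_form_data,
        child_data_groups_dict=child_groups_by_form,
        child_questions_dict=questions_by_form,
    )
    # Each result carries a form_id key, so callers can report per child form.
    return results, invalid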


# =============================================================================
# Form Data Creation - GENERIC METHOD
# =============================================================================


def create_form_data(
    row: pd.Series,
    user,
    administration_id: int,
    parent: Optional[FormData] = None,
    existing_record: Optional[FormData] = None,
) -> Optional[FormData]:
    """Generic method to create or update FormData (parent or child).

    Args:
        row: Pandas Series containing row data
        user: User creating the record
        administration_id: Administration ID
        parent: Parent FormData (for child records only)
        existing_record: Existing FormData to update instead of creating one

    Returns:
        Created or updated FormData instance, or None if processing failed
    """
    try:
        geo_value = None
        if CsvColumns.GEO in row and pd.notna(row[CsvColumns.GEO]):
            geo_value = [
                float(g) for g in
                str(row[CsvColumns.GEO]).split("|")
            ]
        if parent and not geo_value:
            geo_value = parent.geo

        flow_data_id = int(row[CsvColumns.DATAPOINT_ID])

        # Sanitize name by replacing pipe characters
        dp_name = row[CsvColumns.NAME].replace("|", " - ")
        # Add FLOW-{flow_data_id} prefix to the name
        dp_name = f"{FLOW_PREFIX}{flow_data_id} - {dp_name}"

        # Check if record already exists
        if existing_record:
            # Update existing record
            existing_record.name = dp_name
            existing_record.administration_id = administration_id
            existing_record.geo = geo_value
            existing_record.created_by = user
            existing_record.submitter = row.get(CsvColumns.SUBMITTER, None)
            if parent:
                existing_record.parent = parent
            existing_record.save()
            logger.info(
                f"Updated existing FormData {existing_record.pk} "
                f"for flow_data_id {flow_data_id}"
            )
            return existing_record

        # Create new record; reuse the Flow datapoint id for parent records
        new_data_id = None
        if not parent and flow_data_id:
            new_data_id = flow_data_id
        data = FormData.objects.create(
            id=new_data_id,
            form_id=row[CsvColumns.FORM_ID],
            uuid=row[CsvColumns.IDENTIFIER],
            name=dp_name,
            administration_id=administration_id,
            geo=geo_value,
            created_by=user,
            parent=parent,
            submitter=row.get(CsvColumns.SUBMITTER, None),
        )
        # Set created timestamp from source data
        data.created = row[CsvColumns.CREATED_AT]
        data.save()
        logger.info(
            f"Created new FormData {data.pk} "
            f"for flow_data_id {flow_data_id}"
        )
        # Save to datapoint json file if parent is None (Registration)
        if data.parent is None:
            data.save_to_file
        return data
    except Exception as e:
        logger.error(
            f"Error creating/updating FormData for row "
            f"{row[CsvColumns.DATAPOINT_ID]}: {e}"
        )
        return None
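

# Usage sketch (illustrative, hypothetical helper): update-or-create for one
# row. Previously seeded records can be recognised by the FLOW-prefixed
# datapoint name that create_form_data assigns.
def _example_upsert_datapoint(row, user, administration_id):
    flow_id = str(row[CsvColumns.DATAPOINT_ID])
    existing = FormData.objects.filter(
        name__startswith=f"{FLOW_PREFIX}{flow_id} - "
    ).first()
    return create_form_data(
        row=row,
        user=user,
        administration_id=administration_id,
        existing_record=existing,
    )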


# =============================================================================
# Form Data Deletion (Reverting) - GENERIC METHODS
# =============================================================================

def revert_form_data(
    form: Forms
) -> int:
    """Generic method to revert all seeded FormData for a given form.

    Args:
        form: Forms instance

    Returns:
        Total number of deleted records (parents and their children)
    """
    form_data = form.form_form_data.filter(
        name__startswith=FLOW_PREFIX,
    )
    total_data = form_data.count()
    total_children = 0
    for data in form_data.all():
        # Count children before hard-deleting them along with their parent
        total_children += data.children.count()
        data.children.all().delete(hard=True)
        data.delete(hard=True)
    return total_data + total_children
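

# Usage sketch (illustrative): reverting everything the seeder created for one
# form, e.g. before re-running the seeder from scratch.
def _example_revert_form(form_id: int) -> int:
    form = Forms.objects.get(pk=form_id)
    deleted = revert_form_data(form=form)
    logger.info(f"Reverted {deleted} seeded records for form {form_id}")
    return deleted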


# =============================================================================
# Answer Processing - GENERIC METHODS
# =============================================================================


def prepare_answer_data(
    row: pd.Series,
    questions: Dict[int, Any],
    administration_id: Optional[int],
    answer_processor: AnswerProcessor,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Generic method to prepare answer data from a data row.

    This method works for both parent and child data without modification.

    Args:
        row: Pandas Series containing row data
        questions: Dictionary mapping question ID to Question object
        administration_id: Administration ID for admin-type questions
        answer_processor: AnswerProcessor instance

    Returns:
        Tuple of (answer_records, invalid_answers)
    """
    answer_records = []
    invalid_answers = []

    for question_id, question in questions.items():
        column_name = str(question_id)

        # Skip if value is NaN
        if pd.isna(row.get(column_name)):
            continue

        row_value = row[column_name]

        # Process answer based on question type
        opt_list = []
        if question.type in [
            QuestionTypes.option,
            QuestionTypes.multiple_option,
        ]:
            opt_list = question.options.values_list("value", flat=True)
            opt_list = list(opt_list)

        name, value, options = answer_processor.process(
            question_type=question.type,
            row_value=row_value,
            administration_id=administration_id,
            opt_list=opt_list,
        )

        if name is None and value is None and options is None:
            invalid_answers.append({
                "mis_form_id": question.form_id,
                "mis_question_id": question.pk,
                "mis_question_type": QuestionTypes.FieldStr[question.type],
                "flow_data_id": row[CsvColumns.DATAPOINT_ID],
                "value": row_value,
            })
            # Skip invalid answer
            continue

        answer_records.append(
            {
                "question_id": question.pk,
                "name": name,
                "value": value,
                "options": options,
            }
        )

    return answer_records, invalid_answers
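

# Usage sketch (illustrative, hypothetical row and question map): preparing
# the answers for a single CSV row before persisting them.
def _example_prepare_row(row, questions, administration_id):
    processor = AnswerProcessor()
    answers, invalid = prepare_answer_data(
        row=row,
        questions=questions,
        administration_id=administration_id,
        answer_processor=processor,
    )
    # Invalid entries keep enough context (form, question, raw value) for an
    # operator-facing report.
    return answers, invalid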


def bulk_create_answers(
    data: FormData,
    answer_records: List[Dict[str, Any]],
    user,
):
    """Generic method to bulk create answer records.

    Works for both parent and child FormData instances.

    Args:
        data: FormData instance (parent or child)
        answer_records: List of answer data dictionaries
        user: User creating the answers
    """
    if not answer_records:
        return

    # Clear existing answers (if any)
    data.data_answer.all().delete()

    # Bulk create new answers
    AnswerModel = data.data_answer.model
    data.data_answer.bulk_create(
        [
            AnswerModel(
                data=data,
                question_id=a["question_id"],
                value=a["value"],
                options=a["options"],
                name=a["name"],
                created_by=user,
            )
            for a in answer_records
        ]
    )
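

# Usage sketch (illustrative, hypothetical names): replacing the stored
# answers of an already-created FormData instance. Existing answers are
# deleted first, so the call is safe to repeat for the same datapoint.
def _example_store_answers(form_data, answers, user):
    bulk_create_answers(form_data, answers, user)
    return form_data.data_answer.count()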