• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / akvo-mis / #516

19 Jan 2026 05:02AM UTC coverage: 88.496% (+0.4%) from 88.144%
#516

Pull #152

coveralls-python

ifirmawan
[#151] feat: update flow data seeder to group child data by identifier for improved reliability
Pull Request #152: Add Akvo Flow Datapoint ID to Datapoint Name

3724 of 4320 branches covered (86.2%)

Branch coverage included in aggregate %.

7623 of 8502 relevant lines covered (89.66%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.22
backend/utils/seeder_data_processor.py
1
"""
2
Seeder Data Processing Module
3

4
This module provides data processing functionality for Flow Complete Seeder.
5
"""
6

7
import logging
1✔
8
from typing import Dict, Optional, Any, List, Tuple
1✔
9

10
import pandas as pd
1✔
11

12
from api.v1.v1_data.models import FormData
1✔
13
from api.v1.v1_forms.models import QuestionTypes, Forms
1✔
14

15
from .seeder_config import (
1✔
16
    CsvColumns,
17
    SeederConfig,
18
    FLOW_PREFIX,
19
)
20
from .seeder_answer_processor import AnswerProcessor
1✔
21

22
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
1✔
23

24

25
# =============================================================================
26
# Data Processing - UNIFIED GENERIC METHODS
27
# =============================================================================
28

29

30
def process_data_rows(
    df: pd.DataFrame,
    config: SeederConfig,
    questions: Dict[int, Any],
    administration_id: int,
    parent: Optional[FormData] = None,
    is_parent: bool = True,
    existing_records: Optional[List[FormData]] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Process data rows (parent or child) into FormData and answers.

    For every row the answers are prepared first; rows that yield no
    answers are skipped. Otherwise a FormData record is created (or an
    existing one is updated) and its answers are replaced.

    Args:
        df: DataFrame containing rows to process
        config: SeederConfig instance (only ``config.user`` is read here)
        questions: Dictionary mapping question ID to Question object
        administration_id: Administration ID for all rows
        parent: Parent FormData (for child records only)
        is_parent: Whether processing parent records (used for log
            messages only)
        existing_records: Previously seeded FormData records that may be
            updated instead of creating new ones. ``None`` is treated as
            "no existing records".

    Returns:
        Tuple of ``(seeded_records, invalid_answers)``:
        - seeded_records: dictionaries with ``flow_data_id``,
          ``mis_data_id`` and ``is_new``
        - invalid_answers: invalid answer dictionaries collected from
          ``prepare_answer_data`` for all rows
    """
    answer_processor = AnswerProcessor()
    seeded_records = []
    invalid_answers = []

    # None must behave like "nothing to match against"; iterating None
    # would raise inside the try block and every row would be skipped.
    candidates = [] if existing_records is None else existing_records
    record_kind = "parent" if is_parent else "child"
    parent_pk = parent.pk if parent else None

    for _, row in df.iterrows():
        try:
            # Prepare answers
            answers, row_invalid_answers = prepare_answer_data(
                row=row,
                questions=questions,
                administration_id=administration_id,
                answer_processor=answer_processor,
            )
            invalid_answers.extend(row_invalid_answers)

            if not answers:
                continue

            datapoint_id = str(row[CsvColumns.DATAPOINT_ID])

            # Find matching existing record (first match wins).
            # NOTE(review): this is a substring match on the name, so
            # datapoint "12" also matches a name containing "123" -
            # confirm whether a stricter match is wanted.
            existing_record = next(
                (
                    er for er in candidates
                    if er.name
                    and datapoint_id in er.name
                    and er.parent_id == parent_pk
                ),
                None,
            )

            # Create or update the FormData (parent or child)
            form_data = create_form_data(
                row=row,
                user=config.user,
                administration_id=administration_id,
                parent=parent,
                existing_record=existing_record,
            )

            if not form_data:
                continue

            bulk_create_answers(form_data, answers, config.user)

            seeded_records.append(
                {
                    "flow_data_id": row[CsvColumns.DATAPOINT_ID],
                    "mis_data_id": form_data.pk,
                    "is_new": existing_record is None,
                }
            )

        except Exception as e:
            # A failing row must not abort the whole batch. Use .get() so
            # the handler itself cannot raise when the datapoint column
            # is missing; logger.exception attaches the traceback.
            logger.exception(
                f"Error processing {record_kind} "
                f"row {row.get(CsvColumns.DATAPOINT_ID)}: {e}"
            )
            continue

    return seeded_records, invalid_answers
118

119

120
def process_child_data_for_parent(
    parent_row: pd.Series,
    config: SeederConfig,
    parent_form_data: FormData,
    child_data_groups: pd.core.groupby.DataFrameGroupBy,
    child_questions: Dict[int, Any],
    existing_records: Optional[List[FormData]] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Seed every child row that belongs to one parent record.

    Args:
        parent_row: Parent row; its identifier column selects the group
        config: SeederConfig instance
        parent_form_data: Parent FormData instance
        child_data_groups: Child dataframe grouped by identifier
        child_questions: Questions for child data
        existing_records: Existing FormData records, forwarded unchanged
            to process_data_rows

    Returns:
        The (seeded records, invalid answers) tuple produced by
        process_data_rows, or two empty lists when the parent has no
        child group.
    """
    # Children are looked up by the parent's identifier: the grouped
    # child frame is keyed by the same identifier column.
    group_key = parent_row[CsvColumns.IDENTIFIER]

    try:
        matched_children = child_data_groups.get_group(group_key)
    except KeyError:
        # This parent has no child group at all.
        return [], []
    else:
        # Delegate to the shared row processor in child mode.
        return process_data_rows(
            df=matched_children,
            config=config,
            questions=child_questions,
            administration_id=parent_form_data.administration_id,
            parent=parent_form_data,
            is_parent=False,
            existing_records=existing_records,
        )
162

163

164
# =============================================================================
165
# Form Data Creation - GENERIC METHOD
166
# =============================================================================
167

168

169
def create_form_data(
    row: pd.Series,
    user,
    administration_id: int,
    parent: Optional[FormData] = None,
    existing_record: Optional[FormData] = None,
) -> Optional[FormData]:
    """Create a FormData instance (parent or child) or update one.

    Args:
        row: Pandas Series containing row data
        user: User creating the record
        administration_id: Administration ID
        parent: Parent FormData (for child records only)
        existing_record: Previously seeded FormData to update in place;
            when given, no new record is created

    Returns:
        Created or updated FormData instance or None if failed
    """
    try:
        # Geo is stored in the row as "a|b"; children without their own
        # value inherit the parent's geo.
        geo_value = None
        if CsvColumns.GEO in row and pd.notna(row[CsvColumns.GEO]):
            geo_value = [
                float(g) for g in
                str(row[CsvColumns.GEO]).split("|")
            ]
        if parent and not geo_value:
            geo_value = parent.geo

        flow_data_id = int(row[CsvColumns.DATAPOINT_ID])

        # Sanitize name by replacing pipe characters
        dp_name = row[CsvColumns.NAME].replace("|", " - ")
        # Add FLOW-{flow_data_id} prefix to name
        dp_name = f"{FLOW_PREFIX}{flow_data_id} - {dp_name}"

        # Check if record already exists
        if existing_record:
            # Update existing record
            existing_record.name = dp_name
            existing_record.administration_id = administration_id
            existing_record.geo = geo_value
            existing_record.created_by = user
            existing_record.submitter = row.get(CsvColumns.SUBMITTER, None)
            if parent:
                existing_record.parent = parent
            existing_record.save()
            logger.info(
                f"Updated existing FormData {existing_record.pk} "
                f"for flow_data_id {flow_data_id}"
            )
            return existing_record

        # Create new record. Only parent (registration) records reuse the
        # flow datapoint id as primary key; children get a generated id.
        new_data_id = None
        if not parent and flow_data_id:
            new_data_id = flow_data_id
        data = FormData.objects.create(
            id=new_data_id,
            form_id=row[CsvColumns.FORM_ID],
            uuid=row[CsvColumns.IDENTIFIER],
            name=dp_name,
            administration_id=administration_id,
            geo=geo_value,
            created_by=user,
            parent=parent,
            submitter=row.get(CsvColumns.SUBMITTER, None),
        )
        # Set created timestamp from source data
        data.created = row[CsvColumns.CREATED_AT]
        data.save()
        logger.info(
            f"Created new FormData {data.pk} "
            f"for flow_data_id {flow_data_id}"
        )
        # Save to datapoint json file if parent is None (Registration)
        if data.parent is None:
            # NOTE(review): accessed without call parentheses - presumably
            # a property with a side effect; confirm against FormData.
            data.save_to_file
        return data
    except Exception as e:
        # Use .get() so the handler cannot raise a second error when the
        # datapoint column itself is what failed, and keep the traceback:
        # the caller only sees None and skips the row.
        logger.exception(
            f"Error creating/updating FormData for row "
            f"{row.get(CsvColumns.DATAPOINT_ID)}: {e}"
        )
        return None
253

254

255
# =============================================================================
256
# Form Data Deletion (Reverting) - GENERIC METHODS
257
# =============================================================================
258

259
def revert_form_data(
    form: Forms
) -> int:
    """Generic method to revert all seeded FormData for a given form.

    Only records whose name starts with FLOW_PREFIX are removed, together
    with their children.

    Args:
        form: Forms instance

    Returns:
        Number of matched records plus the number of their children,
        both counted before deletion.
    """
    form_data = form.form_form_data.filter(
        name__startswith=FLOW_PREFIX,
    )
    total_data = form_data.count()
    total_children = 0
    for data in form_data.all():
        children = data.children.all()
        # Count children before deleting them: counting afterwards
        # re-runs the query once the rows have been removed, so the
        # child total would always come out as 0.
        total_children += children.count()
        children.delete(hard=True)
        data.delete(hard=True)
    return total_data + total_children
275

276
# =============================================================================
277
# Answer Processing - GENERIC METHODS
278
# =============================================================================
279

280

281
def prepare_answer_data(
    row: pd.Series,
    questions: Dict[int, Any],
    administration_id: Optional[int],
    answer_processor: AnswerProcessor,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Turn one data row into answer dictionaries.

    The same logic is used for parent and child rows. Each question is
    looked up in the row under the column named ``str(question_id)``.

    Args:
        row: Pandas Series containing row data
        questions: Dictionary mapping question ID to Question object
        administration_id: Administration ID passed to the processor
        answer_processor: AnswerProcessor instance

    Returns:
        Tuple of (answer records, invalid answers). An answer is treated
        as invalid when the processor returns None for name, value and
        options.
    """
    option_types = (
        QuestionTypes.option,
        QuestionTypes.multiple_option,
    )
    accepted = []
    rejected = []

    for question_id, question in questions.items():
        cell = row.get(str(question_id))

        # Nothing to do for missing columns or NaN cells
        if pd.isna(cell):
            continue

        # Only option-type questions need their allowed values
        if question.type in option_types:
            choices = list(question.options.values_list("value", flat=True))
        else:
            choices = []

        name, value, options = answer_processor.process(
            question_type=question.type,
            row_value=cell,
            administration_id=administration_id,
            opt_list=choices,
        )

        if all(part is None for part in (name, value, options)):
            # Record the rejected value and move on to the next question
            rejected.append({
                "mis_form_id": question.form_id,
                "mis_question_id": question.pk,
                "mis_question_type": QuestionTypes.FieldStr[question.type],
                "flow_data_id": row[CsvColumns.DATAPOINT_ID],
                "value": cell,
            })
            continue

        accepted.append(
            {
                "question_id": question.pk,
                "name": name,
                "value": value,
                "options": options,
            }
        )

    return accepted, rejected
349

350

351
def bulk_create_answers(
    data: FormData,
    answer_records: List[Dict[str, Any]],
    user,
):
    """Replace the answers of a FormData record in one bulk insert.

    Used for both parent and child FormData instances. When
    ``answer_records`` is empty nothing is touched, so existing answers
    are kept.

    Args:
        data: FormData instance (parent or child)
        answer_records: List of answer data dictionaries
        user: User creating the answers
    """
    if not answer_records:
        return

    # Drop whatever answers the record already has
    data.data_answer.all().delete()

    # Build unsaved answer instances, then insert them in one call
    answer_cls = data.data_answer.model
    pending = []
    for record in answer_records:
        pending.append(
            answer_cls(
                data=data,
                question_id=record["question_id"],
                value=record["value"],
                options=record["options"],
                name=record["name"],
                created_by=user,
            )
        )
    data.data_answer.bulk_create(pending)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc