• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

akvo / akvo-mis / #512

18 Jan 2026 04:14PM UTC coverage: 88.528% (+0.4%) from 88.144%
#512

Pull #152

coveralls-python

ifirmawan
[#151] fix: make flow_data_seeder tests parallel-safe for CI

- Add temp directories with proper structure for CSV output in command tests
- Mock refresh_materialized_data to prevent SQL errors in isolated tests
- Add PropertyMock for FormData.save_to_file to prevent file system errors
- Mock all external dependencies for complete test isolation
- Generate unique ID offsets per test instance to avoid parallel conflicts
- Change backward compatibility tests from database queries to output verification

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Pull Request #152: Add Akvo Flow Datapoint ID to Datapoint Name

3724 of 4320 branches covered (86.2%)

Branch coverage included in aggregate %.

7627 of 8502 relevant lines covered (89.71%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.22
backend/utils/seeder_data_processor.py
1
"""
2
Seeder Data Processing Module
3

4
This module provides data processing functionality for Flow Complete Seeder.
5
"""
6

7
import logging
1✔
8
from typing import Dict, Optional, Any, List, Tuple
1✔
9

10
import pandas as pd
1✔
11

12
from api.v1.v1_data.models import FormData
1✔
13
from api.v1.v1_forms.models import QuestionTypes, Forms
1✔
14

15
from .seeder_config import (
1✔
16
    CsvColumns,
17
    SeederConfig,
18
    FLOW_PREFIX,
19
)
20
from .seeder_answer_processor import AnswerProcessor
1✔
21

22
logger = logging.getLogger(__name__)
1✔
23

24

25
# =============================================================================
26
# Data Processing - UNIFIED GENERIC METHODS
27
# =============================================================================
28

29

30
def process_data_rows(
    df: pd.DataFrame,
    config: SeederConfig,
    questions: Dict[int, Any],
    administration_id: int,
    parent: Optional[FormData] = None,
    is_parent: bool = True,
    existing_records: Optional[List[FormData]] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Generic method to process data rows (parent or child).

    This unified method eliminates code duplication by handling both parent
    and child record processing with parameterization.

    Args:
        df: DataFrame containing rows to process
        config: SeederConfig instance
        questions: Dictionary mapping question ID to Question object
        administration_id: Administration ID for all rows
        parent: Parent FormData (for child records only)
        is_parent: Whether processing parent records (only used for log
            message context)
        existing_records: Existing FormData records to match against so
            re-runs update records instead of duplicating them (matched by
            Flow datapoint ID embedded in the name, and same parent)

    Returns:
        Tuple of (seeded_records, invalid_answers): seeded_records is a list
        of dicts with keys flow_data_id, mis_data_id and is_new;
        invalid_answers lists answers that could not be processed.
    """
    answer_processor = AnswerProcessor()
    seeded_records = []
    invalid_answers = []
    # The previous default was a mutable `[]` (shared across calls) typed as
    # Dict although it is iterated as a list of FormData; use None instead.
    if existing_records is None:
        existing_records = []

    for _, row in df.iterrows():
        try:
            # Prepare answers first; rows producing no valid answers are
            # skipped entirely (no FormData created for them).
            answers, row_invalid_answers = prepare_answer_data(
                row=row,
                questions=questions,
                administration_id=administration_id,
                answer_processor=answer_processor,
            )
            invalid_answers.extend(row_invalid_answers)

            if len(answers) == 0:
                continue

            datapoint_id = str(row[CsvColumns.DATAPOINT_ID])
            parent_pk = parent.pk if parent else None

            # Find a matching existing record: datapoint ID appears in the
            # stored name and the parent matches.
            matching = [
                er for er in existing_records
                if datapoint_id in er.name and er.parent_id == parent_pk
            ]

            existing_record = matching[0] if matching else None
            form_data = create_form_data(
                row=row,
                user=config.user,
                administration_id=administration_id,
                parent=parent,
                existing_record=existing_record,
            )

            if not form_data:
                continue

            bulk_create_answers(form_data, answers, config.user)

            seeded_records.append(
                {
                    "flow_data_id": row[CsvColumns.DATAPOINT_ID],
                    "mis_data_id": form_data.pk,
                    "is_new": existing_record is None,
                }
            )

        except Exception:
            # logger.exception logs the message AND the traceback; the
            # previous code emitted a near-identical logger.error first,
            # producing duplicate log entries for every failure.
            logger.exception(
                f"Error processing {'parent' if is_parent else 'child'} "
                f"row {row[CsvColumns.DATAPOINT_ID]}"
            )
            continue

    return seeded_records, invalid_answers
118

119

120
def process_child_data_for_parent(
    parent_row: pd.Series,
    config: SeederConfig,
    parent_form_data: FormData,
    child_data_groups: pd.core.groupby.DataFrameGroupBy,
    child_questions: Dict[int, Any],
    existing_records: Optional[List[FormData]] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Process all child rows for a given parent using generic method.

    Args:
        parent_row: Parent row containing datapoint_id
        config: SeederConfig instance
        parent_form_data: Parent FormData instance
        child_data_groups: Child dataframe grouped by parent datapoint ID
        child_questions: Questions for child data
        existing_records: Existing child FormData records used to update
            instead of create on re-runs

    Returns:
        Tuple of (seeded_records, invalid_answers) for the child rows;
        both lists are empty when the parent has no child rows.
    """
    parent_datapoint_id = parent_row[CsvColumns.DATAPOINT_ID]

    try:
        child_rows = child_data_groups.get_group(parent_datapoint_id)
    except KeyError:
        # No child rows for this parent
        return [], []

    # Delegate to the unified parent/child processor
    return process_data_rows(
        df=child_rows,
        config=config,
        questions=child_questions,
        administration_id=parent_form_data.administration_id,
        parent=parent_form_data,
        is_parent=False,
        existing_records=existing_records,
    )
159

160

161
# =============================================================================
162
# Form Data Creation - GENERIC METHOD
163
# =============================================================================
164

165

166
def create_form_data(
    row: pd.Series,
    user,
    administration_id: int,
    parent: Optional[FormData] = None,
    existing_record: Optional[FormData] = None,
) -> Optional[FormData]:
    """Generic method to create or update FormData instance (parent or child).

    Args:
        row: Pandas Series containing row data
        user: User creating the record
        administration_id: Administration ID
        parent: Parent FormData (for child records only)
        existing_record: Existing FormData to update in place instead of
            creating a new record (makes re-runs idempotent)

    Returns:
        Created or updated FormData instance or None if failed
    """
    try:
        # Geo arrives as a pipe-delimited string (e.g. "lat|lng"); parse it
        # into a list of floats. Child records without their own geo inherit
        # the parent's geo.
        geo_value = None
        if CsvColumns.GEO in row and pd.notna(row[CsvColumns.GEO]):
            geo_value = [
                float(g) for g in
                str(row[CsvColumns.GEO]).split("|")
            ]
        if parent and not geo_value:
            geo_value = parent.geo

        flow_data_id = int(row[CsvColumns.DATAPOINT_ID])

        # Sanitize name by replacing pipe characters
        dp_name = row[CsvColumns.NAME].replace("|", " - ")
        # Add FLOW-{flow_data_id} prefix to name
        dp_name = f"{FLOW_PREFIX}{flow_data_id} - {dp_name}"

        # Check if record already exists
        if existing_record:
            # Update existing record in place (name, admin, geo, author,
            # submitter, and parent when provided) rather than creating
            # a duplicate.
            existing_record.name = dp_name
            existing_record.administration_id = administration_id
            existing_record.geo = geo_value
            existing_record.created_by = user
            existing_record.submitter = row.get(CsvColumns.SUBMITTER, None)
            if parent:
                existing_record.parent = parent
            existing_record.save()
            logger.info(
                f"Updated existing FormData {existing_record.pk} "
                f"for flow_data_id {flow_data_id}"
            )
            return existing_record

        # Create new record. For parent (registration) records, the Flow
        # datapoint ID is reused as the primary key; children get an
        # auto-generated ID.
        new_data_id = None
        if not parent and flow_data_id:
            new_data_id = flow_data_id
        data = FormData.objects.create(
            id=new_data_id,
            form_id=row[CsvColumns.FORM_ID],
            uuid=row[CsvColumns.IDENTIFIER],
            name=dp_name,
            administration_id=administration_id,
            geo=geo_value,
            created_by=user,
            parent=parent,
            submitter=row.get(CsvColumns.SUBMITTER, None),
        )
        # Set created timestamp from source data (second save needed because
        # the create() call assigns its own timestamp)
        data.created = row[CsvColumns.CREATED_AT]
        data.save()
        logger.info(
            f"Created new FormData {data.pk} "
            f"for flow_data_id {flow_data_id}"
        )
        # Save to datapoint json file if parent is None (Registration)
        # NOTE(review): save_to_file is accessed as a bare attribute —
        # presumably a property with side effects (tests mock it with
        # PropertyMock); confirm it is not a method that should be
        # invoked as save_to_file().
        if data.parent is None:
            data.save_to_file
        return data
    except Exception as e:
        # Best-effort: a failed row is logged and skipped, not fatal.
        logger.error(
            f"Error creating/updating FormData for row "
            f"{row[CsvColumns.DATAPOINT_ID]}: {e}"
        )
        return None
250

251

252
# =============================================================================
253
# Form Data Deletion (Reverting) - GENERIC METHODS
254
# =============================================================================
255

256
def revert_form_data(
    form: Forms
) -> int:
    """Generic method to revert all FormData for a given form.

    Hard-deletes every seeded FormData record (identified by the
    FLOW_PREFIX name marker) together with its children.

    Args:
        form: Forms instance whose seeded data should be reverted

    Returns:
        Total number of deleted records (parents plus children)
    """
    form_data = form.form_form_data.filter(
        name__startswith=FLOW_PREFIX,
    )
    total_deleted = form_data.count()
    for data in form_data.all():
        # Count children BEFORE deleting them: the previous implementation
        # summed d.children.count() after the hard delete, when the parent
        # queryset re-evaluates to empty, so children were never counted
        # and the returned total was understated.
        total_deleted += data.children.count()
        data.children.all().delete(hard=True)
        data.delete(hard=True)
    return total_deleted
272

273
# =============================================================================
274
# Answer Processing - GENERIC METHODS
275
# =============================================================================
276

277

278
def prepare_answer_data(
1✔
279
    row: pd.Series,
280
    questions: Dict[int, Any],
281
    administration_id: Optional[int],
282
    answer_processor: AnswerProcessor,
283
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
284
    """Generic method to prepare answer data from a data row.
285

286
    This method works for both parent and child data without modification.
287

288
    Args:
289
        row: Pandas Series containing row data
290
        questions: Dictionary mapping question ID to Question object
291
        administration_id: Administration ID for admin-type questions
292
        answer_processor: AnswerProcessor instance
293

294
    Returns:
295
        List of dictionaries containing answer data
296
    """
297
    answer_records = []
1✔
298
    invalid_answers = []
1✔
299

300
    for question_id, question in questions.items():
1✔
301
        column_name = str(question_id)
1✔
302

303
        # Skip if value is NaN
304
        if pd.isna(row.get(column_name)):
1!
305
            continue
×
306

307
        row_value = row[column_name]
1✔
308

309
        # Process answer based on question type
310
        opt_list = []
1✔
311
        if question.type in [
1!
312
            QuestionTypes.option,
313
            QuestionTypes.multiple_option,
314
        ]:
315
            opt_list = question.options.values_list("value", flat=True)
×
316
            opt_list = list(opt_list)
×
317

318
        name, value, options = answer_processor.process(
1✔
319
            question_type=question.type,
320
            row_value=row_value,
321
            administration_id=administration_id,
322
            opt_list=opt_list,
323
        )
324

325
        if name is None and value is None and options is None:
1!
326
            invalid_answers.append({
×
327
                "mis_form_id": question.form_id,
328
                "mis_question_id": question.pk,
329
                "mis_question_type": QuestionTypes.FieldStr[question.type],
330
                "flow_data_id": row[CsvColumns.DATAPOINT_ID],
331
                "value": row_value,
332
            })
333
            # Skip invalid answer
334
            continue
×
335

336
        answer_records.append(
1✔
337
            {
338
                "question_id": question.pk,
339
                "name": name,
340
                "value": value,
341
                "options": options,
342
            }
343
        )
344

345
    return answer_records, invalid_answers
1✔
346

347

348
def bulk_create_answers(
    data: FormData,
    answer_records: List[Dict[str, Any]],
    user,
):
    """Replace all answers on a FormData record with a single bulk insert.

    Works for both parent and child FormData instances.

    Args:
        data: FormData instance (parent or child)
        answer_records: List of answer data dictionaries
        user: User creating the answers
    """
    if not answer_records:
        return

    # Drop any previously stored answers so the bulk insert below is a
    # clean replacement rather than an append.
    data.data_answer.all().delete()

    # Build model instances, then insert them in one query.
    answer_model = data.data_answer.model
    new_answers = []
    for record in answer_records:
        new_answers.append(
            answer_model(
                data=data,
                question_id=record["question_id"],
                value=record["value"],
                options=record["options"],
                name=record["name"],
                created_by=user,
            )
        )
    data.data_answer.bulk_create(new_answers)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc