• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

paulmthompson / WhiskerToolbox / 18117112849

30 Sep 2025 02:44AM UTC coverage: 70.161% (+0.03%) from 70.132%
18117112849

push

github

paulmthompson
hungarian algorithm is actually used

60 of 77 new or added lines in 2 files covered. (77.92%)

352 existing lines in 12 files now uncovered.

45125 of 64316 relevant lines covered (70.16%)

1116.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.77
/src/DataManager/transforms/TransformPipeline.cpp
1
#include "TransformPipeline.hpp"
2

3
#include "DataManager.hpp"
4
#include "ParameterFactory.hpp"
5
#include "TransformRegistry.hpp"
6
#include "transforms/Lines/Line_Proximity_Grouping/line_proximity_grouping.hpp"
7

8
#include <algorithm>
9
#include <chrono>
10
#include <fstream>
11
#include <iostream>
12
#include <thread>
13
#include <future>
14
#include <type_traits>
15

16
/**
 * @brief Construct a pipeline bound to a data manager and a transform registry.
 *
 * The pointers are stored without taking ownership semantics visible here.
 * Construction also asks the ParameterFactory singleton to initialize its
 * default setters.
 *
 * @param data_manager Data manager used to read inputs and store outputs.
 * @param registry     Registry used to look up transform operations by name.
 * @throws std::invalid_argument if either pointer is null.
 */
TransformPipeline::TransformPipeline(DataManager* data_manager, TransformRegistry* registry)
    : data_manager_(data_manager), registry_(registry) {
    if (data_manager_ == nullptr) {
        throw std::invalid_argument("DataManager cannot be null");
    }
    if (registry_ == nullptr) {
        throw std::invalid_argument("TransformRegistry cannot be null");
    }

    // Parameter setters must be registered before any step parameters are parsed.
    ParameterFactory::getInstance().initializeDefaultSetters();
}
74✔
28

29
/**
 * @brief Replace the current pipeline with the one described by a JSON object.
 *
 * The existing state is cleared first. An optional "metadata" member is copied
 * verbatim; a "steps" array is required. Each step is parsed with parseStep()
 * and the resulting pipeline is checked with validate().
 *
 * @param json_config JSON document describing the pipeline.
 * @return true when every step parsed and validation reported no errors;
 *         false otherwise (diagnostics are written to std::cerr).
 */
bool TransformPipeline::loadFromJson(nlohmann::json const& json_config) {
    try {
        clear();

        // Metadata is optional and stored as-is.
        if (json_config.contains("metadata")) {
            metadata_ = json_config["metadata"];
        }

        // A "steps" array is mandatory.
        bool const has_steps_array =
                json_config.contains("steps") && json_config["steps"].is_array();
        if (!has_steps_array) {
            std::cerr << "Pipeline JSON must contain a 'steps' array" << std::endl;
            return false;
        }

        auto const& steps_json = json_config["steps"];
        steps_.reserve(steps_json.size());

        size_t index = 0;
        for (auto const& step_json : steps_json) {
            auto [parsed_ok, parsed_step] = parseStep(step_json, static_cast<int>(index));
            if (!parsed_ok) {
                std::cerr << "Failed to parse step " << index << std::endl;
                return false;
            }
            steps_.push_back(std::move(parsed_step));
            ++index;
        }

        // Reject the pipeline if any structural check fails.
        auto const problems = validate();
        if (problems.empty()) {
            return true;
        }

        std::cerr << "Pipeline validation failed:" << std::endl;
        for (auto const& problem : problems) {
            std::cerr << "  - " << problem << std::endl;
        }
        return false;
    } catch (std::exception const& e) {
        std::cerr << "Error loading pipeline from JSON: " << e.what() << std::endl;
        return false;
    }
}
72

UNCOV
73
bool TransformPipeline::loadFromJsonFile(std::string const& json_file_path) {
×
74
    try {
75
        std::ifstream file(json_file_path);
×
76
        if (!file.is_open()) {
×
77
            std::cerr << "Cannot open pipeline file: " << json_file_path << std::endl;
×
UNCOV
78
            return false;
×
79
        }
80
        
81
        nlohmann::json json_config;
×
82
        file >> json_config;
×
83
        return loadFromJson(json_config);
×
84
    } catch (std::exception const& e) {
×
85
        std::cerr << "Error reading pipeline file '" << json_file_path << "': " << e.what() << std::endl;
×
86
        return false;
×
UNCOV
87
    }
×
88
}
89

90
/**
 * @brief Run every step of the pipeline, phase by phase.
 *
 * Temporary data from a previous run is discarded, steps are grouped with
 * groupStepsByPhase(), and each group is run through executePhase() in
 * ascending phase order. Execution stops at the first failed step; that
 * step's result is still recorded in `step_results`.
 *
 * @param progress_callback Optional callback; when set it is notified at the
 *        start of each phase and once more when the whole pipeline succeeds.
 * @return Aggregated result: per-step results, number of completed steps,
 *         success flag, error message and total wall-clock time in ms.
 *         Exceptions derived from std::exception are caught and reported via
 *         `error_message` instead of propagating.
 */
PipelineResult TransformPipeline::execute(PipelineProgressCallback progress_callback) {
    auto const start_time = std::chrono::high_resolution_clock::now();

    PipelineResult result;
    result.total_steps = static_cast<int>(steps_.size());
    result.step_results.reserve(steps_.size());

    try {
        // Clear temporary data from previous executions
        temporary_data_.clear();

        auto const phase_groups = groupStepsByPhase();

        int completed_steps = 0;
        // Tracked explicitly: counting recorded results is not enough, because
        // the failing step's result is recorded too. Previously a failure in
        // the last step (or in a single-step pipeline) was reported as success.
        bool pipeline_failed = false;

        for (auto const& [phase_number, step_indices] : phase_groups) {
            if (progress_callback) {
                // total_steps is non-zero here: an empty pipeline has no phases.
                progress_callback(-1, "Starting phase " + std::to_string(phase_number), 0,
                                  (completed_steps * 100) / result.total_steps);
            }

            auto phase_results = executePhase(step_indices, phase_number, progress_callback);

            for (auto& step_result : phase_results) {
                bool const step_ok = step_result.success;
                if (!step_ok) {
                    result.error_message = "Step failed: " + step_result.error_message;
                }
                result.step_results.push_back(std::move(step_result));

                if (!step_ok) {
                    pipeline_failed = true;
                    break;
                }
                ++completed_steps;
            }

            if (pipeline_failed) {
                break;
            }
        }

        if (pipeline_failed) {
            result.success = false;
            result.steps_completed = completed_steps;
        } else {
            result.success = true;
            result.steps_completed = result.total_steps;

            if (progress_callback) {
                progress_callback(-1, "Pipeline completed", 100, 100);
            }
        }

    } catch (std::exception const& e) {
        result.success = false;
        result.error_message = "Pipeline execution error: " + std::string(e.what());
    }

    auto const end_time = std::chrono::high_resolution_clock::now();
    result.total_execution_time_ms =
            std::chrono::duration<double, std::milli>(end_time - start_time).count();

    return result;
}
×
153

154
/**
 * @brief Execute a single pipeline step.
 *
 * Looks up the transform in the registry, fetches the input via
 * getInputData(), checks applicability, builds parameters from the step's
 * JSON, runs the transform and stores the output with storeOutputData().
 *
 * @param step              Step description to execute.
 * @param progress_callback Optional per-step progress callback forwarded to
 *                          the transform when set.
 * @return Result carrying the output key, success flag, error message and,
 *         on success, the produced data. Note that the early-return paths
 *         (disabled step, lookup/validation failures, null output) return
 *         before `execution_time_ms` is assigned.
 */
StepResult TransformPipeline::executeStep(PipelineStep const& step, ProgressCallback progress_callback) {
    auto const started_at = std::chrono::high_resolution_clock::now();

    StepResult result;
    result.output_key = step.output_key;

    try {
        // A disabled step is skipped and reported as successful.
        if (!step.enabled) {
            result.success = true;
            return result;
        }

        // Resolve the transform by name.
        auto* operation = registry_->findOperationByName(step.transform_name);
        if (operation == nullptr) {
            result.error_message = "Transform '" + step.transform_name + "' not found in registry";
            return result;
        }

        // Fetch the input (temporary data first, then the data manager).
        auto input = getInputData(step.input_key);
        if (!input.first) {
            result.error_message = "Failed to get input data for key '" + step.input_key + "'";
            return result;
        }
        auto const& input_data = input.second;

        if (!operation->canApply(input_data)) {
            result.error_message = "Transform '" + step.transform_name + "' cannot be applied to input data";
            return result;
        }

        auto parameters = createParametersFromJson(step.transform_name, step.parameters);

        // Forward the progress callback only when the caller supplied one.
        DataTypeVariant output_data = progress_callback
                ? operation->execute(input_data, parameters.get(), progress_callback)
                : operation->execute(input_data, parameters.get());

        // A null pointer in the variant is treated as a failed execution.
        bool const produced_nothing =
                std::visit([](auto const& ptr) { return ptr == nullptr; }, output_data);
        if (produced_nothing) {
            result.error_message = "Transform execution returned null result";
            return result;
        }

        // The output is stored under the time key associated with the input key.
        auto time_key = data_manager_->getTimeKey(step.input_key);
        storeOutputData(step.output_key, output_data, step.step_id, time_key);
        result.result_data = output_data;
        result.success = true;

    } catch (std::exception const& e) {
        result.error_message = "Step execution error: " + std::string(e.what());
    }

    auto const finished_at = std::chrono::high_resolution_clock::now();
    result.execution_time_ms =
            std::chrono::duration<double, std::milli>(finished_at - started_at).count();

    return result;
}
×
219

220
std::vector<std::string> TransformPipeline::validate() const {
74✔
221
    std::vector<std::string> errors;
74✔
222
    
223
    // Check for duplicate step IDs
224
    std::unordered_map<std::string, int> step_id_counts;
74✔
225
    for (auto const& step : steps_) {
148✔
226
        step_id_counts[step.step_id]++;
74✔
227
    }
228
    for (auto const& [step_id, count] : step_id_counts) {
148✔
229
        if (count > 1) {
74✔
UNCOV
230
            errors.push_back("Duplicate step ID: " + step_id);
×
231
        }
232
    }
233
    
234
    // Validate each step
235
    for (size_t i = 0; i < steps_.size(); ++i) {
148✔
236
        auto const& step = steps_[i];
74✔
237
        std::string step_prefix = "Step " + std::to_string(i) + " (" + step.step_id + "): ";
74✔
238
        
239
        // Check if transform exists
240
        if (!registry_->findOperationByName(step.transform_name)) {
74✔
UNCOV
241
            errors.push_back(step_prefix + "Transform '" + step.transform_name + "' not found in registry");
×
242
        }
243
        
244
        // Check input key
245
        if (step.input_key.empty()) {
74✔
UNCOV
246
            errors.push_back(step_prefix + "Input key cannot be empty");
×
247
        }
248
        
249
        // Check step ID
250
        if (step.step_id.empty()) {
74✔
UNCOV
251
            errors.push_back(step_prefix + "Step ID cannot be empty");
×
252
        }
253
        
254
        // Check phase number
255
        if (step.phase < 0) {
74✔
UNCOV
256
            errors.push_back(step_prefix + "Phase number cannot be negative");
×
257
        }
258
    }
74✔
259
    
260
    return errors;
148✔
261
}
74✔
262

263
void TransformPipeline::clear() {
74✔
264
    steps_.clear();
74✔
265
    metadata_ = nlohmann::json::object();
74✔
266
    temporary_data_.clear();
74✔
267
}
74✔
268

269
/**
 * @brief Serialize the pipeline (metadata and steps) to a JSON object.
 *
 * Every step always exports its ID, transform name, input/output keys,
 * parameters, phase and enabled flag. "description" and "tags" are written
 * only when non-empty.
 *
 * @return JSON object with "metadata" and "steps" members.
 */
nlohmann::json TransformPipeline::exportToJson() const {
    nlohmann::json steps_json = nlohmann::json::array();

    for (auto const& step : steps_) {
        nlohmann::json entry;
        entry["step_id"] = step.step_id;
        entry["transform_name"] = step.transform_name;
        entry["input_key"] = step.input_key;
        entry["output_key"] = step.output_key;
        entry["parameters"] = step.parameters;
        entry["phase"] = step.phase;
        entry["enabled"] = step.enabled;

        // Optional members are omitted when they carry no information.
        if (!step.description.empty()) {
            entry["description"] = step.description;
        }
        if (!step.tags.empty()) {
            entry["tags"] = step.tags;
        }

        steps_json.push_back(entry);
    }

    nlohmann::json exported;
    exported["metadata"] = metadata_;
    exported["steps"] = std::move(steps_json);
    return exported;
}
×
299

UNCOV
300
bool TransformPipeline::saveToJsonFile(std::string const& json_file_path) const {
×
301
    try {
302
        std::ofstream file(json_file_path);
×
303
        if (!file.is_open()) {
×
304
            std::cerr << "Cannot create pipeline file: " << json_file_path << std::endl;
×
UNCOV
305
            return false;
×
306
        }
307
        
308
        auto json_data = exportToJson();
×
309
        file << json_data.dump(2); // Pretty print with 2-space indentation
×
310
        return true;
×
311
    } catch (std::exception const& e) {
×
312
        std::cerr << "Error saving pipeline file '" << json_file_path << "': " << e.what() << std::endl;
×
313
        return false;
×
UNCOV
314
    }
×
315
}
316

317
// Private implementation methods
318

319
/**
 * @brief Parse one step description from JSON.
 *
 * "step_id", "transform_name" and "input_key" are required strings. The
 * optional members "output_key", "phase", "enabled", "description" and "tags"
 * are applied only when present with the expected JSON type; otherwise the
 * PipelineStep default is kept. "parameters" defaults to an empty object.
 *
 * @param step_json  JSON value describing the step.
 * @param step_index Position of the step, used only in diagnostics.
 * @return {true, step} on success; {false, partially filled step} when a
 *         required member is missing/mistyped or parsing throws.
 */
std::pair<bool, PipelineStep> TransformPipeline::parseStep(nlohmann::json const& step_json, int step_index) {
    PipelineStep step;

    try {
        // True when `key` is present and holds a JSON string.
        auto const is_string_member = [&step_json](char const* key) {
            return step_json.contains(key) && step_json[key].is_string();
        };

        // --- Required members ---
        if (!is_string_member("step_id")) {
            std::cerr << "Step " << step_index << ": 'step_id' is required and must be a string" << std::endl;
            return {false, step};
        }
        step.step_id = step_json["step_id"];

        if (!is_string_member("transform_name")) {
            std::cerr << "Step " << step_index << ": 'transform_name' is required and must be a string" << std::endl;
            return {false, step};
        }
        step.transform_name = step_json["transform_name"];

        if (!is_string_member("input_key")) {
            std::cerr << "Step " << step_index << ": 'input_key' is required and must be a string" << std::endl;
            return {false, step};
        }
        step.input_key = step_json["input_key"];

        // --- Optional members (mistyped values are silently ignored) ---
        auto const missing = step_json.end();

        if (auto const it = step_json.find("output_key"); it != missing && it->is_string()) {
            step.output_key = *it;
        }

        if (auto const it = step_json.find("parameters"); it != missing) {
            step.parameters = *it;
        } else {
            step.parameters = nlohmann::json::object();
        }

        if (auto const it = step_json.find("phase"); it != missing && it->is_number_integer()) {
            step.phase = *it;
        }

        if (auto const it = step_json.find("enabled"); it != missing && it->is_boolean()) {
            step.enabled = *it;
        }

        if (auto const it = step_json.find("description"); it != missing && it->is_string()) {
            step.description = *it;
        }

        if (auto const it = step_json.find("tags"); it != missing && it->is_array()) {
            // Non-string entries in the tag list are skipped.
            for (auto const& tag : *it) {
                if (tag.is_string()) {
                    step.tags.push_back(tag);
                }
            }
        }

        return {true, step};
    } catch (std::exception const& e) {
        std::cerr << "Error parsing step " << step_index << ": " << e.what() << std::endl;
        return {false, step};
    }
}
74✔
389

390
std::unique_ptr<TransformParametersBase> TransformPipeline::createParametersFromJson(
74✔
391
    std::string const& transform_name, 
392
    nlohmann::json const& param_json) {
393
    
394
    // Get the operation to get default parameters
395
    auto* operation = registry_->findOperationByName(transform_name);
74✔
396
    if (!operation) {
74✔
UNCOV
397
        return nullptr;
×
398
    }
399

400
    // Special handling for grouping operations that need EntityGroupManager
401
    if (transform_name == "Group Lines by Proximity") {
74✔
UNCOV
402
        auto* group_manager = data_manager_->getEntityGroupManager();
×
UNCOV
403
        if (!group_manager) {
×
UNCOV
404
            std::cerr << "Error: EntityGroupManager not available for grouping operation" << std::endl;
×
UNCOV
405
            return nullptr;
×
406
        }
407
        
UNCOV
408
        auto parameters = std::make_unique<LineProximityGroupingParameters>(group_manager);
×
409
        
410
        // Set parameters from JSON
UNCOV
411
        for (auto const& [param_name, param_value] : param_json.items()) {
×
UNCOV
412
            if (!setParameterValue(parameters.get(), param_name, param_value, transform_name)) {
×
413
                std::cerr << "Warning: Failed to set parameter '" << param_name 
UNCOV
414
                          << "' for transform '" << transform_name << "'" << std::endl;
×
415
            }
UNCOV
416
        }
×
417
        
UNCOV
418
        return std::move(parameters);
×
UNCOV
419
    }
×
420
    
421
    auto parameters = operation->getDefaultParameters();
74✔
422
    if (!parameters) {
74✔
UNCOV
423
        return nullptr;
×
424
    }
425
    
426
    // Set parameters from JSON
427
    for (auto const& [param_name, param_value] : param_json.items()) {
538✔
428
        if (!setParameterValue(parameters.get(), param_name, param_value, transform_name)) {
232✔
429
            std::cerr << "Warning: Failed to set parameter '" << param_name 
430
                      << "' for transform '" << transform_name << "'" << std::endl;
4✔
431
        }
432
    }
74✔
433
    
434
    return parameters;
74✔
435
}
74✔
436

437
bool TransformPipeline::setParameterValue(TransformParametersBase* param_obj, 
232✔
438
                                         std::string const& param_name, 
439
                                         nlohmann::json const& json_value,
440
                                         std::string const& transform_name) {
441
    
442
    // Use the parameter factory for type conversion
443
    auto& factory = ParameterFactory::getInstance();
232✔
444
    
445
    return factory.setParameter(transform_name, param_obj, param_name, json_value, data_manager_);
232✔
446
}
447

448
std::pair<bool, DataTypeVariant> TransformPipeline::getInputData(std::string const& input_key) {
74✔
449
    // First check temporary data
450
    auto temp_it = temporary_data_.find(input_key);
74✔
451
    if (temp_it != temporary_data_.end()) {
74✔
UNCOV
452
        return {true, temp_it->second};
×
453
    }
454
    
455
    // Then check data manager
456
    auto data_variant = data_manager_->getDataVariant(input_key);
74✔
457
    if (data_variant.has_value()) {
74✔
458
        return {true, data_variant.value()};
74✔
459
    }
460
    
UNCOV
461
    return {false, DataTypeVariant{}};
×
462
}
74✔
463

464
void TransformPipeline::storeOutputData(std::string const& output_key, 
74✔
465
                                       DataTypeVariant const& data, 
466
                                       std::string const& step_id,
467
                                       TimeKey const& time_key) {
468
    if (output_key.empty()) {
74✔
469
        // Store as temporary data using step_id
UNCOV
470
        temporary_data_[step_id + "_output"] = data;
×
471
    } else {
472
        // Store in data manager
473
        data_manager_->setData(output_key, data, time_key);
74✔
474
    }
475
}
74✔
476

477
std::map<int, std::vector<int>> TransformPipeline::groupStepsByPhase() const {
74✔
478
    std::map<int, std::vector<int>> phase_groups;
74✔
479
    
480
    for (size_t i = 0; i < steps_.size(); ++i) {
148✔
481
        phase_groups[steps_[i].phase].push_back(static_cast<int>(i));
74✔
482
    }
483
    
484
    return phase_groups;
74✔
UNCOV
485
}
×
486

487
std::vector<StepResult> TransformPipeline::executePhase(
74✔
488
    std::vector<int> const& phase_steps, 
489
    int,
490
    PipelineProgressCallback progress_callback) {
491
    
492
    std::vector<StepResult> results;
74✔
493
    results.reserve(phase_steps.size());
74✔
494
    
495
    if (phase_steps.size() == 1) {
74✔
496
        // Single step - execute directly
497
        int step_index = phase_steps[0];
74✔
498
        auto const& step = steps_[static_cast<size_t>(step_index)];
74✔
499
        
500
        ProgressCallback step_progress_callback;
74✔
501
        if (progress_callback) {
74✔
502
            step_progress_callback = [this, progress_callback, step_index, step](int step_progress) {
118✔
503
                int overall_progress = (step_index * 100) / static_cast<int>(steps_.size());
206✔
504
                progress_callback(step_index, step.step_id, step_progress, overall_progress);
206✔
505
            };
59✔
506
        }
507
        
508
        results.push_back(executeStep(step, step_progress_callback));
74✔
509
    } else {
74✔
510
        // Multiple steps - execute in parallel
511
        std::vector<std::future<StepResult>> futures;
×
UNCOV
512
        futures.reserve(phase_steps.size());
×
513
        
UNCOV
514
        for (int step_index : phase_steps) {
×
UNCOV
515
            auto const& step = steps_[static_cast<size_t>(step_index)];
×
516
            
UNCOV
517
            ProgressCallback step_progress_callback;
×
UNCOV
518
            if (progress_callback) {
×
UNCOV
519
                step_progress_callback = [this, progress_callback, step_index, step](int step_progress) {
×
UNCOV
520
                    int overall_progress = (step_index * 100) / static_cast<int>(steps_.size());
×
UNCOV
521
                    progress_callback(step_index, step.step_id, step_progress, overall_progress);
×
UNCOV
522
                };
×
523
            }
524
            
UNCOV
525
            futures.push_back(std::async(std::launch::async, 
×
UNCOV
526
                [this, step, step_progress_callback]() {
×
UNCOV
527
                    return executeStep(step, step_progress_callback);
×
528
                }));
UNCOV
529
        }
×
530
        
531
        // Collect results
UNCOV
532
        for (auto& future : futures) {
×
UNCOV
533
            results.push_back(future.get());
×
534
        }
UNCOV
535
    }
×
536
    
537
    return results;
74✔
UNCOV
538
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc