• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

paulmthompson / WhiskerToolbox / 18477247352

13 Oct 2025 08:18PM UTC coverage: 72.391% (+0.4%) from 71.943%
18477247352

push

github

web-flow
Merge pull request #140 from paulmthompson/kdtree

Jules PR

164 of 287 new or added lines in 3 files covered. (57.14%)

350 existing lines in 9 files now uncovered.

51889 of 71679 relevant lines covered (72.39%)

63071.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.17
/src/DataManager/transforms/TransformPipeline.cpp
1
#include "TransformPipeline.hpp"

#include "DataManager.hpp"
#include "ParameterFactory.hpp"
#include "TransformRegistry.hpp"
#include "transforms/Lines/Line_Proximity_Grouping/line_proximity_grouping.hpp"
#include "transforms/grouping_transforms.hpp"

#include <algorithm>
#include <chrono>
#include <fstream>
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
16

17
/**
 * @brief Bind a pipeline to the DataManager it reads/writes and the registry
 *        it resolves transforms from.
 *
 * The pointers are stored as given (ownership presumably stays with the
 * caller - TODO confirm against the header).
 *
 * @throws std::invalid_argument if either pointer is null.
 */
TransformPipeline::TransformPipeline(DataManager* data_manager, TransformRegistry* registry)
    : data_manager_(data_manager), registry_(registry) {
    // Fail fast: the rest of this file dereferences both members without re-checking.
    if (data_manager_ == nullptr) {
        throw std::invalid_argument("DataManager cannot be null");
    }
    if (registry_ == nullptr) {
        throw std::invalid_argument("TransformRegistry cannot be null");
    }

    // The shared ParameterFactory performs JSON -> parameter conversion in
    // setParameterValue(); make sure its default setters are registered.
    ParameterFactory::getInstance().initializeDefaultSetters();
}
29

30
/**
 * @brief Replace the pipeline's contents with the configuration in @p json_config.
 *
 * Expected input: an object with a required "steps" array and an optional
 * "metadata" value (stored verbatim). Each array element is parsed with
 * parseStep() and the assembled pipeline is checked with validate().
 *
 * On any failure the pipeline is reset to the empty state produced by
 * clear(). A failed load therefore never leaves a half-parsed or invalid
 * step list behind for a later execute() to run.
 *
 * @param json_config Parsed pipeline configuration.
 * @return true on success; false otherwise (diagnostics go to std::cerr).
 */
bool TransformPipeline::loadFromJson(nlohmann::json const& json_config) {
    // Single rollback point shared by every failure path.
    auto const fail = [this]() {
        clear();
        return false;
    };

    try {
        clear();

        // Load metadata
        if (json_config.contains("metadata")) {
            metadata_ = json_config["metadata"];
        }

        // Load steps
        if (!json_config.contains("steps") || !json_config["steps"].is_array()) {
            std::cerr << "Pipeline JSON must contain a 'steps' array" << std::endl;
            return fail();
        }

        auto const& steps_json = json_config["steps"];
        steps_.reserve(steps_json.size());

        for (size_t i = 0; i < steps_json.size(); ++i) {
            auto [success, step] = parseStep(steps_json[i], static_cast<int>(i));
            if (!success) {
                std::cerr << "Failed to parse step " << i << std::endl;
                return fail();
            }
            steps_.push_back(std::move(step));
        }

        // Validate the loaded pipeline
        auto const validation_errors = validate();
        if (!validation_errors.empty()) {
            std::cerr << "Pipeline validation failed:" << std::endl;
            for (auto const& error : validation_errors) {
                std::cerr << "  - " << error << std::endl;
            }
            return fail();
        }

        return true;
    } catch (std::exception const& e) {
        std::cerr << "Error loading pipeline from JSON: " << e.what() << std::endl;
        return fail();
    }
}
73

UNCOV
74
bool TransformPipeline::loadFromJsonFile(std::string const& json_file_path) {
×
75
    try {
76
        std::ifstream file(json_file_path);
×
77
        if (!file.is_open()) {
×
78
            std::cerr << "Cannot open pipeline file: " << json_file_path << std::endl;
×
UNCOV
79
            return false;
×
80
        }
81
        
82
        nlohmann::json json_config;
×
83
        file >> json_config;
×
84
        return loadFromJson(json_config);
×
85
    } catch (std::exception const& e) {
×
86
        std::cerr << "Error reading pipeline file '" << json_file_path << "': " << e.what() << std::endl;
×
87
        return false;
×
UNCOV
88
    }
×
89
}
90

91
/**
 * @brief Run all steps, phase by phase, in ascending phase-number order.
 *
 * Steps that share a phase number are handed to executePhase() together, and
 * phases run one after another. Execution stops after the first phase that
 * contains a failed step. Pipeline-local temporary outputs from a previous
 * run are discarded first.
 *
 * @param progress_callback Optional. Called with step index -1 for
 *        pipeline-level messages ("Starting phase N", "Pipeline completed"),
 *        and forwarded to executePhase() for per-step progress.
 * @return The collected step results, an overall success flag, the number of
 *         successfully completed steps, the first step error (if any) and the
 *         total wall-clock time in milliseconds.
 */
PipelineResult TransformPipeline::execute(PipelineProgressCallback progress_callback) {
    auto const start_time = std::chrono::high_resolution_clock::now();

    PipelineResult result;
    result.total_steps = static_cast<int>(steps_.size());
    result.step_results.reserve(steps_.size());

    // Declared outside the try so the exception path can still report how far
    // execution got.
    int completed_steps = 0;

    try {
        // Clear temporary data from previous executions
        temporary_data_.clear();

        auto const phase_groups = groupStepsByPhase();

        // A failing step's result is appended to step_results before we stop.
        // So "all results collected" does NOT imply success, and failure has
        // to be tracked explicitly.
        bool pipeline_failed = false;

        for (auto const& [phase_number, step_indices] : phase_groups) {
            if (progress_callback) {
                // phase_groups is non-empty here, hence total_steps > 0.
                progress_callback(-1, "Starting phase " + std::to_string(phase_number), 0,
                                  (completed_steps * 100) / result.total_steps);
            }

            auto phase_results = executePhase(step_indices, phase_number, progress_callback);

            for (auto const& step_result : phase_results) {
                result.step_results.push_back(step_result);
                if (!step_result.success) {
                    pipeline_failed = true;
                    result.error_message = "Step failed: " + step_result.error_message;
                    break;
                }
                ++completed_steps;
            }

            if (pipeline_failed) {
                result.success = false;
                result.steps_completed = completed_steps;
                break;
            }
        }

        if (!pipeline_failed && result.step_results.size() == steps_.size()) {
            result.success = true;
            result.steps_completed = result.total_steps;

            if (progress_callback) {
                progress_callback(-1, "Pipeline completed", 100, 100);
            }
        }

    } catch (std::exception const& e) {
        result.success = false;
        result.steps_completed = completed_steps;
        result.error_message = "Pipeline execution error: " + std::string(e.what());
    }

    auto const end_time = std::chrono::high_resolution_clock::now();
    result.total_execution_time_ms = std::chrono::duration<double, std::milli>(end_time - start_time).count();

    return result;
}
154

155
/**
 * @brief Execute one pipeline step and report its outcome.
 *
 * Steps performed:
 * - Resolve the step's transform in the registry.
 * - Fetch its input: pipeline temporaries first, then the DataManager.
 * - Build the parameters from the step's JSON.
 * - Run the transform.
 * - Store a non-null result via storeOutputData().
 *
 * Disabled steps are skipped and reported as successful.
 *
 * @param step              Step to execute.
 * @param progress_callback Forwarded to the transform when non-empty.
 * @return StepResult carrying the success flag, an error message on failure,
 *         the output data on success, and the elapsed time in milliseconds.
 *         The elapsed time is recorded on every exit path.
 */
StepResult TransformPipeline::executeStep(PipelineStep const& step, ProgressCallback progress_callback) {
    auto const start_time = std::chrono::high_resolution_clock::now();

    // executePhase() may run several steps of one phase concurrently via
    // std::async. Those steps share temporary_data_ and the DataManager, and
    // nothing visible here makes either safe for concurrent access. Every
    // touch of that shared state below is therefore serialized, while the
    // transform itself (operation->execute) still runs in parallel.
    // NOTE(review): function-local static, so it is shared by all
    // TransformPipeline instances - coarse but conservative.
    static std::mutex shared_state_mutex;

    StepResult result;
    result.output_key = step.output_key;

    // Stamps the elapsed time and hands the result back; used on every exit.
    auto const finish = [&result, start_time]() {
        auto const end_time = std::chrono::high_resolution_clock::now();
        result.execution_time_ms = std::chrono::duration<double, std::milli>(end_time - start_time).count();
        return std::move(result);
    };

    try {
        if (!step.enabled) {
            result.success = true; // Disabled steps are considered successful
            return finish();
        }

        auto* operation = registry_->findOperationByName(step.transform_name);
        if (!operation) {
            result.error_message = "Transform '" + step.transform_name + "' not found in registry";
            return finish();
        }

        // Reads temporary_data_ and the DataManager.
        auto [input_success, input_data] = [&]() {
            std::lock_guard<std::mutex> const lock(shared_state_mutex);
            return getInputData(step.input_key);
        }();
        if (!input_success) {
            result.error_message = "Failed to get input data for key '" + step.input_key + "'";
            return finish();
        }

        if (!operation->canApply(input_data)) {
            result.error_message = "Transform '" + step.transform_name + "' cannot be applied to input data";
            return finish();
        }

        // Parameter construction consults the DataManager (EntityGroupManager
        // lookup, and ParameterFactory::setParameter receives data_manager_).
        auto parameters = [&]() {
            std::lock_guard<std::mutex> const lock(shared_state_mutex);
            return createParametersFromJson(step.transform_name, step.parameters);
        }();

        // The transform itself runs without holding the lock.
        DataTypeVariant output_data;
        if (progress_callback) {
            output_data = operation->execute(input_data, parameters.get(), progress_callback);
        } else {
            output_data = operation->execute(input_data, parameters.get());
        }

        // A null pointer in the returned variant is treated as failure
        // (assumes every DataTypeVariant alternative is pointer-like).
        if (std::visit([](auto const& ptr) { return ptr == nullptr; }, output_data)) {
            result.error_message = "Transform execution returned null result";
            return finish();
        }

        // Writes temporary_data_ or the DataManager.
        {
            std::lock_guard<std::mutex> const lock(shared_state_mutex);
            auto time_key = data_manager_->getTimeKey(step.input_key);
            storeOutputData(step.output_key, output_data, step.step_id, time_key);
        }
        result.result_data = output_data;
        result.success = true;

    } catch (std::exception const& e) {
        result.error_message = "Step execution error: " + std::string(e.what());
    }

    return finish();
}
220

221
std::vector<std::string> TransformPipeline::validate() const {
74✔
222
    std::vector<std::string> errors;
74✔
223
    
224
    // Check for duplicate step IDs
225
    std::unordered_map<std::string, int> step_id_counts;
74✔
226
    for (auto const& step : steps_) {
148✔
227
        step_id_counts[step.step_id]++;
74✔
228
    }
229
    for (auto const& [step_id, count] : step_id_counts) {
148✔
230
        if (count > 1) {
74✔
UNCOV
231
            errors.push_back("Duplicate step ID: " + step_id);
×
232
        }
233
    }
234
    
235
    // Validate each step
236
    for (size_t i = 0; i < steps_.size(); ++i) {
148✔
237
        auto const& step = steps_[i];
74✔
238
        std::string step_prefix = "Step " + std::to_string(i) + " (" + step.step_id + "): ";
74✔
239
        
240
        // Check if transform exists
241
        if (!registry_->findOperationByName(step.transform_name)) {
74✔
UNCOV
242
            errors.push_back(step_prefix + "Transform '" + step.transform_name + "' not found in registry");
×
243
        }
244
        
245
        // Check input key
246
        if (step.input_key.empty()) {
74✔
UNCOV
247
            errors.push_back(step_prefix + "Input key cannot be empty");
×
248
        }
249
        
250
        // Check step ID
251
        if (step.step_id.empty()) {
74✔
UNCOV
252
            errors.push_back(step_prefix + "Step ID cannot be empty");
×
253
        }
254
        
255
        // Check phase number
256
        if (step.phase < 0) {
74✔
UNCOV
257
            errors.push_back(step_prefix + "Phase number cannot be negative");
×
258
        }
259
    }
74✔
260
    
261
    return errors;
148✔
262
}
74✔
263

264
void TransformPipeline::clear() {
74✔
265
    steps_.clear();
74✔
266
    metadata_ = nlohmann::json::object();
74✔
267
    temporary_data_.clear();
74✔
268
}
74✔
269

270
/**
 * @brief Serialize the pipeline into the JSON shape accepted by loadFromJson().
 *
 * Every step always gets step_id, transform_name, input_key, output_key,
 * parameters, phase and enabled. "description" and "tags" are written only
 * when non-empty.
 *
 * @return An object with "metadata" and a "steps" array.
 */
nlohmann::json TransformPipeline::exportToJson() const {
    auto const serialize_step = [](PipelineStep const& step) {
        nlohmann::json out;
        out["step_id"] = step.step_id;
        out["transform_name"] = step.transform_name;
        out["input_key"] = step.input_key;
        out["output_key"] = step.output_key;
        out["parameters"] = step.parameters;
        out["phase"] = step.phase;
        out["enabled"] = step.enabled;

        if (!step.description.empty()) {
            out["description"] = step.description;
        }
        if (!step.tags.empty()) {
            out["tags"] = step.tags;
        }
        return out;
    };

    nlohmann::json steps_array = nlohmann::json::array();
    for (auto const& step : steps_) {
        steps_array.push_back(serialize_step(step));
    }

    nlohmann::json result;
    result["metadata"] = metadata_;
    result["steps"] = std::move(steps_array);
    return result;
}
300

UNCOV
301
bool TransformPipeline::saveToJsonFile(std::string const& json_file_path) const {
×
302
    try {
303
        std::ofstream file(json_file_path);
×
304
        if (!file.is_open()) {
×
305
            std::cerr << "Cannot create pipeline file: " << json_file_path << std::endl;
×
UNCOV
306
            return false;
×
307
        }
308
        
309
        auto json_data = exportToJson();
×
310
        file << json_data.dump(2); // Pretty print with 2-space indentation
×
311
        return true;
×
312
    } catch (std::exception const& e) {
×
313
        std::cerr << "Error saving pipeline file '" << json_file_path << "': " << e.what() << std::endl;
×
314
        return false;
×
UNCOV
315
    }
×
316
}
317

318
// Private implementation methods
319

320
/**
 * @brief Convert one element of the "steps" array into a PipelineStep.
 *
 * Required string fields: step_id, transform_name, input_key (checked in that
 * order; parsing stops at the first bad one).
 *
 * Optional fields: output_key (string), parameters (any JSON, defaulting to
 * an empty object), phase (integer), enabled (boolean), description (string)
 * and tags (array; non-string entries are skipped). An optional field present
 * with the wrong type is silently ignored.
 *
 * @param step_json  JSON for a single step.
 * @param step_index Position in the steps array; used only in diagnostics.
 * @return {true, step} on success; otherwise {false, partially filled step}.
 */
std::pair<bool, PipelineStep> TransformPipeline::parseStep(nlohmann::json const& step_json, int step_index) {
    PipelineStep step;

    try {
        // Copies a required string field into `target`. On a missing or
        // non-string value, prints the diagnostic and returns false.
        auto const read_required = [&step_json, step_index](char const* field, std::string& target) {
            if (!step_json.contains(field) || !step_json[field].is_string()) {
                std::cerr << "Step " << step_index << ": '" << field
                          << "' is required and must be a string" << std::endl;
                return false;
            }
            target = step_json[field].get<std::string>();
            return true;
        };

        // Short-circuiting preserves the field order and the early exit.
        bool const required_ok = read_required("step_id", step.step_id) &&
                                 read_required("transform_name", step.transform_name) &&
                                 read_required("input_key", step.input_key);
        if (!required_ok) {
            return {false, step};
        }

        if (step_json.contains("output_key") && step_json["output_key"].is_string()) {
            step.output_key = step_json["output_key"].get<std::string>();
        }

        step.parameters = step_json.contains("parameters")
                                  ? step_json["parameters"]
                                  : nlohmann::json::object();

        if (step_json.contains("phase") && step_json["phase"].is_number_integer()) {
            step.phase = step_json["phase"];
        }

        if (step_json.contains("enabled") && step_json["enabled"].is_boolean()) {
            step.enabled = step_json["enabled"];
        }

        if (step_json.contains("description") && step_json["description"].is_string()) {
            step.description = step_json["description"];
        }

        if (step_json.contains("tags") && step_json["tags"].is_array()) {
            for (auto const& tag : step_json["tags"]) {
                if (!tag.is_string()) {
                    continue;
                }
                step.tags.push_back(tag);
            }
        }

        return {true, step};
    } catch (std::exception const& e) {
        std::cerr << "Error parsing step " << step_index << ": " << e.what() << std::endl;
        return {false, step};
    }
}
390

391
std::unique_ptr<TransformParametersBase> TransformPipeline::createParametersFromJson(
74✔
392
    std::string const& transform_name, 
393
    nlohmann::json const& param_json) {
394
    
395
    // Get the operation to get default parameters
396
    auto* operation = registry_->findOperationByName(transform_name);
74✔
397
    if (!operation) {
74✔
UNCOV
398
        return nullptr;
×
399
    }
400

401
    // Special handling for grouping operations that need EntityGroupManager
402
    if (transform_name == "Group Lines by Proximity") {
74✔
403
        auto* group_manager = data_manager_->getEntityGroupManager();
×
404
        if (!group_manager) {
×
405
            std::cerr << "Error: EntityGroupManager not available for grouping operation" << std::endl;
×
UNCOV
406
            return nullptr;
×
407
        }
408
        
UNCOV
409
        auto parameters = std::make_unique<LineProximityGroupingParameters>(group_manager);
×
410
        
411
        // Set parameters from JSON
412
        for (auto const& [param_name, param_value] : param_json.items()) {
×
UNCOV
413
            if (!setParameterValue(parameters.get(), param_name, param_value, transform_name)) {
×
414
                std::cerr << "Warning: Failed to set parameter '" << param_name 
UNCOV
415
                          << "' for transform '" << transform_name << "'" << std::endl;
×
416
            }
UNCOV
417
        }
×
418
        
419
        return std::move(parameters);
×
UNCOV
420
    }
×
421
    
422
    // Get default parameters first
423
    auto parameters = operation->getDefaultParameters();
74✔
424
    if (!parameters) {
74✔
UNCOV
425
        return nullptr;
×
426
    }
427
    
428
    // Check if this is a grouping operation that needs EntityGroupManager
429
    auto* grouping_params = dynamic_cast<GroupingTransformParametersBase*>(parameters.get());
74✔
430
    if (grouping_params) {
74✔
431
        auto* group_manager = data_manager_->getEntityGroupManager();
3✔
432
        if (!group_manager) {
3✔
433
            std::cerr << "Error: EntityGroupManager not available for grouping operation '" 
UNCOV
434
                      << transform_name << "'" << std::endl;
×
UNCOV
435
            return nullptr;
×
436
        }
437
        
438
        // Set the group manager
439
        grouping_params->setGroupManager(group_manager);
3✔
440
    }
441
    
442
    // Set parameters from JSON
443
    for (auto const& [param_name, param_value] : param_json.items()) {
306✔
444
        if (!setParameterValue(parameters.get(), param_name, param_value, transform_name)) {
232✔
445
            std::cerr << "Warning: Failed to set parameter '" << param_name 
446
                      << "' for transform '" << transform_name << "'" << std::endl;
4✔
447
        }
448
    }
74✔
449
    
450
    return parameters;
74✔
451
}
74✔
452

453
bool TransformPipeline::setParameterValue(TransformParametersBase* param_obj, 
232✔
454
                                         std::string const& param_name, 
455
                                         nlohmann::json const& json_value,
456
                                         std::string const& transform_name) {
457
    
458
    // Use the parameter factory for type conversion
459
    auto& factory = ParameterFactory::getInstance();
232✔
460
    
461
    return factory.setParameter(transform_name, param_obj, param_name, json_value, data_manager_);
232✔
462
}
463

464
std::pair<bool, DataTypeVariant> TransformPipeline::getInputData(std::string const& input_key) {
74✔
465
    // First check temporary data
466
    auto temp_it = temporary_data_.find(input_key);
74✔
467
    if (temp_it != temporary_data_.end()) {
74✔
UNCOV
468
        return {true, temp_it->second};
×
469
    }
470
    
471
    // Then check data manager
472
    auto data_variant = data_manager_->getDataVariant(input_key);
74✔
473
    if (data_variant.has_value()) {
74✔
474
        return {true, data_variant.value()};
74✔
475
    }
476
    
UNCOV
477
    return {false, DataTypeVariant{}};
×
478
}
74✔
479

480
void TransformPipeline::storeOutputData(std::string const& output_key, 
74✔
481
                                       DataTypeVariant const& data, 
482
                                       std::string const& step_id,
483
                                       TimeKey const& time_key) {
484
    if (output_key.empty()) {
74✔
485
        // Store as temporary data using step_id
UNCOV
486
        temporary_data_[step_id + "_output"] = data;
×
487
    } else {
488
        // Store in data manager
489
        data_manager_->setData(output_key, data, time_key);
74✔
490
    }
491
}
74✔
492

493
std::map<int, std::vector<int>> TransformPipeline::groupStepsByPhase() const {
74✔
494
    std::map<int, std::vector<int>> phase_groups;
74✔
495
    
496
    for (size_t i = 0; i < steps_.size(); ++i) {
148✔
497
        phase_groups[steps_[i].phase].push_back(static_cast<int>(i));
74✔
498
    }
499
    
500
    return phase_groups;
74✔
UNCOV
501
}
×
502

503
std::vector<StepResult> TransformPipeline::executePhase(
74✔
504
    std::vector<int> const& phase_steps, 
505
    int,
506
    PipelineProgressCallback progress_callback) {
507
    
508
    std::vector<StepResult> results;
74✔
509
    results.reserve(phase_steps.size());
74✔
510
    
511
    if (phase_steps.size() == 1) {
74✔
512
        // Single step - execute directly
513
        int step_index = phase_steps[0];
74✔
514
        auto const& step = steps_[static_cast<size_t>(step_index)];
74✔
515
        
516
        ProgressCallback step_progress_callback;
74✔
517
        if (progress_callback) {
74✔
518
            step_progress_callback = [this, progress_callback, step_index, step](int step_progress) {
118✔
519
                int overall_progress = (step_index * 100) / static_cast<int>(steps_.size());
206✔
520
                progress_callback(step_index, step.step_id, step_progress, overall_progress);
206✔
521
            };
59✔
522
        }
523
        
524
        results.push_back(executeStep(step, step_progress_callback));
74✔
525
    } else {
74✔
526
        // Multiple steps - execute in parallel
527
        std::vector<std::future<StepResult>> futures;
×
UNCOV
528
        futures.reserve(phase_steps.size());
×
529
        
UNCOV
530
        for (int step_index : phase_steps) {
×
UNCOV
531
            auto const& step = steps_[static_cast<size_t>(step_index)];
×
532
            
533
            ProgressCallback step_progress_callback;
×
UNCOV
534
            if (progress_callback) {
×
535
                step_progress_callback = [this, progress_callback, step_index, step](int step_progress) {
×
UNCOV
536
                    int overall_progress = (step_index * 100) / static_cast<int>(steps_.size());
×
UNCOV
537
                    progress_callback(step_index, step.step_id, step_progress, overall_progress);
×
538
                };
×
539
            }
540
            
UNCOV
541
            futures.push_back(std::async(std::launch::async, 
×
UNCOV
542
                [this, step, step_progress_callback]() {
×
UNCOV
543
                    return executeStep(step, step_progress_callback);
×
544
                }));
UNCOV
545
        }
×
546
        
547
        // Collect results
UNCOV
548
        for (auto& future : futures) {
×
UNCOV
549
            results.push_back(future.get());
×
550
        }
UNCOV
551
    }
×
552
    
553
    return results;
74✔
UNCOV
554
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc