• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

paulmthompson / WhiskerToolbox / 17335530769

29 Aug 2025 09:57PM UTC coverage: 66.478% (+0.3%) from 66.194%
17335530769

push

github

paulmthompson
update testing for analog interval threshold

113 of 116 new or added lines in 3 files covered. (97.41%)

103 existing lines in 6 files now uncovered.

27064 of 40711 relevant lines covered (66.48%)

1114.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.63
/src/DataManager/transforms/TransformPipeline.cpp
1
#include "TransformPipeline.hpp"
2

3
#include "DataManager.hpp"
4
#include "ParameterFactory.hpp"
5
#include "TransformRegistry.hpp"
6

7
#include <algorithm>
8
#include <chrono>
9
#include <fstream>
10
#include <iostream>
11
#include <thread>
12
#include <future>
13
#include <type_traits>
14

15
TransformPipeline::TransformPipeline(DataManager* data_manager, TransformRegistry* registry)
12✔
16
    : data_manager_(data_manager), registry_(registry) {
12✔
17
    if (!data_manager_) {
12✔
18
        throw std::invalid_argument("DataManager cannot be null");
×
19
    }
20
    if (!registry_) {
12✔
21
        throw std::invalid_argument("TransformRegistry cannot be null");
×
22
    }
23
    auto& factory = ParameterFactory::getInstance();
12✔
24
    factory.initializeDefaultSetters();
12✔
25

26
}
12✔
27

28
bool TransformPipeline::loadFromJson(nlohmann::json const& json_config) {
12✔
29
    try {
30
        clear();
12✔
31
        
32
        // Load metadata
33
        if (json_config.contains("metadata")) {
12✔
34
            metadata_ = json_config["metadata"];
9✔
35
        }
36
        
37
        // Load steps
38
        if (!json_config.contains("steps") || !json_config["steps"].is_array()) {
12✔
UNCOV
39
            std::cerr << "Pipeline JSON must contain a 'steps' array" << std::endl;
×
UNCOV
40
            return false;
×
41
        }
42
        
43
        auto const& steps_json = json_config["steps"];
12✔
44
        steps_.reserve(steps_json.size());
12✔
45
        
46
        for (size_t i = 0; i < steps_json.size(); ++i) {
24✔
47
            auto [success, step] = parseStep(steps_json[i], static_cast<int>(i));
12✔
48
            if (!success) {
12✔
UNCOV
49
                std::cerr << "Failed to parse step " << i << std::endl;
×
UNCOV
50
                return false;
×
51
            }
52
            steps_.push_back(std::move(step));
12✔
53
        }
12✔
54
        
55
        // Validate the loaded pipeline
56
        auto validation_errors = validate();
12✔
57
        if (!validation_errors.empty()) {
12✔
UNCOV
58
            std::cerr << "Pipeline validation failed:" << std::endl;
×
59
            for (auto const& error : validation_errors) {
×
UNCOV
60
                std::cerr << "  - " << error << std::endl;
×
61
            }
UNCOV
62
            return false;
×
63
        }
64
        
65
        return true;
12✔
66
    } catch (std::exception const& e) {
12✔
UNCOV
67
        std::cerr << "Error loading pipeline from JSON: " << e.what() << std::endl;
×
UNCOV
68
        return false;
×
69
    }
×
70
}
71

72
bool TransformPipeline::loadFromJsonFile(std::string const& json_file_path) {
×
73
    try {
74
        std::ifstream file(json_file_path);
×
UNCOV
75
        if (!file.is_open()) {
×
UNCOV
76
            std::cerr << "Cannot open pipeline file: " << json_file_path << std::endl;
×
77
            return false;
×
78
        }
79
        
80
        nlohmann::json json_config;
×
81
        file >> json_config;
×
82
        return loadFromJson(json_config);
×
83
    } catch (std::exception const& e) {
×
UNCOV
84
        std::cerr << "Error reading pipeline file '" << json_file_path << "': " << e.what() << std::endl;
×
UNCOV
85
        return false;
×
UNCOV
86
    }
×
87
}
88

89
PipelineResult TransformPipeline::execute(PipelineProgressCallback progress_callback) {
12✔
90
    auto start_time = std::chrono::high_resolution_clock::now();
12✔
91
    
92
    PipelineResult result;
12✔
93
    result.total_steps = static_cast<int>(steps_.size());
12✔
94
    result.step_results.reserve(steps_.size());
12✔
95
    
96
    try {
97
        // Clear temporary data from previous executions
98
        temporary_data_.clear();
12✔
99
        
100
        // Group steps by phase
101
        auto phase_groups = groupStepsByPhase();
12✔
102
        
103
        int completed_steps = 0;
12✔
104
        
105
        // Execute each phase
106
        for (auto const& [phase_number, step_indices] : phase_groups) {
24✔
107
            if (progress_callback) {
12✔
108
                progress_callback(-1, "Starting phase " + std::to_string(phase_number), 0, 
9✔
109
                                (completed_steps * 100) / result.total_steps);
9✔
110
            }
111
            
112
            auto phase_results = executePhase(step_indices, phase_number, progress_callback);
12✔
113
            
114
            // Check if any step in this phase failed
115
            bool phase_failed = false;
12✔
116
            for (auto const& step_result : phase_results) {
24✔
117
                result.step_results.push_back(step_result);
12✔
118
                if (!step_result.success) {
12✔
UNCOV
119
                    phase_failed = true;
×
UNCOV
120
                    result.error_message = "Step failed: " + step_result.error_message;
×
UNCOV
121
                    break;
×
122
                }
123
                completed_steps++;
12✔
124
            }
125
            
126
            if (phase_failed) {
12✔
UNCOV
127
                result.success = false;
×
UNCOV
128
                result.steps_completed = completed_steps;
×
UNCOV
129
                break;
×
130
            }
131
        }
12✔
132
        
133
        if (result.step_results.size() == steps_.size()) {
12✔
134
            result.success = true;
12✔
135
            result.steps_completed = result.total_steps;
12✔
136
            
137
            if (progress_callback) {
12✔
138
                progress_callback(-1, "Pipeline completed", 100, 100);
27✔
139
            }
140
        }
141
        
142
    } catch (std::exception const& e) {
12✔
UNCOV
143
        result.success = false;
×
UNCOV
144
        result.error_message = "Pipeline execution error: " + std::string(e.what());
×
UNCOV
145
    }
×
146
    
147
    auto end_time = std::chrono::high_resolution_clock::now();
12✔
148
    result.total_execution_time_ms = std::chrono::duration<double, std::milli>(end_time - start_time).count();
12✔
149
    
150
    return result;
24✔
UNCOV
151
}
×
152

153
StepResult TransformPipeline::executeStep(PipelineStep const& step, ProgressCallback progress_callback) {
12✔
154
    auto start_time = std::chrono::high_resolution_clock::now();
12✔
155
    
156
    StepResult result;
12✔
157
    result.output_key = step.output_key;
12✔
158
    
159
    try {
160
        // Check if step is enabled
161
        if (!step.enabled) {
12✔
UNCOV
162
            result.success = true; // Disabled steps are considered successful
×
UNCOV
163
            return result;
×
164
        }
165
        
166
        // Get the transform operation
167
        auto* operation = registry_->findOperationByName(step.transform_name);
12✔
168
        if (!operation) {
12✔
UNCOV
169
            result.error_message = "Transform '" + step.transform_name + "' not found in registry";
×
UNCOV
170
            return result;
×
171
        }
172
        
173
        // Get input data
174
        auto [input_success, input_data] = getInputData(step.input_key);
12✔
175
        if (!input_success) {
12✔
UNCOV
176
            result.error_message = "Failed to get input data for key '" + step.input_key + "'";
×
UNCOV
177
            return result;
×
178
        }
179
        
180
        // Check if operation can apply to input data
181
        if (!operation->canApply(input_data)) {
12✔
UNCOV
182
            result.error_message = "Transform '" + step.transform_name + "' cannot be applied to input data";
×
UNCOV
183
            return result;
×
184
        }
185
        
186
        // Create parameters
187
        auto parameters = createParametersFromJson(step.transform_name, step.parameters);
12✔
188
        
189
        // Execute the transform
190
        DataTypeVariant output_data;
12✔
191
        if (progress_callback) {
12✔
192
            output_data = operation->execute(input_data, parameters.get(), progress_callback);
9✔
193
        } else {
194
            output_data = operation->execute(input_data, parameters.get());
3✔
195
        }
196
        
197
        // Check if execution was successful (assuming empty variant means failure)
198
        if (std::visit([](auto const& ptr) { return ptr == nullptr; }, output_data)) {
24✔
UNCOV
199
            result.error_message = "Transform execution returned null result";
×
UNCOV
200
            return result;
×
201
        }
202
        
203
        // Store output data
204
        auto time_key = data_manager_->getTimeKey(step.input_key);
12✔
205
        storeOutputData(step.output_key, output_data, step.step_id, time_key);
12✔
206
        result.result_data = output_data;
12✔
207
        result.success = true;
12✔
208
        
209
    } catch (std::exception const& e) {
12✔
UNCOV
210
        result.error_message = "Step execution error: " + std::string(e.what());
×
UNCOV
211
    }
×
212
    
213
    auto end_time = std::chrono::high_resolution_clock::now();
12✔
214
    result.execution_time_ms = std::chrono::duration<double, std::milli>(end_time - start_time).count();
12✔
215
    
216
    return result;
12✔
UNCOV
217
}
×
218

219
std::vector<std::string> TransformPipeline::validate() const {
12✔
220
    std::vector<std::string> errors;
12✔
221
    
222
    // Check for duplicate step IDs
223
    std::unordered_map<std::string, int> step_id_counts;
12✔
224
    for (auto const& step : steps_) {
24✔
225
        step_id_counts[step.step_id]++;
12✔
226
    }
227
    for (auto const& [step_id, count] : step_id_counts) {
24✔
228
        if (count > 1) {
12✔
UNCOV
229
            errors.push_back("Duplicate step ID: " + step_id);
×
230
        }
231
    }
232
    
233
    // Validate each step
234
    for (size_t i = 0; i < steps_.size(); ++i) {
24✔
235
        auto const& step = steps_[i];
12✔
236
        std::string step_prefix = "Step " + std::to_string(i) + " (" + step.step_id + "): ";
12✔
237
        
238
        // Check if transform exists
239
        if (!registry_->findOperationByName(step.transform_name)) {
12✔
UNCOV
240
            errors.push_back(step_prefix + "Transform '" + step.transform_name + "' not found in registry");
×
241
        }
242
        
243
        // Check input key
244
        if (step.input_key.empty()) {
12✔
UNCOV
245
            errors.push_back(step_prefix + "Input key cannot be empty");
×
246
        }
247
        
248
        // Check step ID
249
        if (step.step_id.empty()) {
12✔
UNCOV
250
            errors.push_back(step_prefix + "Step ID cannot be empty");
×
251
        }
252
        
253
        // Check phase number
254
        if (step.phase < 0) {
12✔
UNCOV
255
            errors.push_back(step_prefix + "Phase number cannot be negative");
×
256
        }
257
    }
12✔
258
    
259
    return errors;
24✔
260
}
12✔
261

262
void TransformPipeline::clear() {
12✔
263
    steps_.clear();
12✔
264
    metadata_ = nlohmann::json::object();
12✔
265
    temporary_data_.clear();
12✔
266
}
12✔
267

UNCOV
268
nlohmann::json TransformPipeline::exportToJson() const {
×
269
    nlohmann::json result;
×
270
    
271
    // Export metadata
272
    result["metadata"] = metadata_;
×
273
    
274
    // Export steps
275
    result["steps"] = nlohmann::json::array();
×
276
    for (auto const& step : steps_) {
×
277
        nlohmann::json step_json;
×
278
        step_json["step_id"] = step.step_id;
×
279
        step_json["transform_name"] = step.transform_name;
×
280
        step_json["input_key"] = step.input_key;
×
281
        step_json["output_key"] = step.output_key;
×
UNCOV
282
        step_json["parameters"] = step.parameters;
×
283
        step_json["phase"] = step.phase;
×
284
        step_json["enabled"] = step.enabled;
×
285
        
286
        if (!step.description.empty()) {
×
287
            step_json["description"] = step.description;
×
288
        }
UNCOV
289
        if (!step.tags.empty()) {
×
290
            step_json["tags"] = step.tags;
×
291
        }
292
        
293
        result["steps"].push_back(step_json);
×
294
    }
×
295
    
296
    return result;
×
UNCOV
297
}
×
298

299
bool TransformPipeline::saveToJsonFile(std::string const& json_file_path) const {
×
300
    try {
301
        std::ofstream file(json_file_path);
×
UNCOV
302
        if (!file.is_open()) {
×
UNCOV
303
            std::cerr << "Cannot create pipeline file: " << json_file_path << std::endl;
×
304
            return false;
×
305
        }
306
        
307
        auto json_data = exportToJson();
×
308
        file << json_data.dump(2); // Pretty print with 2-space indentation
×
309
        return true;
×
310
    } catch (std::exception const& e) {
×
UNCOV
311
        std::cerr << "Error saving pipeline file '" << json_file_path << "': " << e.what() << std::endl;
×
UNCOV
312
        return false;
×
UNCOV
313
    }
×
314
}
315

316
// Private implementation methods
317

318
std::pair<bool, PipelineStep> TransformPipeline::parseStep(nlohmann::json const& step_json, int step_index) {
12✔
319
    PipelineStep step;
12✔
320
    
321
    try {
322
        // Required fields
323
        if (!step_json.contains("step_id") || !step_json["step_id"].is_string()) {
12✔
UNCOV
324
            std::cerr << "Step " << step_index << ": 'step_id' is required and must be a string" << std::endl;
×
UNCOV
325
            return {false, step};
×
326
        }
327
        step.step_id = step_json["step_id"];
12✔
328
        
329
        if (!step_json.contains("transform_name") || !step_json["transform_name"].is_string()) {
12✔
UNCOV
330
            std::cerr << "Step " << step_index << ": 'transform_name' is required and must be a string" << std::endl;
×
UNCOV
331
            return {false, step};
×
332
        }
333
        step.transform_name = step_json["transform_name"];
12✔
334
        
335
        if (!step_json.contains("input_key") || !step_json["input_key"].is_string()) {
12✔
UNCOV
336
            std::cerr << "Step " << step_index << ": 'input_key' is required and must be a string" << std::endl;
×
UNCOV
337
            return {false, step};
×
338
        }
339
        step.input_key = step_json["input_key"];
12✔
340
        
341
        // Optional fields
342
        if (step_json.contains("output_key")) {
12✔
343
            if (step_json["output_key"].is_string()) {
12✔
344
                step.output_key = step_json["output_key"];
12✔
345
            }
346
        }
347
        
348
        if (step_json.contains("parameters")) {
12✔
349
            step.parameters = step_json["parameters"];
12✔
350
        } else {
UNCOV
351
            step.parameters = nlohmann::json::object();
×
352
        }
353
        
354
        if (step_json.contains("phase")) {
12✔
355
            if (step_json["phase"].is_number_integer()) {
9✔
UNCOV
356
                step.phase = step_json["phase"];
×
357
            }
358
        }
359
        
360
        if (step_json.contains("enabled")) {
12✔
UNCOV
361
            if (step_json["enabled"].is_boolean()) {
×
UNCOV
362
                step.enabled = step_json["enabled"];
×
363
            }
364
        }
365
        
366
        if (step_json.contains("description")) {
12✔
UNCOV
367
            if (step_json["description"].is_string()) {
×
UNCOV
368
                step.description = step_json["description"];
×
369
            }
370
        }
371
        
372
        if (step_json.contains("tags")) {
12✔
373
            if (step_json["tags"].is_array()) {
×
UNCOV
374
                for (auto const& tag : step_json["tags"]) {
×
UNCOV
375
                    if (tag.is_string()) {
×
UNCOV
376
                        step.tags.push_back(tag);
×
377
                    }
378
                }
379
            }
380
        }
381
        
382
        return {true, step};
12✔
383
    } catch (std::exception const& e) {
×
UNCOV
384
        std::cerr << "Error parsing step " << step_index << ": " << e.what() << std::endl;
×
UNCOV
385
        return {false, step};
×
UNCOV
386
    }
×
387
}
12✔
388

389
std::unique_ptr<TransformParametersBase> TransformPipeline::createParametersFromJson(
12✔
390
    std::string const& transform_name, 
391
    nlohmann::json const& param_json) {
392
    
393
    // Get the operation to get default parameters
394
    auto* operation = registry_->findOperationByName(transform_name);
12✔
395
    if (!operation) {
12✔
UNCOV
396
        return nullptr;
×
397
    }
398
    
399
    auto parameters = operation->getDefaultParameters();
12✔
400
    if (!parameters) {
12✔
UNCOV
401
        return nullptr;
×
402
    }
403
    
404
    // Set parameters from JSON
405
    for (auto const& [param_name, param_value] : param_json.items()) {
100✔
406
        if (!setParameterValue(parameters.get(), param_name, param_value, transform_name)) {
44✔
407
            std::cerr << "Warning: Failed to set parameter '" << param_name 
408
                      << "' for transform '" << transform_name << "'" << std::endl;
2✔
409
        }
410
    }
12✔
411
    
412
    return parameters;
12✔
413
}
12✔
414

415
bool TransformPipeline::setParameterValue(TransformParametersBase* param_obj, 
44✔
416
                                         std::string const& param_name, 
417
                                         nlohmann::json const& json_value,
418
                                         std::string const& transform_name) {
419
    
420
    // Use the parameter factory for type conversion
421
    auto& factory = ParameterFactory::getInstance();
44✔
422
    
423
    return factory.setParameter(transform_name, param_obj, param_name, json_value, data_manager_);
44✔
424
}
425

426
std::pair<bool, DataTypeVariant> TransformPipeline::getInputData(std::string const& input_key) {
12✔
427
    // First check temporary data
428
    auto temp_it = temporary_data_.find(input_key);
12✔
429
    if (temp_it != temporary_data_.end()) {
12✔
UNCOV
430
        return {true, temp_it->second};
×
431
    }
432
    
433
    // Then check data manager
434
    auto data_variant = data_manager_->getDataVariant(input_key);
12✔
435
    if (data_variant.has_value()) {
12✔
436
        return {true, data_variant.value()};
12✔
437
    }
438
    
UNCOV
439
    return {false, DataTypeVariant{}};
×
440
}
12✔
441

442
void TransformPipeline::storeOutputData(std::string const& output_key, 
12✔
443
                                       DataTypeVariant const& data, 
444
                                       std::string const& step_id,
445
                                       TimeKey const& time_key) {
446
    if (output_key.empty()) {
12✔
447
        // Store as temporary data using step_id
UNCOV
448
        temporary_data_[step_id + "_output"] = data;
×
449
    } else {
450
        // Store in data manager
451
        data_manager_->setData(output_key, data, time_key);
12✔
452
    }
453
}
12✔
454

455
std::map<int, std::vector<int>> TransformPipeline::groupStepsByPhase() const {
12✔
456
    std::map<int, std::vector<int>> phase_groups;
12✔
457
    
458
    for (size_t i = 0; i < steps_.size(); ++i) {
24✔
459
        phase_groups[steps_[i].phase].push_back(static_cast<int>(i));
12✔
460
    }
461
    
462
    return phase_groups;
12✔
UNCOV
463
}
×
464

465
std::vector<StepResult> TransformPipeline::executePhase(
12✔
466
    std::vector<int> const& phase_steps, 
467
    int,
468
    PipelineProgressCallback progress_callback) {
469
    
470
    std::vector<StepResult> results;
12✔
471
    results.reserve(phase_steps.size());
12✔
472
    
473
    if (phase_steps.size() == 1) {
12✔
474
        // Single step - execute directly
475
        int step_index = phase_steps[0];
12✔
476
        auto const& step = steps_[static_cast<size_t>(step_index)];
12✔
477
        
478
        ProgressCallback step_progress_callback;
12✔
479
        if (progress_callback) {
12✔
480
            step_progress_callback = [this, progress_callback, step_index, step](int step_progress) {
18✔
481
                int overall_progress = (step_index * 100) / static_cast<int>(steps_.size());
45✔
482
                progress_callback(step_index, step.step_id, step_progress, overall_progress);
45✔
483
            };
9✔
484
        }
485
        
486
        results.push_back(executeStep(step, step_progress_callback));
12✔
487
    } else {
12✔
488
        // Multiple steps - execute in parallel
489
        std::vector<std::future<StepResult>> futures;
×
490
        futures.reserve(phase_steps.size());
×
491
        
492
        for (int step_index : phase_steps) {
×
493
            auto const& step = steps_[static_cast<size_t>(step_index)];
×
494
            
495
            ProgressCallback step_progress_callback;
×
496
            if (progress_callback) {
×
497
                step_progress_callback = [this, progress_callback, step_index, step](int step_progress) {
×
UNCOV
498
                    int overall_progress = (step_index * 100) / static_cast<int>(steps_.size());
×
UNCOV
499
                    progress_callback(step_index, step.step_id, step_progress, overall_progress);
×
500
                };
×
501
            }
502
            
UNCOV
503
            futures.push_back(std::async(std::launch::async, 
×
504
                [this, step, step_progress_callback]() {
×
UNCOV
505
                    return executeStep(step, step_progress_callback);
×
506
                }));
507
        }
×
508
        
509
        // Collect results
510
        for (auto& future : futures) {
×
UNCOV
511
            results.push_back(future.get());
×
512
        }
513
    }
×
514
    
515
    return results;
12✔
UNCOV
516
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc