• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

paulmthompson / WhiskerToolbox / 17270491352

27 Aug 2025 02:57PM UTC coverage: 65.333%. Remained the same
17270491352

push

github

paulmthompson
Merge branch 'main' of https://github.com/paulmthompson/WhiskerToolbox

352 of 628 new or added lines in 92 files covered. (56.05%)

357 existing lines in 24 files now uncovered.

26429 of 40453 relevant lines covered (65.33%)

1119.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/DataManager/transforms/TransformPipeline.cpp
1
#include "TransformPipeline.hpp"

#include "DataManager.hpp"
#include "ParameterFactory.hpp"
#include "TransformRegistry.hpp"

#include <algorithm>
#include <chrono>
#include <fstream>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <type_traits>
14

15
/**
 * @brief Construct a pipeline bound to a DataManager and a TransformRegistry.
 *
 * The other member functions dereference both pointers without re-checking
 * them, so null is rejected up front.
 *
 * @param data_manager Non-owning pointer to the data store; must not be null.
 * @param registry     Non-owning pointer to the transform registry; must not be null.
 * @throws std::invalid_argument if either pointer is null.
 */
TransformPipeline::TransformPipeline(DataManager* data_manager, TransformRegistry* registry)
    : data_manager_(data_manager)
    , registry_(registry) {
    if (data_manager_ == nullptr) {
        throw std::invalid_argument("DataManager cannot be null");
    }
    if (registry_ == nullptr) {
        throw std::invalid_argument("TransformRegistry cannot be null");
    }
}
×
24

25
/**
 * @brief Replace the pipeline's contents with the configuration in @p json_config.
 *
 * Expected shape: an object with a required "steps" array (each element parsed by
 * parseStep()) and an optional "metadata" value.
 *
 * The pipeline is cleared first. On any failure (missing "steps", a step that fails
 * to parse, validation errors, or an exception) it is cleared again. A `false` return
 * therefore never leaves a half-loaded or invalid set of steps behind that execute()
 * would still run.
 *
 * @param json_config Parsed pipeline configuration.
 * @return true if every step parsed and the pipeline passed validate().
 */
bool TransformPipeline::loadFromJson(nlohmann::json const& json_config) {
    try {
        clear();

        if (json_config.contains("metadata")) {
            metadata_ = json_config["metadata"];
        }

        if (!json_config.contains("steps") || !json_config["steps"].is_array()) {
            std::cerr << "Pipeline JSON must contain a 'steps' array" << std::endl;
            clear();
            return false;
        }

        auto const& steps_json = json_config["steps"];
        steps_.reserve(steps_json.size());

        for (size_t i = 0; i < steps_json.size(); ++i) {
            auto [success, step] = parseStep(steps_json[i], static_cast<int>(i));
            if (!success) {
                std::cerr << "Failed to parse step " << i << std::endl;
                // Roll back the steps parsed so far.
                clear();
                return false;
            }
            steps_.push_back(std::move(step));
        }

        // validate() inspects steps_, so the steps must be in place before this call.
        auto validation_errors = validate();
        if (!validation_errors.empty()) {
            std::cerr << "Pipeline validation failed:" << std::endl;
            for (auto const& error : validation_errors) {
                std::cerr << "  - " << error << std::endl;
            }
            // Do not leave an invalid pipeline that could still be executed.
            clear();
            return false;
        }

        return true;
    } catch (std::exception const& e) {
        std::cerr << "Error loading pipeline from JSON: " << e.what() << std::endl;
        clear();
        return false;
    }
}
68

69
bool TransformPipeline::loadFromJsonFile(std::string const& json_file_path) {
×
70
    try {
71
        std::ifstream file(json_file_path);
×
72
        if (!file.is_open()) {
×
73
            std::cerr << "Cannot open pipeline file: " << json_file_path << std::endl;
×
74
            return false;
×
75
        }
76
        
77
        nlohmann::json json_config;
×
78
        file >> json_config;
×
79
        return loadFromJson(json_config);
×
80
    } catch (std::exception const& e) {
×
81
        std::cerr << "Error reading pipeline file '" << json_file_path << "': " << e.what() << std::endl;
×
82
        return false;
×
83
    }
×
84
}
85

86
/**
 * @brief Run every step, phase by phase, and collect per-step results.
 *
 * Phases come from groupStepsByPhase() and run in ascending phase number. Execution
 * stops after the first phase that contains a failed step.
 *
 * @param progress_callback Optional. Receives (-1, message, 0, overall%) at the start
 *        of each phase and (-1, "Pipeline completed", 100, 100) on success. It is also
 *        forwarded to executePhase() for per-step reports.
 * @return PipelineResult with:
 *         - success: true only when no step failed;
 *         - steps_completed: the number of successful steps before the first failure,
 *           or total_steps on success;
 *         - step_results: the results recorded up to and including the first failure;
 *         - total_execution_time_ms.
 */
PipelineResult TransformPipeline::execute(PipelineProgressCallback progress_callback) {
    auto start_time = std::chrono::high_resolution_clock::now();

    PipelineResult result;
    result.total_steps = static_cast<int>(steps_.size());
    result.step_results.reserve(steps_.size());

    try {
        // Outputs of unnamed steps from a previous run must not leak into this one.
        temporary_data_.clear();

        auto phase_groups = groupStepsByPhase();

        int completed_steps = 0;
        bool pipeline_failed = false;

        for (auto const& [phase_number, step_indices] : phase_groups) {
            if (progress_callback) {
                // total_steps > 0 here: a phase exists only if at least one step does.
                progress_callback(-1, "Starting phase " + std::to_string(phase_number), 0,
                                  (completed_steps * 100) / result.total_steps);
            }

            auto phase_results = executePhase(step_indices, phase_number, progress_callback);

            // Record results in order, up to and including the first failure.
            for (auto& step_result : phase_results) {
                bool const step_ok = step_result.success;
                if (!step_ok) {
                    result.error_message = "Step failed: " + step_result.error_message;
                }
                result.step_results.push_back(std::move(step_result));
                if (!step_ok) {
                    pipeline_failed = true;
                    break;
                }
                ++completed_steps;
            }

            if (pipeline_failed) {
                result.success = false;
                result.steps_completed = completed_steps;
                break;
            }
        }

        // Gate success on the failure flag. The result count alone is not enough:
        // a failure in the very last step still yields one result per step.
        if (!pipeline_failed && result.step_results.size() == steps_.size()) {
            result.success = true;
            result.steps_completed = result.total_steps;

            if (progress_callback) {
                progress_callback(-1, "Pipeline completed", 100, 100);
            }
        }
    } catch (std::exception const& e) {
        result.success = false;
        result.error_message = "Pipeline execution error: " + std::string(e.what());
    }

    auto end_time = std::chrono::high_resolution_clock::now();
    result.total_execution_time_ms = std::chrono::duration<double, std::milli>(end_time - start_time).count();

    return result;
}
×
149

150
/**
 * @brief Execute a single pipeline step.
 *
 * The step proceeds as follows:
 * 1. Resolve the transform by name.
 * 2. Fetch the input (pipeline-local temporaries first, then the DataManager).
 * 3. Check operation->canApply().
 * 4. Build parameters from the step's JSON.
 * 5. Run the operation.
 * 6. Store the output via storeOutputData().
 *
 * Disabled steps are skipped and reported as successful. Exceptions derived from
 * std::exception are caught and turned into an error message.
 *
 * execution_time_ms is filled in on every exit path, including failures and skips.
 *
 * @param step              The step to run.
 * @param progress_callback Optional step-level progress callback passed to the operation.
 * @return StepResult with output_key, success/error_message, result_data on success,
 *         and execution_time_ms.
 */
StepResult TransformPipeline::executeStep(PipelineStep const& step, ProgressCallback progress_callback) {
    auto const start_time = std::chrono::high_resolution_clock::now();

    StepResult result;
    result.output_key = step.output_key;

    // Stamp the elapsed time on `result` and hand it back; used by every return.
    auto finish = [&result, start_time]() {
        auto const end_time = std::chrono::high_resolution_clock::now();
        result.execution_time_ms = std::chrono::duration<double, std::milli>(end_time - start_time).count();
        return std::move(result);
    };

    try {
        if (!step.enabled) {
            result.success = true; // Disabled steps are considered successful
            return finish();
        }

        auto* operation = registry_->findOperationByName(step.transform_name);
        if (!operation) {
            result.error_message = "Transform '" + step.transform_name + "' not found in registry";
            return finish();
        }

        auto [input_success, input_data] = getInputData(step.input_key);
        if (!input_success) {
            result.error_message = "Failed to get input data for key '" + step.input_key + "'";
            return finish();
        }

        if (!operation->canApply(input_data)) {
            result.error_message = "Transform '" + step.transform_name + "' cannot be applied to input data";
            return finish();
        }

        // May be null (createParametersFromJson returns nullptr when no defaults exist).
        auto parameters = createParametersFromJson(step.transform_name, step.parameters);

        DataTypeVariant output_data;
        if (progress_callback) {
            output_data = operation->execute(input_data, parameters.get(), progress_callback);
        } else {
            output_data = operation->execute(input_data, parameters.get());
        }

        // Every alternative held by the variant is compared against nullptr;
        // a null pointer is treated as a failed transform.
        if (std::visit([](auto const& ptr) { return ptr == nullptr; }, output_data)) {
            result.error_message = "Transform execution returned null result";
            return finish();
        }

        // The output inherits the time key registered for the input key.
        auto time_key = data_manager_->getTimeKey(step.input_key);
        storeOutputData(step.output_key, output_data, step.step_id, time_key);
        result.result_data = std::move(output_data);
        result.success = true;
    } catch (std::exception const& e) {
        result.error_message = "Step execution error: " + std::string(e.what());
    }

    return finish();
}
×
215

216
std::vector<std::string> TransformPipeline::validate() const {
×
217
    std::vector<std::string> errors;
×
218
    
219
    // Check for duplicate step IDs
220
    std::unordered_map<std::string, int> step_id_counts;
×
221
    for (auto const& step : steps_) {
×
222
        step_id_counts[step.step_id]++;
×
223
    }
224
    for (auto const& [step_id, count] : step_id_counts) {
×
225
        if (count > 1) {
×
226
            errors.push_back("Duplicate step ID: " + step_id);
×
227
        }
228
    }
229
    
230
    // Validate each step
231
    for (size_t i = 0; i < steps_.size(); ++i) {
×
232
        auto const& step = steps_[i];
×
233
        std::string step_prefix = "Step " + std::to_string(i) + " (" + step.step_id + "): ";
×
234
        
235
        // Check if transform exists
236
        if (!registry_->findOperationByName(step.transform_name)) {
×
237
            errors.push_back(step_prefix + "Transform '" + step.transform_name + "' not found in registry");
×
238
        }
239
        
240
        // Check input key
241
        if (step.input_key.empty()) {
×
242
            errors.push_back(step_prefix + "Input key cannot be empty");
×
243
        }
244
        
245
        // Check step ID
246
        if (step.step_id.empty()) {
×
247
            errors.push_back(step_prefix + "Step ID cannot be empty");
×
248
        }
249
        
250
        // Check phase number
251
        if (step.phase < 0) {
×
252
            errors.push_back(step_prefix + "Phase number cannot be negative");
×
253
        }
254
    }
×
255
    
256
    return errors;
×
257
}
×
258

259
void TransformPipeline::clear() {
×
260
    steps_.clear();
×
261
    metadata_ = nlohmann::json::object();
×
262
    temporary_data_.clear();
×
263
}
×
264

265
/**
 * @brief Serialize the pipeline into the JSON shape accepted by loadFromJson().
 *
 * Each step always carries step_id, transform_name, input_key, output_key,
 * parameters, phase and enabled. "description" and "tags" are emitted only
 * when they are non-empty.
 *
 * @return An object with "metadata" and a "steps" array.
 */
nlohmann::json TransformPipeline::exportToJson() const {
    nlohmann::json steps_array = nlohmann::json::array();

    for (auto const& step : steps_) {
        nlohmann::json entry;
        entry["step_id"] = step.step_id;
        entry["transform_name"] = step.transform_name;
        entry["input_key"] = step.input_key;
        entry["output_key"] = step.output_key;
        entry["parameters"] = step.parameters;
        entry["phase"] = step.phase;
        entry["enabled"] = step.enabled;

        // Optional fields are omitted rather than written as empty values.
        if (!step.description.empty()) {
            entry["description"] = step.description;
        }
        if (!step.tags.empty()) {
            entry["tags"] = step.tags;
        }

        steps_array.push_back(std::move(entry));
    }

    nlohmann::json exported;
    exported["metadata"] = metadata_;
    exported["steps"] = std::move(steps_array);
    return exported;
}
×
295

296
bool TransformPipeline::saveToJsonFile(std::string const& json_file_path) const {
×
297
    try {
298
        std::ofstream file(json_file_path);
×
299
        if (!file.is_open()) {
×
300
            std::cerr << "Cannot create pipeline file: " << json_file_path << std::endl;
×
301
            return false;
×
302
        }
303
        
304
        auto json_data = exportToJson();
×
305
        file << json_data.dump(2); // Pretty print with 2-space indentation
×
306
        return true;
×
307
    } catch (std::exception const& e) {
×
308
        std::cerr << "Error saving pipeline file '" << json_file_path << "': " << e.what() << std::endl;
×
309
        return false;
×
310
    }
×
311
}
312

313
// Private implementation methods
314

315
/**
 * @brief Build a PipelineStep from one element of the "steps" array.
 *
 * Required fields, checked in order: "step_id", "transform_name" and "input_key".
 * Each must be present and a string; the first violation is reported to stderr
 * and aborts parsing.
 *
 * Optional fields are applied only when present with the expected JSON type;
 * otherwise they are silently ignored:
 * - "output_key": string;
 * - "phase": integer;
 * - "enabled": boolean;
 * - "description": string;
 * - "tags": array, of which only string elements are kept;
 * - "parameters": any type, copied verbatim; defaults to an empty object.
 *
 * @param step_json  JSON describing one step.
 * @param step_index Position in the "steps" array, used only in diagnostics.
 * @return {true, step} on success; {false, partially filled step} on failure.
 */
std::pair<bool, PipelineStep> TransformPipeline::parseStep(nlohmann::json const& step_json, int step_index) {
    PipelineStep step;

    try {
        // find() yields end() for a missing key and for non-object JSON alike.
        auto const id_it = step_json.find("step_id");
        if (id_it == step_json.end() || !id_it->is_string()) {
            std::cerr << "Step " << step_index << ": 'step_id' is required and must be a string" << std::endl;
            return {false, step};
        }
        step.step_id = *id_it;

        auto const transform_it = step_json.find("transform_name");
        if (transform_it == step_json.end() || !transform_it->is_string()) {
            std::cerr << "Step " << step_index << ": 'transform_name' is required and must be a string" << std::endl;
            return {false, step};
        }
        step.transform_name = *transform_it;

        auto const input_it = step_json.find("input_key");
        if (input_it == step_json.end() || !input_it->is_string()) {
            std::cerr << "Step " << step_index << ": 'input_key' is required and must be a string" << std::endl;
            return {false, step};
        }
        step.input_key = *input_it;

        if (auto const it = step_json.find("output_key"); it != step_json.end() && it->is_string()) {
            step.output_key = *it;
        }

        if (auto const it = step_json.find("parameters"); it != step_json.end()) {
            step.parameters = *it;
        } else {
            step.parameters = nlohmann::json::object();
        }

        if (auto const it = step_json.find("phase"); it != step_json.end() && it->is_number_integer()) {
            step.phase = *it;
        }

        if (auto const it = step_json.find("enabled"); it != step_json.end() && it->is_boolean()) {
            step.enabled = *it;
        }

        if (auto const it = step_json.find("description"); it != step_json.end() && it->is_string()) {
            step.description = *it;
        }

        if (auto const it = step_json.find("tags"); it != step_json.end() && it->is_array()) {
            for (auto const& tag : *it) {
                if (tag.is_string()) {
                    step.tags.push_back(tag);
                }
            }
        }

        return {true, std::move(step)};
    } catch (std::exception const& err) {
        std::cerr << "Error parsing step " << step_index << ": " << err.what() << std::endl;
        return {false, step};
    }
}
×
385

386
std::unique_ptr<TransformParametersBase> TransformPipeline::createParametersFromJson(
×
387
    std::string const& transform_name, 
388
    nlohmann::json const& param_json) {
389
    
390
    // Get the operation to get default parameters
391
    auto* operation = registry_->findOperationByName(transform_name);
×
392
    if (!operation) {
×
393
        return nullptr;
×
394
    }
395
    
396
    auto parameters = operation->getDefaultParameters();
×
397
    if (!parameters) {
×
398
        return nullptr;
×
399
    }
400
    
401
    // Set parameters from JSON
402
    for (auto const& [param_name, param_value] : param_json.items()) {
×
403
        if (!setParameterValue(parameters.get(), param_name, param_value, transform_name)) {
×
404
            std::cerr << "Warning: Failed to set parameter '" << param_name 
405
                      << "' for transform '" << transform_name << "'" << std::endl;
×
406
        }
407
    }
×
408
    
409
    return parameters;
×
410
}
×
411

412
bool TransformPipeline::setParameterValue(TransformParametersBase* param_obj, 
×
413
                                         std::string const& param_name, 
414
                                         nlohmann::json const& json_value,
415
                                         std::string const& transform_name) {
416
    
417
    // Use the parameter factory for type conversion
418
    auto& factory = ParameterFactory::getInstance();
×
419
    
420
    return factory.setParameter(transform_name, param_obj, param_name, json_value, data_manager_);
×
421
}
422

423
std::pair<bool, DataTypeVariant> TransformPipeline::getInputData(std::string const& input_key) {
×
424
    // First check temporary data
425
    auto temp_it = temporary_data_.find(input_key);
×
426
    if (temp_it != temporary_data_.end()) {
×
427
        return {true, temp_it->second};
×
428
    }
429
    
430
    // Then check data manager
431
    auto data_variant = data_manager_->getDataVariant(input_key);
×
432
    if (data_variant.has_value()) {
×
433
        return {true, data_variant.value()};
×
434
    }
435
    
436
    return {false, DataTypeVariant{}};
×
437
}
×
438

439
void TransformPipeline::storeOutputData(std::string const& output_key, 
×
440
                                       DataTypeVariant const& data, 
441
                                       std::string const& step_id,
442
                                       TimeKey const& time_key) {
443
    if (output_key.empty()) {
×
444
        // Store as temporary data using step_id
445
        temporary_data_[step_id + "_output"] = data;
×
446
    } else {
447
        // Store in data manager
448
        data_manager_->setData(output_key, data, time_key);
×
449
    }
450
}
×
451

452
std::map<int, std::vector<int>> TransformPipeline::groupStepsByPhase() const {
×
453
    std::map<int, std::vector<int>> phase_groups;
×
454
    
455
    for (size_t i = 0; i < steps_.size(); ++i) {
×
456
        phase_groups[steps_[i].phase].push_back(static_cast<int>(i));
×
457
    }
458
    
459
    return phase_groups;
×
460
}
×
461

462
std::vector<StepResult> TransformPipeline::executePhase(
×
463
    std::vector<int> const& phase_steps, 
464
    int,
465
    PipelineProgressCallback progress_callback) {
466
    
467
    std::vector<StepResult> results;
×
468
    results.reserve(phase_steps.size());
×
469
    
470
    if (phase_steps.size() == 1) {
×
471
        // Single step - execute directly
472
        int step_index = phase_steps[0];
×
NEW
473
        auto const& step = steps_[static_cast<size_t>(step_index)];
×
474
        
475
        ProgressCallback step_progress_callback;
×
476
        if (progress_callback) {
×
NEW
477
            step_progress_callback = [this, progress_callback, step_index, step](int step_progress) {
×
478
                int overall_progress = (step_index * 100) / static_cast<int>(steps_.size());
×
479
                progress_callback(step_index, step.step_id, step_progress, overall_progress);
×
480
            };
×
481
        }
482
        
483
        results.push_back(executeStep(step, step_progress_callback));
×
484
    } else {
×
485
        // Multiple steps - execute in parallel
486
        std::vector<std::future<StepResult>> futures;
×
487
        futures.reserve(phase_steps.size());
×
488
        
489
        for (int step_index : phase_steps) {
×
NEW
490
            auto const& step = steps_[static_cast<size_t>(step_index)];
×
491
            
492
            ProgressCallback step_progress_callback;
×
493
            if (progress_callback) {
×
NEW
494
                step_progress_callback = [this, progress_callback, step_index, step](int step_progress) {
×
495
                    int overall_progress = (step_index * 100) / static_cast<int>(steps_.size());
×
496
                    progress_callback(step_index, step.step_id, step_progress, overall_progress);
×
497
                };
×
498
            }
499
            
500
            futures.push_back(std::async(std::launch::async, 
×
501
                [this, step, step_progress_callback]() {
×
502
                    return executeStep(step, step_progress_callback);
×
503
                }));
504
        }
×
505
        
506
        // Collect results
507
        for (auto& future : futures) {
×
508
            results.push_back(future.get());
×
509
        }
510
    }
×
511
    
512
    return results;
×
513
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc