• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

paulmthompson / WhiskerToolbox / 17920603410

22 Sep 2025 03:39PM UTC coverage: 71.97% (-0.05%) from 72.02%
17920603410

push

github

paulmthompson
all tests pass

277 of 288 new or added lines in 8 files covered. (96.18%)

520 existing lines in 35 files now uncovered.

40275 of 55961 relevant lines covered (71.97%)

1225.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.13
/src/DataManager/utils/TableView/pipeline/TablePipeline.cpp
1
#include "TablePipeline.hpp"
2

3
#include "DataManager.hpp"
4
#include "TimeFrame/TimeFrame.hpp"
5
#include "utils/TableView/ComputerRegistry.hpp"
6
#include "utils/TableView/ComputerRegistryTypes.hpp"
7
#include "utils/TableView/TableRegistry.hpp"
8
#include "utils/TableView/adapters/DataManagerExtension.h"
9
#include "utils/TableView/core/TableViewBuilder.h"
10
#include "utils/TableView/interfaces/IEventSource.h"
11
#include "utils/TableView/interfaces/IIntervalSource.h"
12
#include "utils/TableView/interfaces/IRowSelector.h"
13
#include "utils/TableView/transforms/PCATransform.hpp"
14

15
#include <chrono>
16
#include <fstream>
17
#include <iostream>
18

19
TablePipeline::TablePipeline(TableRegistry * table_registry, DataManager * data_manager)
55✔
20
    : table_registry_(table_registry),
55✔
21
      data_manager_(data_manager),
55✔
22
      data_manager_extension_(table_registry->getDataManagerExtension()),
55✔
23
      computer_registry_(&table_registry->getComputerRegistry()) {
55✔
24

25
    if (!table_registry) {
55✔
26
        throw std::invalid_argument("TableRegistry cannot be null");
×
27
    }
28
}
55✔
29

30
/**
 * @brief Replace the pipeline's configuration with the one described by a JSON document.
 *
 * The document must contain a "tables" array; an optional "metadata" value is
 * stored alongside it. Each table entry is parsed and validated. Any
 * previously loaded configuration is cleared first. New state is committed
 * only after every table has been parsed and validated, so a failed load
 * leaves the pipeline empty rather than holding a partial set of tables.
 *
 * @param json_config Parsed JSON configuration.
 * @return true if all tables were loaded; false on a missing/invalid "tables"
 *         entry, a validation failure, or an exception (details go to std::cerr).
 */
bool TablePipeline::loadFromJson(nlohmann::json const & json_config) {
    try {
        clear();

        if (!json_config.contains("tables")) {
            std::cerr << "TablePipeline: JSON must contain 'tables' array" << std::endl;
            return false;
        }

        auto const & tables_json = json_config["tables"];
        if (!tables_json.is_array()) {
            std::cerr << "TablePipeline: 'tables' must be an array" << std::endl;
            return false;
        }

        // Stage everything locally; members are only touched once nothing can fail.
        nlohmann::json staged_metadata;
        if (json_config.contains("metadata")) {
            staged_metadata = json_config["metadata"];
        }

        decltype(tables_) staged_tables;
        for (auto const & table_json: tables_json) {
            auto config = parseTableConfiguration(table_json);

            std::string const validation_error = validateTableConfiguration(config);
            if (!validation_error.empty()) {
                std::cerr << "TablePipeline: Invalid table configuration for '"
                          << config.table_id << "': " << validation_error << std::endl;
                return false;
            }

            staged_tables.push_back(std::move(config));
        }

        // Commit.
        metadata_ = std::move(staged_metadata);
        tables_ = std::move(staged_tables);

        std::cout << "TablePipeline: Loaded " << tables_.size() << " table configurations" << std::endl;
        return true;

    } catch (std::exception const & e) {
        std::cerr << "TablePipeline: Error loading JSON: " << e.what() << std::endl;
        return false;
    }
}
72

73
bool TablePipeline::loadFromJsonFile(std::string const & json_file_path) {
×
74
    std::ifstream file(json_file_path);
×
75
    if (!file.is_open()) {
×
76
        std::cerr << "TablePipeline: Cannot open file: " << json_file_path << std::endl;
×
77
        return false;
×
78
    }
79

80
    nlohmann::json json_config;
×
81
    try {
82
        file >> json_config;
×
83
    } catch (std::exception const & e) {
×
84
        std::cerr << "TablePipeline: Error parsing JSON file: " << e.what() << std::endl;
×
85
        return false;
×
86
    }
×
87

88
    return loadFromJson(json_config);
×
89
}
×
90

91
/**
 * @brief Build every configured table in order, stopping at the first failure.
 *
 * Progress is reported through @p progress_callback (when set) as
 * (table index, table name, per-table percent, overall percent). The overall
 * percent is derived from the index of the table currently being built.
 *
 * @param progress_callback Optional progress sink; may be empty.
 * @return Aggregate result holding per-table results, the completed count,
 *         success/error information and the elapsed time in milliseconds.
 *         An empty pipeline returns success immediately, without timing.
 */
TablePipelineResult TablePipeline::execute(TablePipelineProgressCallback progress_callback) {
    using Clock = std::chrono::high_resolution_clock;
    auto const began = Clock::now();

    TablePipelineResult result;
    result.total_tables = static_cast<int>(tables_.size());

    if (tables_.empty()) {
        result.success = true;
        return result;
    }

    // Percentage of tables that precede the given one.
    auto const overall_percent = [this](size_t table_index) {
        return static_cast<int>((table_index * 100) / tables_.size());
    };

    try {
        for (size_t table_index = 0; table_index < tables_.size(); ++table_index) {
            auto const & config = tables_[table_index];
            int const table_number = static_cast<int>(table_index);

            // Announce the start of this table.
            if (progress_callback) {
                int const overall = overall_percent(table_index);
                progress_callback(table_number, config.name, 0, overall);
            }

            // Translate per-column progress into the pipeline-level callback.
            auto const on_column_progress = [&](int columns_done, int total_columns) {
                if (!progress_callback) {
                    return;
                }
                int const table_percent = total_columns > 0 ? (columns_done * 100) / total_columns : 100;
                int const overall = overall_percent(table_index);
                progress_callback(table_number, config.name, table_percent, overall);
            };

            auto table_result = buildTable(config, on_column_progress);
            result.table_results.push_back(table_result);

            if (!table_result.success) {
                result.success = false;
                result.error_message = "Failed to build table '" + config.table_id + "': " + table_result.error_message;
                break;
            }
            result.tables_completed++;
        }

        if (result.tables_completed == result.total_tables) {
            result.success = true;
        }

    } catch (std::exception const & e) {
        result.success = false;
        result.error_message = "Exception during pipeline execution: " + std::string(e.what());
    }

    auto const elapsed = std::chrono::duration_cast<std::chrono::microseconds>(Clock::now() - began);
    result.total_execution_time_ms = static_cast<double>(elapsed.count()) / 1000.0;

    return result;
}
×
147

148
/**
 * @brief Build one table from its configuration and store it in the registry.
 *
 * Steps: create or update the registry entry, create the row selector, create
 * and add one computer per configured column, build the TableView, store it,
 * then apply any configured transforms. A transform failure is logged to
 * std::cerr but does not fail the build.
 *
 * @param config            Table configuration to build.
 * @param progress_callback Optional sink called with (columns done, total
 *                          columns) before each column and once after the last.
 * @return Result with success flag, error message, number of columns built and
 *         the elapsed build time in milliseconds. The time is recorded on
 *         every exit path, including failures.
 */
TableBuildResult TablePipeline::buildTable(TableConfiguration const & config,
                                           std::function<void(int, int)> progress_callback) {
    auto const start_time = std::chrono::high_resolution_clock::now();

    TableBuildResult result;
    result.table_id = config.table_id;
    result.total_columns = static_cast<int>(config.columns.size());

    // The build steps live in a lambda so a failure can simply `return` and
    // still fall through to the timing code at the bottom of this function.
    auto const run_build = [&]() {
        if (table_registry_->hasTable(config.table_id)) {
            table_registry_->updateTableInfo(config.table_id, config.name, config.description);
        } else {
            table_registry_->createTable(config.table_id, config.name, config.description);
        }

        TableViewBuilder builder(data_manager_extension_);

        // Row selector
        auto row_selector = createRowSelector(config.row_selector);
        if (!row_selector) {
            result.error_message = "Failed to create row selector";
            return;
        }

        RowSelectorType const selector_type = parseRowSelectorType(config.row_selector["type"]);
        builder.setRowSelector(std::move(row_selector));

        // Columns
        for (size_t i = 0; i < config.columns.size(); ++i) {
            auto const & column_json = config.columns[i];

            if (progress_callback) {
                progress_callback(static_cast<int>(i), result.total_columns);
            }

            // createColumnComputer returns nullptr when the computer name is
            // not registered, so no separate registry lookup is needed here.
            auto computer = createColumnComputer(column_json, selector_type);
            if (!computer) {
                result.error_message = "Failed to create computer for column: " +
                                       column_json.value("name", "unnamed");
                return;
            }

            std::string const column_name = column_json["name"];

            if (!addColumnToBuilder(builder, column_json, std::move(computer))) {
                result.error_message = "Failed to add column to builder: " + column_name;
                return;
            }

            result.columns_built++;
        }

        // Final progress report
        if (progress_callback) {
            progress_callback(result.total_columns, result.total_columns);
        }

        TableView table_view = builder.build();

        if (!table_registry_->storeBuiltTable(config.table_id, std::make_unique<TableView>(std::move(table_view)))) {
            result.error_message = "Failed to store built table in TableManager";
            return;
        }

        // Transforms are best-effort: a failure is reported but the base table stands.
        if (!applyTransforms(config)) {
            std::cerr << "TablePipeline: One or more transforms failed for table '" << config.table_id << "'\n";
        }

        result.success = true;
    };

    try {
        run_build();
    } catch (std::exception const & e) {
        result.error_message = "Exception during table build: " + std::string(e.what());
    }

    auto const end_time = std::chrono::high_resolution_clock::now();
    auto const duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time);
    result.build_time_ms = static_cast<double>(duration.count()) / 1000.0;

    return result;
}
×
244

245
void TablePipeline::clear() {
23✔
246
    tables_.clear();
23✔
247
    metadata_ = nlohmann::json{};
23✔
248
}
23✔
249

250
/**
 * @brief Convert one entry of the "tables" array into a TableConfiguration.
 *
 * Missing string fields default to the empty string. "columns", "tags" and
 * "transforms" are read only when present as arrays; non-string tags are
 * skipped; a transform's "parameters" is copied only when it is an object.
 * No validation is performed here.
 *
 * @param table_json JSON value describing one table.
 * @return The populated configuration.
 */
TableConfiguration TablePipeline::parseTableConfiguration(nlohmann::json const & table_json) {
    TableConfiguration parsed;

    // Identification fields
    parsed.table_id = table_json.value("table_id", "");
    parsed.name = table_json.value("name", "");
    parsed.description = table_json.value("description", "");

    // Row selector is stored as raw JSON and interpreted when the table is built.
    if (table_json.contains("row_selector")) {
        parsed.row_selector = table_json["row_selector"];
    }

    // Column definitions are kept as raw JSON as well.
    if (table_json.contains("columns") && table_json["columns"].is_array()) {
        auto const & columns_json = table_json["columns"];
        for (auto it = columns_json.begin(); it != columns_json.end(); ++it) {
            parsed.columns.push_back(*it);
        }
    }

    if (table_json.contains("tags") && table_json["tags"].is_array()) {
        for (auto const & tag_json: table_json["tags"]) {
            if (!tag_json.is_string()) {
                continue;
            }
            parsed.tags.push_back(tag_json);
        }
    }

    if (table_json.contains("transforms") && table_json["transforms"].is_array()) {
        for (auto const & transform_json: table_json["transforms"]) {
            TableConfiguration::TransformSpec transform;
            transform.type = transform_json.value("type", "");
            if (transform_json.contains("parameters") && transform_json["parameters"].is_object()) {
                transform.parameters = transform_json["parameters"];
            }
            transform.output_table_id = transform_json.value("output_table_id", "");
            transform.output_name = transform_json.value("output_name", "");
            transform.output_description = transform_json.value("output_description", "");
            parsed.transforms.push_back(std::move(transform));
        }
    }

    return parsed;
}
×
290

291
/**
 * @brief Apply the transforms configured for a table that has already been built.
 *
 * Only the "PCA" transform type is handled; any other type is reported and
 * counted as a failure. Each derived table is registered (created or updated)
 * and stored under `output_table_id`, or under an id generated from
 * "<table_id>_pca" when none is given. Processing continues after a failed
 * transform.
 *
 * PCA options come from the transform's `parameters` object: "center"
 * (default true), "standardize" (default false), and "include"/"exclude"
 * arrays of strings. When `parameters` is not a JSON object the defaults are
 * used; calling `value()` on a non-object JSON value would throw.
 *
 * @param config Configuration of the base table.
 * @return true if there were no transforms or all of them succeeded; false if
 *         the base table is missing from the registry or any transform failed.
 */
bool TablePipeline::applyTransforms(TableConfiguration const & config) {
    if (config.transforms.empty()) return true;

    auto base_view_sp = table_registry_->getBuiltTable(config.table_id);
    if (!base_view_sp) {
        std::cerr << "TablePipeline: Cannot apply transforms, base table not found: " << config.table_id << "\n";
        return false;
    }

    // Append every string element of params[key] to `out` when that entry is an array.
    auto const collect_strings = [](nlohmann::json const & params, std::string const & key, auto & out) {
        if (params.contains(key) && params[key].is_array()) {
            for (auto const & entry: params[key]) {
                if (entry.is_string()) out.push_back(entry);
            }
        }
    };

    bool all_ok = true;
    for (auto const & t: config.transforms) {
        try {
            if (t.type == "PCA") {
                // Guard against an absent/non-object parameter block.
                bool const has_parameters = t.parameters.is_object();

                PCAConfig pc;
                pc.center = has_parameters ? t.parameters.value("center", true) : true;
                pc.standardize = has_parameters ? t.parameters.value("standardize", false) : false;
                collect_strings(t.parameters, "include", pc.include);
                collect_strings(t.parameters, "exclude", pc.exclude);

                PCATransform pca(pc);
                TableView derived = pca.apply(*base_view_sp);

                // Prepare output id/name
                std::string out_id = t.output_table_id.empty()
                                             ? table_registry_->generateUniqueTableId(config.table_id + "_pca")
                                             : t.output_table_id;
                std::string out_name = t.output_name.empty()
                                               ? config.name + " (PCA)"
                                               : t.output_name;
                std::string out_desc = t.output_description;

                if (!table_registry_->hasTable(out_id)) {
                    table_registry_->createTable(out_id, out_name, out_desc);
                } else {
                    table_registry_->updateTableInfo(out_id, out_name, out_desc);
                }
                if (!table_registry_->storeBuiltTable(out_id, std::make_unique<TableView>(std::move(derived)))) {
                    std::cerr << "TablePipeline: Failed to store transformed table: " << out_id << "\n";
                    all_ok = false;
                }
            } else {
                std::cerr << "TablePipeline: Unknown transform type: " << t.type << "\n";
                all_ok = false;
            }
        } catch (std::exception const & e) {
            std::cerr << "TablePipeline: Transform '" << t.type << "' failed: " << e.what() << "\n";
            all_ok = false;
        }
    }
    return all_ok;
}
×
349

350
std::unique_ptr<IRowSelector> TablePipeline::createRowSelector(nlohmann::json const & row_selector_json) {
22✔
351
    if (!row_selector_json.contains("type")) {
22✔
352
        std::cerr << "TablePipeline: Row selector must have 'type' field" << std::endl;
×
353
        return nullptr;
×
354
    }
355

356
    std::string type = row_selector_json["type"];
22✔
357

358
    if (type == "interval") {
22✔
359
        // Handle interval row selector creation
360
        if (row_selector_json.contains("source")) {
11✔
361
            // Source-based interval specification - get intervals from DigitalIntervalSeries
362
            std::string source_key = row_selector_json["source"];
11✔
363

364
            // Get the interval source from the DataManagerExtension
365
            auto interval_source = data_manager_extension_->getIntervalSource(source_key);
11✔
366
            if (!interval_source) {
11✔
367
                std::cerr << "TablePipeline: Cannot resolve interval source: " << source_key << std::endl;
×
368
                return nullptr;
×
369
            }
370

371
            // Get the source's timeframe
372
            auto source_timeframe = interval_source->getTimeFrame();
11✔
373
            if (!source_timeframe) {
11✔
374
                std::cerr << "TablePipeline: Interval source has no timeframe: " << source_key << std::endl;
×
375
                return nullptr;
×
376
            }
377

378
            // Get all intervals from the source
379
            auto intervals = interval_source->getIntervalsInRange(
11✔
380
                    TimeFrameIndex(0),
381
                    TimeFrameIndex(source_timeframe->getTotalFrameCount() - 1),
11✔
382
                    source_timeframe.get());
22✔
383

384
            if (intervals.empty()) {
11✔
385
                std::cerr << "TablePipeline: No intervals found in source: " << source_key << std::endl;
×
386
                return nullptr;
×
387
            }
388

389
            // Convert Interval objects to TimeFrameInterval objects
390
            std::vector<TimeFrameInterval> time_frame_intervals;
11✔
391
            time_frame_intervals.reserve(intervals.size());
11✔
392
            for (auto const & interval: intervals) {
54✔
393
                time_frame_intervals.emplace_back(TimeFrameIndex(interval.start), TimeFrameIndex(interval.end));
43✔
394
            }
395

396
            return std::make_unique<IntervalSelector>(std::move(time_frame_intervals), source_timeframe);
11✔
397

398
        } else if (row_selector_json.contains("intervals") && row_selector_json["intervals"].is_array()) {
11✔
399
            // Direct interval specification
400
            std::vector<TimeFrameInterval> intervals;
×
401
            std::shared_ptr<TimeFrame> timeFrame;
×
402

403
            // Check if timeframe is specified
404
            if (row_selector_json.contains("timeframe")) {
×
405
                std::string timeframe_key = row_selector_json["timeframe"];
×
406
                timeFrame = data_manager_->getTime(TimeKey(timeframe_key));
×
407
                if (!timeFrame) {
×
408
                    std::cerr << "TablePipeline: Cannot resolve timeframe: " << timeframe_key << std::endl;
×
409
                    return nullptr;
×
410
                }
411
            } else {
×
412
                // Use default timeframe
413
                timeFrame = data_manager_->getTime();
×
414
                if (!timeFrame) {
×
415
                    std::cerr << "TablePipeline: No default timeframe available" << std::endl;
×
416
                    return nullptr;
×
417
                }
418
            }
419

420
            // Parse interval specifications
421
            for (auto const & interval_json: row_selector_json["intervals"]) {
×
422
                if (interval_json.is_array() && interval_json.size() == 2) {
×
423
                    int64_t start = interval_json[0].get<int64_t>();
×
424
                    int64_t end = interval_json[1].get<int64_t>();
×
425
                    intervals.emplace_back(TimeFrameIndex(start), TimeFrameIndex(end));
×
426
                } else if (interval_json.is_object() &&
×
427
                           interval_json.contains("start") && interval_json.contains("end")) {
×
428
                    int64_t start = interval_json["start"].get<int64_t>();
×
429
                    int64_t end = interval_json["end"].get<int64_t>();
×
430
                    intervals.emplace_back(TimeFrameIndex(start), TimeFrameIndex(end));
×
431
                } else {
432
                    std::cerr << "TablePipeline: Invalid interval specification in intervals array" << std::endl;
×
433
                    return nullptr;
×
434
                }
435
            }
436

437
            if (intervals.empty()) {
×
438
                std::cerr << "TablePipeline: No valid intervals found in intervals array" << std::endl;
×
439
                return nullptr;
×
440
            }
441

442
            return std::make_unique<IntervalSelector>(std::move(intervals), timeFrame);
×
443

444
        } else {
×
445
            std::cerr << "TablePipeline: Interval row selector must have 'source' field or 'intervals' array" << std::endl;
×
446
            return nullptr;
×
447
        }
448

449
    } else if (type == "timestamp") {
11✔
450
        // Handle timestamp row selector creation
451
        std::vector<TimeFrameIndex> timestamps;
11✔
452
        std::shared_ptr<TimeFrame> timeFrame;
11✔
453

454
        if (row_selector_json.contains("timestamps") && row_selector_json["timestamps"].is_array()) {
11✔
455
            // Direct timestamp specification
456
            for (auto const & ts: row_selector_json["timestamps"]) {
57✔
457
                if (ts.is_number_integer()) {
25✔
458
                    timestamps.emplace_back(ts.get<int64_t>());
25✔
459
                } else if (ts.is_number_float()) {
×
460
                    timestamps.emplace_back(static_cast<int64_t>(ts.get<double>()));
×
461
                }
462
            }
463

464
            // Check if timeframe is specified
465
            if (row_selector_json.contains("timeframe")) {
7✔
466
                std::string timeframe_key = row_selector_json["timeframe"];
1✔
467
                timeFrame = data_manager_->getTime(TimeKey(timeframe_key));
1✔
468
                if (!timeFrame) {
1✔
UNCOV
469
                    std::cerr << "TablePipeline: Cannot resolve timeframe: " << timeframe_key << std::endl;
×
UNCOV
470
                    return nullptr;
×
471
                }
472
            } else {
1✔
473
                // Use default timeframe
474
                timeFrame = data_manager_->getTime();
6✔
475
                if (!timeFrame) {
6✔
476
                    std::cerr << "TablePipeline: No default timeframe available" << std::endl;
×
477
                    return nullptr;
×
478
                }
479
            }
480

481
        } else if (row_selector_json.contains("source")) {
4✔
482
            // Source-based timestamp specification
483
            std::string source_key = row_selector_json["source"];
4✔
484

485
            // Try to get timestamps from a DigitalEventSeries
486
            if (auto event_source = data_manager_extension_->getEventSource(source_key)) {
4✔
487
                // Get all event times using the source's timeframe
UNCOV
488
                auto source_timeframe = event_source->getTimeFrame();
×
UNCOV
489
                if (source_timeframe && source_timeframe->getTotalFrameCount() > 0) {
×
490
                    TimeFrameIndex start_idx(0);
×
UNCOV
491
                    TimeFrameIndex end_idx(source_timeframe->getTotalFrameCount() - 1);
×
UNCOV
492
                    auto event_times = event_source->getDataInRange(start_idx, end_idx, source_timeframe.get());
×
493

UNCOV
494
                    timestamps.reserve(event_times.size());
×
UNCOV
495
                    for (auto time: event_times) {
×
UNCOV
496
                        timestamps.emplace_back(static_cast<int64_t>(time));
×
497
                    }
UNCOV
498
                    timeFrame = source_timeframe;
×
UNCOV
499
                } else {
×
UNCOV
500
                    std::cerr << "TablePipeline: Event source has no timeframe or no data: " << source_key << std::endl;
×
501
                    return nullptr;
×
502
                }
503

UNCOV
504
            } else {
×
505
                // Try to get specific TimeFrame
506
                timeFrame = data_manager_->getTime(TimeKey(source_key));
4✔
507
                if (timeFrame) {
4✔
508
                    // Generate timestamps from the TimeFrame
509
                    int frame_count = timeFrame->getTotalFrameCount();
4✔
510
                    timestamps.reserve(static_cast<size_t>(frame_count));
4✔
511
                    for (int i = 0; i < frame_count; ++i) {
508✔
512
                        timestamps.emplace_back(timeFrame->getTimeAtIndex(TimeFrameIndex(i)));
504✔
513
                    }
514
                } else {
UNCOV
515
                    std::cerr << "TablePipeline: Cannot resolve timestamp source: " << source_key << std::endl;
×
516
                    return nullptr;
×
517
                }
518
            }
4✔
519
        } else {
4✔
UNCOV
520
            std::cerr << "TablePipeline: Timestamp row selector must have 'timestamps' array or 'source' field" << std::endl;
×
UNCOV
521
            return nullptr;
×
522
        }
523

524
        if (timestamps.empty()) {
11✔
525
            std::cerr << "TablePipeline: No timestamps found for timestamp row selector" << std::endl;
×
UNCOV
526
            return nullptr;
×
527
        }
528

529
        if (!timeFrame) {
11✔
UNCOV
530
            std::cerr << "TablePipeline: No TimeFrame available for timestamp row selector" << std::endl;
×
UNCOV
531
            return nullptr;
×
532
        }
533

534
        return std::make_unique<TimestampSelector>(std::move(timestamps), timeFrame);
11✔
535

536
    } else if (type == "index") {
11✔
537
        // TODO: Implement index row selector creation
538
        std::cerr << "TablePipeline: Index row selector not yet implemented" << std::endl;
×
UNCOV
539
        return nullptr;
×
540

541
    } else {
UNCOV
542
        std::cerr << "TablePipeline: Unknown row selector type: " << type << std::endl;
×
UNCOV
543
        return nullptr;
×
544
    }
545
}
22✔
546

547
std::unique_ptr<IComputerBase> TablePipeline::createColumnComputer(nlohmann::json const & column_json,
49✔
548
                                                                   RowSelectorType row_selector_type) {
549
    (void) row_selector_type;// Suppress unused parameter warning
550
    if (!column_json.contains("computer")) {
49✔
UNCOV
551
        std::cerr << "TablePipeline: Column must have 'computer' field" << std::endl;
×
UNCOV
552
        return nullptr;
×
553
    }
554

555
    std::string computer_name = column_json["computer"];
49✔
556

557
    // Resolve data source
558
    DataSourceVariant data_source;
49✔
559
    if (column_json.contains("data_source")) {
49✔
560
        data_source = resolveDataSource(column_json["data_source"]);
49✔
561
    }
562

563
    // Get parameters
564
    std::map<std::string, std::string> parameters;
49✔
565
    if (column_json.contains("parameters") && column_json["parameters"].is_object()) {
49✔
566
        for (auto const & [key, value]: column_json["parameters"].items()) {
27✔
567
            if (value.is_string()) {
9✔
568
                parameters[key] = value;
9✔
569
            } else {
UNCOV
570
                parameters[key] = value.dump();
×
571
            }
572
        }
9✔
573
    }
574

575
    // Check if this is a multi-output computer
576
    auto computer_info = computer_registry_->findComputerInfo(computer_name);
49✔
577
    if (!computer_info) {
49✔
UNCOV
578
        std::cerr << "TablePipeline: Computer not found: " << computer_name << std::endl;
×
UNCOV
579
        return nullptr;
×
580
    }
581

582
    // Create computer using the appropriate registry method
583
    if (computer_info->isMultiOutput) {
49✔
584
        return computer_registry_->createMultiComputer(computer_name, data_source, parameters);
6✔
585
    } else {
586
        return computer_registry_->createComputer(computer_name, data_source, parameters);
43✔
587
    }
588
}
49✔
589

590
/**
 * @brief Resolve a column's "data_source" entry to a concrete source.
 *
 * A string is treated as a key and looked up, in order, as an analog, event,
 * interval and line source; the first hit wins. The object form
 * ({"key", "adapter"}) is recognised but not implemented.
 *
 * @param data_source_json The "data_source" JSON value.
 * @return The resolved source, or a default-constructed variant (with a
 *         message on std::cerr) when nothing could be resolved.
 */
DataSourceVariant TablePipeline::resolveDataSource(nlohmann::json const & data_source_json) {
    if (data_source_json.is_object()) {
        // Complex data source with adapter
        std::string key = data_source_json.value("key", "");
        std::string adapter = data_source_json.value("adapter", "");

        if (key.empty() || adapter.empty()) {
            std::cerr << "TablePipeline: Data source object must have 'key' and 'adapter' fields" << std::endl;
            return {};
        }

        // TODO: Implement adapter-based data source resolution
        // This would use the ComputerRegistry's adapter system
        std::cerr << "TablePipeline: Adapter-based data sources not yet implemented" << std::endl;
        return {};
    }

    if (!data_source_json.is_string()) {
        std::cerr << "TablePipeline: Invalid data source specification" << std::endl;
        return {};
    }

    // Plain key: probe each source kind in a fixed order.
    std::string key = data_source_json;

    if (auto as_analog = data_manager_extension_->getAnalogSource(key)) {
        return as_analog;
    }
    if (auto as_event = data_manager_extension_->getEventSource(key)) {
        return as_event;
    }
    if (auto as_interval = data_manager_extension_->getIntervalSource(key)) {
        return as_interval;
    }
    if (auto as_line = data_manager_extension_->getLineSource(key)) {
        return as_line;
    }

    std::cerr << "TablePipeline: Could not resolve data source: " << key << std::endl;
    return {};
}
632

633
/**
 * @brief Map a row selector type name to its RowSelectorType value.
 *
 * @param type_string One of "interval", "timestamp" or "index".
 * @return The matching enumerator; an unrecognised name is reported on
 *         std::cerr and falls back to RowSelectorType::IntervalBased.
 */
RowSelectorType TablePipeline::parseRowSelectorType(std::string const & type_string) {
    if (type_string == "interval") {
        return RowSelectorType::IntervalBased;
    }
    if (type_string == "timestamp") {
        return RowSelectorType::Timestamp;
    }
    if (type_string == "index") {
        return RowSelectorType::Index;
    }

    std::cerr << "TablePipeline: Unknown row selector type: " << type_string << std::endl;
    return RowSelectorType::IntervalBased;// Default fallback
}
645

646
/**
 * @brief Check that a parsed table configuration has the required fields.
 *
 * Requires a non-empty table_id and name, at least one column, a
 * row_selector containing "type", and "name", "computer" and "data_source"
 * on every column. Only presence is checked, not the type of the values.
 *
 * @param config Configuration to check.
 * @return An empty string when valid, otherwise a description of the first
 *         problem found.
 */
std::string TablePipeline::validateTableConfiguration(TableConfiguration const & config) {
    if (config.table_id.empty()) {
        return "table_id cannot be empty";
    }
    if (config.name.empty()) {
        return "name cannot be empty";
    }
    if (config.columns.empty()) {
        return "table must have at least one column";
    }
    if (!config.row_selector.contains("type")) {
        return "row_selector must have 'type' field";
    }

    // Builds the message for a column that lacks a required field.
    auto const missing_field = [](size_t column_index, char const * field) {
        return "column " + std::to_string(column_index) + " missing '" + field + "' field";
    };

    size_t column_index = 0;
    for (auto const & column: config.columns) {
        if (!column.contains("name")) {
            return missing_field(column_index, "name");
        }
        if (!column.contains("computer")) {
            return missing_field(column_index, "computer");
        }
        if (!column.contains("data_source")) {
            return missing_field(column_index, "data_source");
        }
        ++column_index;
    }

    return std::string{};// Valid
}
683

684
// Helper template function to try adding a column with a specific type
685
template<typename T>
686
bool tryAddColumnWithType(TableViewBuilder & builder,
115✔
687
                          std::string const & column_name,
688
                          std::unique_ptr<IComputerBase> & computer) {
689
    // First try multi-output computer
690
    auto multi_wrapper = dynamic_cast<MultiComputerWrapper<T> *>(computer.get());
115✔
691
    if (multi_wrapper) {
115✔
692
        auto multi_computer = multi_wrapper->releaseComputer();
6✔
693
        builder.addColumns<T>(column_name, std::move(multi_computer));
6✔
694
        computer.reset();// Mark as consumed
6✔
695
        return true;
6✔
696
    }
6✔
697

698
    // Try single-output computer wrapper
699
    auto single_wrapper = dynamic_cast<ComputerWrapper<T> *>(computer.get());
109✔
700
    if (single_wrapper) {
109✔
701
        auto typed_computer = single_wrapper->releaseComputer();
42✔
702
        builder.addColumn(column_name, std::move(typed_computer));
42✔
703
        computer.reset();// Mark as consumed
42✔
704
        return true;
42✔
705
    }
42✔
706

707
    return false;
67✔
708
}
709

710
// Helper function to add column to builder based on type
711
bool TablePipeline::addColumnToBuilder(TableViewBuilder & builder,
48✔
712
                                       nlohmann::json const & column_json,
713
                                       std::unique_ptr<IComputerBase> computer) {
714
    std::string column_name = column_json["name"];
48✔
715

716
    // Try each supported type in order
717
    // Start with the most common types for better performance
718

719
    // Scalar types
720
    if (tryAddColumnWithType<double>(builder, column_name, computer)) return true;
48✔
721
    if (tryAddColumnWithType<float>(builder, column_name, computer)) return true;
18✔
722
    if (tryAddColumnWithType<int64_t>(builder, column_name, computer)) return true;
18✔
723
    if (tryAddColumnWithType<int>(builder, column_name, computer)) return true;
11✔
724
    if (tryAddColumnWithType<bool>(builder, column_name, computer)) return true;
10✔
725

726
    // Vector types
727
    if (tryAddColumnWithType<std::vector<double>>(builder, column_name, computer)) return true;
6✔
728
    if (tryAddColumnWithType<std::vector<float>>(builder, column_name, computer)) return true;
4✔
UNCOV
729
    if (tryAddColumnWithType<std::vector<int>>(builder, column_name, computer)) return true;
×
UNCOV
730
    if (tryAddColumnWithType<std::vector<TimeFrameIndex>>(builder, column_name, computer)) return true;
×
731

732
    // If we get here, the computer type is not supported
UNCOV
733
    std::cerr << "TablePipeline: Unsupported computer output type for column: " << column_name << std::endl;
×
UNCOV
734
    return false;
×
735
}
48✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc