• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26195612357

20 May 2026 11:19PM UTC coverage: 49.859% (-2.3%) from 52.2%
26195612357

push

github

hariharan-devarajan
feat(aggregator): improve system metrics scanning and persistence error handling

16041 of 43831 branches covered (36.6%)

Branch coverage included in aggregate %.

6 of 17 new or added lines in 2 files covered. (35.29%)

1072 existing lines in 104 files now uncovered.

21423 of 31309 relevant lines covered (68.42%)

13054.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.15
/src/dftracer/utils/server/viz_api.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/coro/channel.h>
3
#include <dftracer/utils/core/pipeline/executor.h>
4
#include <dftracer/utils/core/tasks/coro_scope.h>
5
#include <dftracer/utils/server/http_request.h>
6
#include <dftracer/utils/server/http_response.h>
7
#include <dftracer/utils/server/router.h>
8
#include <dftracer/utils/server/trace_index.h>
9
#include <dftracer/utils/server/viz_api.h>
10
#include <dftracer/utils/utilities/common/json/json_doc_guard.h>
11
#include <dftracer/utils/utilities/common/json/json_value.h>
12
#include <dftracer/utils/utilities/common/query/query.h>
13
#include <dftracer/utils/utilities/composites/dft/views/view_builder_utility.h>
14
#include <dftracer/utils/utilities/composites/dft/views/view_definition.h>
15
#include <dftracer/utils/utilities/composites/dft/views/view_reader_utility.h>
16
#include <dftracer/utils/utilities/fileio/lines/sources/async_streaming_gz_line_generator.h>
17
#include <simdjson.h>
18

19
#include <atomic>
20
#include <cstddef>
21
#include <cstdint>
22
#include <limits>
23
#include <memory>
24
#include <mutex>
25
#include <string>
26
#include <unordered_map>
27
#include <unordered_set>
28
#include <vector>
29

30
namespace dftracer::utils::server {
31

32
using namespace dftracer::utils::utilities::composites::dft;
33
using namespace dftracer::utils::utilities::composites::dft::views;
34

35
using dftracer::utils::utilities::common::json::JsonDocGuard;
36
using dftracer::utils::utilities::common::json::JsonValue;
37
using dftracer::utils::utilities::common::query::Query;
38

39
// Fixed set of names {"FH", "HH", "SH"}; judging by the identifier these
// are hash-metadata entry names.
// NOTE(review): not referenced anywhere else in this file — confirm it is
// still needed before keeping it.
static const std::unordered_set<std::string> HASH_METADATA_NAMES{"FH", "HH",
                                                                 "SH"};
41

42
/// Normalize the "ts" field in a Chrome Trace Event JSON string by
43
/// subtracting an offset.  Returns the modified JSON.  Falls back to
44
/// the original string on parse failure.
45
static std::string normalize_event_ts(const std::string& event_json,
150✔
46
                                      std::uint64_t offset) {
47
    thread_local simdjson::dom::parser tl_parser;
150✔
48
    auto result = tl_parser.parse(event_json);
150✔
49
    if (result.error()) return event_json;
150!
50

51
    auto root = result.value_unsafe();
150✔
52
    if (!root.is_object()) return event_json;
150!
53

54
    auto ts_result = root["ts"];
150✔
55
    if (ts_result.error()) return event_json;
150!
56

57
    std::uint64_t old_ts = 0;
150✔
58
    if (ts_result.is_uint64()) {
150!
59
        old_ts = ts_result.get_uint64().value_unsafe();
150✔
UNCOV
60
    } else if (ts_result.is_int64()) {
×
61
        auto val = ts_result.get_int64().value_unsafe();
×
62
        old_ts = val >= 0 ? static_cast<std::uint64_t>(val) : 0;
×
63
    } else {
64
        return event_json;
×
65
    }
66

67
    std::uint64_t new_ts = old_ts >= offset ? old_ts - offset : 0;
150!
68

69
    // simdjson DOM is read-only, so we need to rebuild the JSON with the new ts
70
    // Find "ts": and replace the value
71
    std::string modified = event_json;
150!
72
    auto pos = modified.find("\"ts\":");
150✔
73
    if (pos == std::string::npos) return event_json;
150!
74

75
    pos += 5;  // Skip past "ts":
150✔
76
    while (pos < modified.size() && std::isspace(modified[pos])) ++pos;
150!
77

78
    auto end_pos = pos;
150✔
79
    while (end_pos < modified.size() &&
3,300!
80
           (std::isdigit(modified[end_pos]) || modified[end_pos] == '-')) {
1,650!
81
        ++end_pos;
1,500✔
82
    }
83

84
    modified.replace(pos, end_pos - pos, std::to_string(new_ts));
150!
85
    return modified;
150✔
86
}
150✔
87

88
/// Compute the minimum event duration threshold for a given summary level.
///
/// Level 1 (or 0) means full detail and disables the threshold. For higher
/// levels the time range is split across `viewport_width` pixels and then
/// divided by `level`, so events shorter than 1/level of one pixel's worth
/// of time are filtered out.
///
/// @param begin           Range start (same units as the returned value).
/// @param end             Range end.
/// @param level           Summary level; <= 1 means no filtering.
/// @param viewport_width  Pixel width used to bucket the range.
/// @return Threshold (> 0), or 0.0 to disable duration filtering. Returns
///         0.0 for a zero width or an empty, inverted, NaN or infinite
///         range, so callers never receive a negative/NaN/inf threshold
///         (which would be UB when cast to an integer).
static double duration_threshold(double begin, double end, unsigned level,
                                 unsigned viewport_width = 1920) {
    if (level <= 1 || viewport_width == 0) return 0.0;
    const double range = end - begin;
    // Written as a negated conjunction so NaN is rejected as well.
    if (!(range > 0.0 && range <= std::numeric_limits<double>::max())) {
        return 0.0;
    }
    return range /
           (static_cast<double>(viewport_width) * static_cast<double>(level));
}
97

98
/// Render a scalar JSON element as text for a query DSL clause.
///
/// Strings are copied verbatim (without quotes). Signed and unsigned
/// integers are formatted with std::to_string. Every other element kind
/// (doubles, booleans, null, arrays, objects) produces an empty string,
/// which the callers in this file treat as "skip this clause".
static std::string extract_json_value(simdjson::dom::element val) {
    std::string text;
    if (val.is_string()) {
        text.assign(val.get_string().value_unsafe());
    } else if (val.is_int64()) {
        text = std::to_string(val.get_int64().value_unsafe());
    } else if (val.is_uint64()) {
        text = std::to_string(val.get_uint64().value_unsafe());
    }
    return text;
}
110

111
/// Append an equality clause `field == val` to a query DSL string.
///
/// Clauses are joined with " and ". A non-empty value made only of decimal
/// digits is emitted bare (numeric comparison). Any other value, including
/// the empty string, is wrapped in double quotes.
///
/// NOTE(review): quoted values are not escaped, so a value containing '"'
/// can change the generated DSL. Confirm the Query grammar's escape rules
/// before passing untrusted strings here.
///
/// @param dsl    DSL string to extend in place.
/// @param field  Field name, inserted verbatim.
/// @param val    Value to compare against.
static void append_lane_clause(std::string& dsl, const char* field,
                               const std::string& val) {
    if (!dsl.empty()) dsl += " and ";
    // Cast to unsigned char: std::isdigit on a negative char (e.g. a UTF-8
    // continuation byte) is undefined behavior.
    const bool numeric =
        !val.empty() && std::all_of(val.begin(), val.end(), [](char c) {
            return std::isdigit(static_cast<unsigned char>(c)) != 0;
        });
    // Append piecewise to avoid building temporary strings.
    dsl += field;
    dsl += " == ";
    if (numeric) {
        dsl += val;
    } else {
        dsl += '"';
        dsl += val;
        dsl += '"';
    }
}
123

124
static void apply_lanes(std::string& dsl, std::string_view lanes_str) {
6✔
125
    if (lanes_str.empty()) return;
6✔
126

127
    thread_local simdjson::dom::parser tl_parser;
1!
128
    auto result = tl_parser.parse(lanes_str.data(), lanes_str.size());
1✔
129
    if (result.error()) return;
1!
130

131
    auto root = result.value_unsafe();
1✔
132

133
    if (root.is_array()) {
1!
134
        auto arr = root.get_array().value_unsafe();
1✔
135
        for (auto item : arr) {
2✔
136
            if (!item.is_object()) continue;
1!
137
            auto obj = item.get_object().value_unsafe();
1✔
138

139
            auto field_result = obj["field"];
1✔
140
            if (field_result.error()) field_result = obj["fields"];
1!
141
            auto value_result = obj["value"];
1✔
142
            if (field_result.error() || value_result.error()) continue;
1!
143

144
            if (!field_result.value_unsafe().is_string()) continue;
1!
145
            const char* field =
146
                field_result.value_unsafe().get_c_str().value_unsafe();
1✔
147
            auto val = extract_json_value(value_result.value_unsafe());
1!
148
            if (!val.empty()) append_lane_clause(dsl, field, val);
1!
149
        }
1✔
UNCOV
150
    } else if (root.is_object()) {
×
151
        auto obj = root.get_object().value_unsafe();
×
152

153
        auto field_result = obj["field"];
×
154
        if (field_result.error()) field_result = obj["fields"];
×
155
        auto value_result = obj["value"];
×
156

157
        if (!field_result.error() && !value_result.error()) {
×
158
            if (field_result.value_unsafe().is_string()) {
×
159
                const char* field =
160
                    field_result.value_unsafe().get_c_str().value_unsafe();
×
161
                auto val = extract_json_value(value_result.value_unsafe());
×
162
                if (!val.empty()) append_lane_clause(dsl, field, val);
×
163
            }
×
164
        }
165
    }
166
}
167

168
static void apply_filters(std::string& dsl, std::string_view filters_str) {
6✔
169
    if (filters_str.empty()) return;
6✔
170

171
    thread_local simdjson::dom::parser tl_parser;
2✔
172
    auto result = tl_parser.parse(filters_str.data(), filters_str.size());
2✔
173
    if (result.error()) return;
2!
174

175
    auto root = result.value_unsafe();
2✔
176
    if (!root.is_array()) return;
2!
177

178
    auto arr = root.get_array().value_unsafe();
2✔
179
    for (auto item : arr) {
4✔
180
        if (!item.is_object()) continue;
2!
181
        auto obj = item.get_object().value_unsafe();
2✔
182

183
        auto field_result = obj["field"];
2✔
184
        auto op_result = obj["op"];
2✔
185
        auto value_result = obj["value"];
2✔
186
        if (field_result.error() || op_result.error() || value_result.error())
2!
187
            continue;
×
188

189
        if (!field_result.value_unsafe().is_string() ||
4!
190
            !op_result.value_unsafe().is_string())
2!
191
            continue;
×
192

193
        const char* field =
194
            field_result.value_unsafe().get_c_str().value_unsafe();
2✔
195
        const char* op = op_result.value_unsafe().get_c_str().value_unsafe();
2✔
196

197
        std::string val = extract_json_value(value_result.value_unsafe());
2!
198
        if (val.empty()) continue;
2!
199

200
        std::string op_str(op);
2!
201
        std::string field_str(field);
2!
202
        if (field_str == "begin") field_str = "ts";
2!
203
        if (field_str == "end") field_str = "ts";
2!
204
        if (field_str == "duration") field_str = "dur";
2!
205

206
        std::string query_op;
2✔
207
        if (op_str == "=")
2✔
208
            query_op = "==";
1!
209
        else if (op_str == ">=")
1!
210
            query_op = ">=";
1!
211
        else if (op_str == "<=")
×
212
            query_op = "<=";
×
213
        else if (op_str == ">")
×
214
            query_op = ">";
×
215
        else if (op_str == "<")
×
216
            query_op = "<";
×
217
        else
218
            continue;
×
219

220
        if (!dsl.empty()) dsl += " and ";
2!
221
        bool numeric = !val.empty() && (std::isdigit(val[0]) || val[0] == '-');
2!
222
        if (numeric || query_op != "==") {
2!
223
            dsl += field_str + " " + query_op + " " + val;
2!
224
        } else {
225
            dsl += field_str + " " + query_op + " \"" + val + "\"";
×
226
        }
227
    }
2!
228
}
229

230
// --- GET /api/v1/viz/events ---
231
static coro::CoroTask<HttpResponse> handle_viz_events(
7!
232
    const HttpRequest& /*req*/, const QueryParams& params, TraceIndex& index) {
233
    // Required: begin, end, summary
234
    if (!params.has("begin") || !params.has("end") || !params.has("summary")) {
235
        co_return HttpResponse::bad_request(
236
            "Missing required parameters: begin, end, summary");
237
    }
238

239
    double begin = params.get_double("begin", 0);
240
    double end = params.get_double("end", 0);
241
    int summary = params.get_int("summary", 1);
242
    if (summary < 1) summary = 1;
243

244
    // Timestamp normalization: default ON, opt-out with ?ts_normalize=0
245
    auto ts_norm_param = params.get("ts_normalize");
246
    bool normalize = ts_norm_param.empty() || ts_norm_param != "0";
247

248
    std::uint64_t global_min = 0;
249
    if (normalize) {
250
        global_min = index.global_min_timestamp_us();
251
        if (global_min == std::numeric_limits<std::uint64_t>::max()) {
252
            global_min = 0;  // No valid bounds, skip normalization
253
        }
254
    }
255

256
    // When normalization is active the user sends normalized
257
    // begin/end values (relative to global_min).  De-normalize them
258
    // so the predicate filters against absolute timestamps.
259
    double original_begin = begin;
260
    double original_end = end;
261
    if (normalize && global_min > 0) {
262
        begin += static_cast<double>(global_min);
263
        end += static_cast<double>(global_min);
264
    }
265

266
    double min_dur =
267
        duration_threshold(begin, end, static_cast<unsigned>(summary));
268

269
    // Build view with time range and optional filters
270
    ViewDefinition view;
271
    view.name = "viz_query";
272
    view.description = "Visualization query";
273

274
    std::string dsl;
275
    dsl += "ts >= " + std::to_string(static_cast<uint64_t>(begin));
276
    dsl += " and ts <= " + std::to_string(static_cast<uint64_t>(end));
277
    if (min_dur > 0) {
278
        dsl += " and dur >= " + std::to_string(static_cast<uint64_t>(min_dur));
279
    }
280

281
    apply_lanes(dsl, params.get("lanes"));
282
    apply_filters(dsl, params.get("filters"));
283

284
    auto pid = params.get("pid");
285
    if (!pid.empty()) {
286
        dsl += " and pid == " + std::string(pid);
287
    }
288

289
    auto tid = params.get("tid");
290
    if (!tid.empty()) {
291
        dsl += " and tid == " + std::string(tid);
292
    }
293

294
    auto cat = params.get("cat");
295
    if (!cat.empty()) {
296
        dsl += " and cat == \"" + std::string(cat) + "\"";
297
    }
298

299
    view.with_query(dsl);
300

301
    // Optional limit: 0 (default) means no limit.
302
    int limit = params.get_int("limit", 0);
303
    if (limit < 0) limit = 0;
304

305
    // Determine files
306
    std::vector<const TraceIndex::FileInfo*> target_files;
307
    auto file_param = params.get("file");
308
    if (!file_param.empty()) {
309
        auto* f = index.find_file(std::string(file_param));
310
        if (f) target_files.push_back(f);
311
    } else {
312
        for (const auto& f : index.files()) {
313
            target_files.push_back(&f);
314
        }
315
    }
316

317
    // File-level time range skip: remove files whose cached time
318
    // bounds don't overlap the query window [begin, end].
319
    if (begin > 0 || end > 0) {
320
        std::vector<const TraceIndex::FileInfo*> filtered;
321
        filtered.reserve(target_files.size());
322
        for (auto* fi : target_files) {
323
            if (fi->min_timestamp_us == 0 && fi->max_timestamp_us == 0) {
324
                filtered.push_back(fi);
325
                continue;
326
            }
327
            double fi_min = static_cast<double>(fi->min_timestamp_us);
328
            double fi_max = static_cast<double>(fi->max_timestamp_us);
329
            if (fi_max < begin || fi_min > end) continue;
330
            filtered.push_back(fi);
331
        }
332
        target_files = std::move(filtered);
333
    }
334

335
    std::vector<std::string> collected_events;
336

337
    bool truncated = false;
338

339
    if (target_files.size() <= 1) {
340
        for (auto* file_info : target_files) {
341
            if (limit > 0 &&
342
                static_cast<int>(collected_events.size()) >= limit) {
343
                truncated = true;
344
                break;
345
            }
346
            if (file_info->uncompressed_size == 0 &&
347
                file_info->num_checkpoints == 0)
348
                continue;
349

350
            ViewBuilderInput builder_input;
351
            builder_input.with_view(view)
352
                .with_file_path(file_info->path)
353
                .with_index_path(
354
                    file_info->has_bloom_data ? file_info->index_path : "")
355
                .with_uncompressed_size(file_info->uncompressed_size)
356
                .with_num_checkpoints(file_info->num_checkpoints)
357
                .with_bloom_cache(&index.bloom_cache())
358
                .with_time_range(begin, end);
359

360
            ViewBuilderUtility builder;
361
            auto build_output = co_await builder.process(builder_input);
362
            if (!build_output.success || !build_output.file_may_match) continue;
363

364
            for (const auto& candidate : build_output.candidates) {
365
                if (limit > 0 &&
366
                    static_cast<int>(collected_events.size()) >= limit) {
367
                    truncated = true;
368
                    break;
369
                }
370
                ViewReaderInput reader_input;
371
                reader_input.with_file_path(file_info->path)
372
                    .with_index_path(file_info->index_path)
373
                    .with_byte_range(candidate.start_byte, candidate.end_byte)
374
                    .with_checkpoint_idx(candidate.checkpoint_idx)
375
                    .with_view(view);
376

377
                ViewReaderUtility reader;
378
                auto gen = reader.process(reader_input);
379
                while (auto batch = co_await gen.next()) {
380
                    for (auto& event : batch->events) {
381
                        if (limit > 0 &&
382
                            static_cast<int>(collected_events.size()) >=
383
                                limit) {
384
                            truncated = true;
385
                            break;
386
                        }
387
                        collected_events.emplace_back(event);
388
                    }
389
                    if (truncated) break;
390
                }
391
            }
392
        }
393
    } else {
394
        std::size_t num_workers =
395
            std::min(index.max_concurrent(), target_files.size());
396
        auto* executor = Executor::current();
397

398
        auto file_chan = coro::make_channel<std::size_t>(num_workers * 2);
399
        auto collected_mutex = std::make_shared<std::mutex>();
400
        auto remaining = std::make_shared<std::atomic<int>>(
401
            limit > 0 ? limit : std::numeric_limits<int>::max());
402

403
        auto* target_files_ptr = &target_files;
404
        auto* collected_ptr = &collected_events;
405
        auto* view_ptr = &view;
406
        auto* bloom_cache_ptr = &index.bloom_cache();
407
        double t_begin = begin;
408
        double t_end = end;
409

410
        CoroScope scope(executor);
411

412
        scope.spawn([ch = file_chan->producer(), target_files_ptr](
×
413
                        CoroScope&) mutable -> coro::CoroTask<void> {
414
            auto guard = ch.guard();
415
            for (std::size_t i = 0; i < target_files_ptr->size(); ++i) {
416
                if (!co_await ch.send(i)) co_return;
417
            }
418
            co_return;
419
        });
×
420

421
        for (std::size_t w = 0; w < num_workers; ++w) {
422
            scope.spawn([file_chan, target_files_ptr, collected_mutex,
×
423
                         collected_ptr, view_ptr, bloom_cache_ptr, remaining,
424
                         t_begin, t_end](CoroScope&) -> coro::CoroTask<void> {
425
                while (auto fi_opt = co_await file_chan->receive()) {
426
                    if (remaining->load(std::memory_order_relaxed) <= 0)
427
                        co_return;
428
                    auto* file_info = (*target_files_ptr)[*fi_opt];
429

430
                    if (file_info->uncompressed_size == 0 &&
431
                        file_info->num_checkpoints == 0)
432
                        continue;
433

434
                    ViewBuilderInput builder_input;
435
                    builder_input.with_view(*view_ptr)
436
                        .with_file_path(file_info->path)
437
                        .with_index_path(file_info->has_bloom_data
438
                                             ? file_info->index_path
439
                                             : "")
440
                        .with_uncompressed_size(file_info->uncompressed_size)
441
                        .with_num_checkpoints(file_info->num_checkpoints)
442
                        .with_bloom_cache(bloom_cache_ptr)
443
                        .with_time_range(t_begin, t_end);
444

445
                    ViewBuilderUtility builder;
446
                    auto build_output = co_await builder.process(builder_input);
447
                    if (!build_output.success || !build_output.file_may_match)
448
                        continue;
449

450
                    for (const auto& candidate : build_output.candidates) {
451
                        if (remaining->load(std::memory_order_relaxed) <= 0)
452
                            break;
453

454
                        ViewReaderInput reader_input;
455
                        reader_input.with_file_path(file_info->path)
456
                            .with_index_path(file_info->index_path)
457
                            .with_byte_range(candidate.start_byte,
458
                                             candidate.end_byte)
459
                            .with_checkpoint_idx(candidate.checkpoint_idx)
460
                            .with_view(*view_ptr);
461

462
                        ViewReaderUtility reader;
463
                        auto gen = reader.process(reader_input);
464
                        while (auto batch = co_await gen.next()) {
465
                            if (!batch->events.empty()) {
466
                                std::lock_guard<std::mutex> lock(
467
                                    *collected_mutex);
468
                                for (auto& event : batch->events) {
469
                                    collected_ptr->emplace_back(event);
470
                                }
471
                                remaining->fetch_sub(
472
                                    static_cast<int>(batch->events.size()));
473
                            }
474
                        }
475
                    }
476
                }
477
                co_return;
478
            });
×
479
        }
480

481
        co_await scope.join();
482

483
        if (limit > 0 && static_cast<int>(collected_events.size()) > limit) {
484
            collected_events.resize(static_cast<std::size_t>(limit));
485
            truncated = true;
486
        }
487
    }
488

489
    // Apply normalization to event timestamps.
490
    if (normalize && global_min > 0) {
491
        for (auto& event : collected_events) {
492
            event = normalize_event_ts(event, global_min);
493
        }
494
    }
495

496
    // Use the original (normalized) begin/end for metadata.
497
    double meta_begin = original_begin;
498
    double meta_end = original_end;
499

500
    // Build response matching the Chrome Trace Event Format.
501
    // Pre-compute size to avoid repeated reallocations.
502
    std::size_t body_size = 256;
503
    for (const auto& ev : collected_events) body_size += ev.size() + 1;
504
    std::string body;
505
    body.reserve(body_size);
506
    body += "{\"events\":[";
507
    for (std::size_t i = 0; i < collected_events.size(); ++i) {
508
        if (i > 0) body += ',';
509
        body += collected_events[i];  // Already JSON
510
    }
511
    body += "],\"metadata\":{\"begin\":";
512
    body += std::to_string(meta_begin);
513
    body += ",\"end\":";
514
    body += std::to_string(meta_end);
515
    body += ",\"count\":";
516
    body += std::to_string(collected_events.size());
517
    body += ",\"limit\":";
518
    body += std::to_string(limit);
519
    body += ",\"truncated\":";
520
    body += truncated ? "true" : "false";
521
    body += ",\"ts_normalized\":";
522
    body += (normalize && global_min > 0) ? "true" : "false";
523
    body += ",\"global_min_timestamp_us\":";
524
    body += std::to_string(index.global_min_timestamp_us());
525
    body += "}}";
526

527
    co_return HttpResponse::ok(body);
528
}
14!
529

530
/// Register the visualization HTTP endpoints on `router`.
///
/// Currently only GET /api/v1/viz/events, which forwards to
/// handle_viz_events. The registered handler stores a pointer to `index`,
/// so `index` must outlive any use of the handler by the router.
void register_viz_api(Router& router, TraceIndex& index) {
    router.get("/api/v1/viz/events",
               [trace_index = &index](const HttpRequest& req,
                                      const QueryParams& params)
                   -> coro::CoroTask<HttpResponse> {
                   co_return co_await handle_viz_events(req, params,
                                                        *trace_index);
               });
}
540

541
}  // namespace dftracer::utils::server
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc