• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 27171677342

08 Jun 2026 10:43PM UTC coverage: 51.99% (+0.05%) from 51.937%
27171677342

Pull #77

github

web-flow
Merge 3a1432eec into 8045f0be3
Pull Request #77: chore: bump version to 0.0.10

36972 of 92663 branches covered (39.9%)

Branch coverage included in aggregate %.

33405 of 42703 relevant lines covered (78.23%)

20411.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.34
/src/dftracer/utils/server/viz_api.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/coro/channel.h>
3
#include <dftracer/utils/core/pipeline/executor.h>
4
#include <dftracer/utils/core/tasks/coro_scope.h>
5
#include <dftracer/utils/server/http_request.h>
6
#include <dftracer/utils/server/http_response.h>
7
#include <dftracer/utils/server/router.h>
8
#include <dftracer/utils/server/trace_index.h>
9
#include <dftracer/utils/server/viz_api.h>
10
#include <dftracer/utils/utilities/common/json/json_doc_guard.h>
11
#include <dftracer/utils/utilities/common/json/json_value.h>
12
#include <dftracer/utils/utilities/common/query/query.h>
13
#include <dftracer/utils/utilities/composites/dft/views/view_builder_utility.h>
14
#include <dftracer/utils/utilities/composites/dft/views/view_definition.h>
15
#include <dftracer/utils/utilities/composites/dft/views/view_reader_utility.h>
16
#include <dftracer/utils/utilities/fileio/lines/sources/async_streaming_gz_line_generator.h>
17
#include <simdjson.h>
18

19
#include <atomic>
20
#include <cstddef>
21
#include <cstdint>
22
#include <limits>
23
#include <memory>
24
#include <mutex>
25
#include <string>
26
#include <unordered_map>
27
#include <unordered_set>
28
#include <vector>
29

30
namespace dftracer::utils::server {
31

32
using namespace dftracer::utils::utilities::composites::dft;
33
using namespace dftracer::utils::utilities::composites::dft::views;
34

35
using dftracer::utils::utilities::common::json::JsonDocGuard;
36
using dftracer::utils::utilities::common::json::JsonValue;
37
using dftracer::utils::utilities::common::query::Query;
38

39
/// Short event names "FH", "HH" and "SH".
/// NOTE(review): presumably the hash-metadata record names emitted by the
/// tracer; nothing in the visible part of this file reads the set — confirm
/// it is still needed.
static const std::unordered_set<std::string> HASH_METADATA_NAMES{
    "FH",
    "HH",
    "SH",
};
41

42
/// Normalize the "ts" field in a Chrome Trace Event JSON string by
43
/// subtracting an offset.  Returns the modified JSON.  Falls back to
44
/// the original string on parse failure.
45
static std::string normalize_event_ts(const std::string& event_json,
300✔
46
                                      std::uint64_t offset) {
47
    thread_local simdjson::dom::parser tl_parser;
300✔
48
    auto result = tl_parser.parse(event_json);
300✔
49
    if (result.error()) return event_json;
300!
50

51
    auto root = result.value_unsafe();
300✔
52
    if (!root.is_object()) return event_json;
300!
53

54
    auto ts_result = root["ts"];
300✔
55
    if (ts_result.error()) return event_json;
300!
56

57
    std::uint64_t old_ts = 0;
300✔
58
    if (ts_result.is_uint64()) {
300!
59
        old_ts = ts_result.get_uint64().value_unsafe();
300✔
60
    } else if (ts_result.is_int64()) {
150!
61
        auto val = ts_result.get_int64().value_unsafe();
×
62
        old_ts = val >= 0 ? static_cast<std::uint64_t>(val) : 0;
×
63
    } else {
64
        return event_json;
×
65
    }
66

67
    std::uint64_t new_ts = old_ts >= offset ? old_ts - offset : 0;
300!
68

69
    // simdjson DOM is read-only, so we need to rebuild the JSON with the new ts
70
    // Find "ts": and replace the value
71
    std::string modified = event_json;
300!
72
    auto pos = modified.find("\"ts\":");
300✔
73
    if (pos == std::string::npos) return event_json;
300!
74

75
    pos += 5;  // Skip past "ts":
300✔
76
    while (pos < modified.size() && std::isspace(modified[pos])) ++pos;
300!
77

78
    auto end_pos = pos;
300✔
79
    while (end_pos < modified.size() &&
5,100✔
80
           (std::isdigit(modified[end_pos]) || modified[end_pos] == '-')) {
3,300!
81
        ++end_pos;
3,000✔
82
    }
83

84
    modified.replace(pos, end_pos - pos, std::to_string(new_ts));
300!
85
    return modified;
300✔
86
}
300✔
87

88
/// Compute the minimum event duration threshold for a given summary level.
/// Level 1 = full detail, higher levels filter shorter events.
///
/// The threshold is the query window divided by
/// `viewport_width * level`, so it grows with the window and shrinks with
/// the viewport width.
///
/// @param begin           start of the query window
/// @param end             end of the query window (same unit as @p begin)
/// @param level           summary level; values <= 1 disable filtering
/// @param viewport_width  divisor scale, 1920 by default
/// @return 0.0 when no filtering applies (level <= 1, zero viewport width,
///         or an empty / inverted / NaN window); otherwise the positive
///         threshold in the unit of @p begin and @p end.
static double duration_threshold(double begin, double end, unsigned level,
                                 unsigned viewport_width = 1920) {
    // A zero width would divide by zero and hand inf/NaN to the caller.
    if (level <= 1 || viewport_width == 0) return 0.0;

    const double range = end - begin;
    // Written as a negated comparison so that NaN is rejected as well.
    if (!(range > 0.0)) return 0.0;

    return range /
           (static_cast<double>(viewport_width) * static_cast<double>(level));
}
97

98
/// Render a scalar JSON value as text.
///
/// Strings are copied verbatim (without surrounding quotes); values that
/// simdjson reports as int64 or uint64 are formatted with std::to_string.
/// Every other kind of value (double, bool, null, array, object) yields an
/// empty string, which callers treat as "no usable value".
static std::string extract_json_value(simdjson::dom::element val) {
    std::string text;
    if (val.is_string()) {
        text.assign(val.get_string().value_unsafe());
    } else if (val.is_int64()) {
        // Checked before uint64 so values that fit both print as signed.
        text = std::to_string(val.get_int64().value_unsafe());
    } else if (val.is_uint64()) {
        text = std::to_string(val.get_uint64().value_unsafe());
    }
    return text;
}
110

111
/// Append an equality clause `field == value` to a query DSL string.
///
/// Clauses are joined with " and ".  A value made up solely of ASCII
/// digits is emitted bare; anything else (including negative numbers and
/// the empty string) is wrapped in double quotes.
///
/// NOTE(review): neither @p field nor @p val is escaped, so a value that
/// contains a double quote ends the literal early.  Both come from an HTTP
/// query parameter; the DSL's escaping rules are not visible here — confirm
/// and escape or reject accordingly.
///
/// @param dsl    query string to extend in place
/// @param field  field name, must be a non-null C string
/// @param val    value to compare the field against
static void append_lane_clause(std::string& dsl, const char* field,
                               const std::string& val) {
    if (!dsl.empty()) dsl += " and ";

    // The predicate takes unsigned char: passing a negative plain char to
    // std::isdigit is undefined behaviour (bytes >= 0x80, e.g. UTF-8).
    const bool numeric =
        !val.empty() &&
        std::all_of(val.begin(), val.end(),
                    [](unsigned char c) { return std::isdigit(c) != 0; });

    dsl += field;
    dsl += " == ";
    if (numeric) {
        dsl += val;
    } else {
        dsl += '"';
        dsl += val;
        dsl += '"';
    }
}
123

124
static void apply_lanes(std::string& dsl, std::string_view lanes_str) {
12✔
125
    if (lanes_str.empty()) return;
12✔
126

127
    thread_local simdjson::dom::parser tl_parser;
2✔
128
    auto result = tl_parser.parse(lanes_str.data(), lanes_str.size());
2✔
129
    if (result.error()) return;
2!
130

131
    auto root = result.value_unsafe();
2✔
132

133
    if (root.is_array()) {
2!
134
        auto arr = root.get_array().value_unsafe();
2✔
135
        for (auto item : arr) {
4✔
136
            if (!item.is_object()) continue;
2✔
137
            auto obj = item.get_object().value_unsafe();
2✔
138

139
            auto field_result = obj["field"];
2✔
140
            if (field_result.error()) field_result = obj["fields"];
2✔
141
            auto value_result = obj["value"];
2✔
142
            if (field_result.error() || value_result.error()) continue;
2!
143

144
            if (!field_result.value_unsafe().is_string()) continue;
2✔
145
            const char* field =
1✔
146
                field_result.value_unsafe().get_c_str().value_unsafe();
2✔
147
            auto val = extract_json_value(value_result.value_unsafe());
2!
148
            if (!val.empty()) append_lane_clause(dsl, field, val);
2✔
149
        }
2✔
150
    } else if (root.is_object()) {
1!
151
        auto obj = root.get_object().value_unsafe();
×
152

153
        auto field_result = obj["field"];
×
154
        if (field_result.error()) field_result = obj["fields"];
×
155
        auto value_result = obj["value"];
×
156

157
        if (!field_result.error() && !value_result.error()) {
×
158
            if (field_result.value_unsafe().is_string()) {
×
159
                const char* field =
160
                    field_result.value_unsafe().get_c_str().value_unsafe();
×
161
                auto val = extract_json_value(value_result.value_unsafe());
×
162
                if (!val.empty()) append_lane_clause(dsl, field, val);
×
163
            }
×
164
        }
165
    }
166
}
6✔
167

168
static void apply_filters(std::string& dsl, std::string_view filters_str) {
12✔
169
    if (filters_str.empty()) return;
12✔
170

171
    thread_local simdjson::dom::parser tl_parser;
4✔
172
    auto result = tl_parser.parse(filters_str.data(), filters_str.size());
4✔
173
    if (result.error()) return;
4!
174

175
    auto root = result.value_unsafe();
4✔
176
    if (!root.is_array()) return;
4✔
177

178
    auto arr = root.get_array().value_unsafe();
4✔
179
    for (auto item : arr) {
8✔
180
        if (!item.is_object()) continue;
4✔
181
        auto obj = item.get_object().value_unsafe();
4✔
182

183
        auto field_result = obj["field"];
4✔
184
        auto op_result = obj["op"];
4✔
185
        auto value_result = obj["value"];
4✔
186
        if (field_result.error() || op_result.error() || value_result.error())
4!
187
            continue;
×
188

189
        if (!field_result.value_unsafe().is_string() ||
6!
190
            !op_result.value_unsafe().is_string())
4!
191
            continue;
×
192

193
        const char* field =
2✔
194
            field_result.value_unsafe().get_c_str().value_unsafe();
4✔
195
        const char* op = op_result.value_unsafe().get_c_str().value_unsafe();
4✔
196

197
        std::string val = extract_json_value(value_result.value_unsafe());
4!
198
        if (val.empty()) continue;
4!
199

200
        std::string op_str(op);
4✔
201
        std::string field_str(field);
4✔
202
        if (field_str == "begin") field_str = "ts";
4!
203
        if (field_str == "end") field_str = "ts";
4!
204
        if (field_str == "duration") field_str = "dur";
4!
205

206
        std::string query_op;
4✔
207
        if (op_str == "=")
4✔
208
            query_op = "==";
2!
209
        else if (op_str == ">=")
2!
210
            query_op = ">=";
2!
211
        else if (op_str == "<=")
×
212
            query_op = "<=";
×
213
        else if (op_str == ">")
×
214
            query_op = ">";
×
215
        else if (op_str == "<")
×
216
            query_op = "<";
×
217
        else
218
            continue;
×
219

220
        if (!dsl.empty()) dsl += " and ";
4!
221
        bool numeric = !val.empty() && (std::isdigit(val[0]) || val[0] == '-');
6!
222
        if (numeric || query_op != "==") {
4!
223
            dsl += field_str + " " + query_op + " " + val;
4!
224
        } else {
2✔
225
            dsl += field_str + " " + query_op + " \"" + val + "\"";
×
226
        }
227
    }
4!
228
}
6✔
229

230
// --- GET /api/v1/viz/events ---
231
static coro::CoroTask<HttpResponse> handle_viz_events(
66!
232
    const HttpRequest& /*req*/, const QueryParams& params, TraceIndex& index) {
7!
233
    // Required: begin, end, summary
234
    if (!params.has("begin") || !params.has("end") || !params.has("summary")) {
7!
235
        co_return HttpResponse::bad_request(
8!
236
            "Missing required parameters: begin, end, summary");
1!
237
    }
238

239
    double begin = params.get_double("begin", 0);
6!
240
    double end = params.get_double("end", 0);
6!
241
    int summary = params.get_int("summary", 1);
6!
242
    if (summary < 1) summary = 1;
6!
243

244
    // Timestamp normalization: default ON, opt-out with ?ts_normalize=0
245
    auto ts_norm_param = params.get("ts_normalize");
6!
246
    bool normalize = ts_norm_param.empty() || ts_norm_param != "0";
6!
247

248
    std::uint64_t global_min = 0;
6✔
249
    if (normalize) {
6✔
250
        global_min = index.global_min_timestamp_us();
5!
251
        if (global_min == std::numeric_limits<std::uint64_t>::max()) {
5!
252
            global_min = 0;  // No valid bounds, skip normalization
253
        }
254
    }
5✔
255

256
    // When normalization is active the user sends normalized
257
    // begin/end values (relative to global_min).  De-normalize them
258
    // so the predicate filters against absolute timestamps.
259
    double original_begin = begin;
6✔
260
    double original_end = end;
6✔
261
    if (normalize && global_min > 0) {
6!
262
        begin += static_cast<double>(global_min);
5✔
263
        end += static_cast<double>(global_min);
5✔
264
    }
5✔
265

266
    double min_dur =
6✔
267
        duration_threshold(begin, end, static_cast<unsigned>(summary));
6!
268

269
    // Build view with time range and optional filters
270
    ViewDefinition view;
6✔
271
    view.name = "viz_query";
6!
272
    view.description = "Visualization query";
6!
273

274
    std::string dsl;
6✔
275
    dsl += "ts >= " + std::to_string(static_cast<uint64_t>(begin));
6!
276
    dsl += " and ts <= " + std::to_string(static_cast<uint64_t>(end));
6!
277
    if (min_dur > 0) {
6!
278
        dsl += " and dur >= " + std::to_string(static_cast<uint64_t>(min_dur));
×
279
    }
280

281
    apply_lanes(dsl, params.get("lanes"));
6!
282
    apply_filters(dsl, params.get("filters"));
6!
283

284
    auto pid = params.get("pid");
6!
285
    if (!pid.empty()) {
6!
286
        dsl += " and pid == " + std::string(pid);
×
287
    }
288

289
    auto tid = params.get("tid");
6!
290
    if (!tid.empty()) {
6!
291
        dsl += " and tid == " + std::string(tid);
×
292
    }
293

294
    auto cat = params.get("cat");
6!
295
    if (!cat.empty()) {
6!
296
        dsl += " and cat == \"" + std::string(cat) + "\"";
×
297
    }
298

299
    view.with_query(dsl);
6!
300

301
    // Optional limit: 0 (default) means no limit.
302
    int limit = params.get_int("limit", 0);
6!
303
    if (limit < 0) limit = 0;
6!
304

305
    // Determine files
306
    std::vector<const TraceIndex::FileInfo*> target_files;
6✔
307
    auto file_param = params.get("file");
6!
308
    if (!file_param.empty()) {
6!
309
        auto* f = index.find_file(std::string(file_param));
×
310
        if (f) target_files.push_back(f);
×
311
    } else {
312
        for (const auto& f : index.files()) {
12!
313
            target_files.push_back(&f);
6!
314
        }
6✔
315
    }
316

317
    // File-level time range skip: remove files whose cached time
318
    // bounds don't overlap the query window [begin, end].
319
    if (begin > 0 || end > 0) {
6!
320
        std::vector<const TraceIndex::FileInfo*> filtered;
6✔
321
        filtered.reserve(target_files.size());
6!
322
        for (auto* fi : target_files) {
12✔
323
            if (fi->min_timestamp_us == 0 && fi->max_timestamp_us == 0) {
6!
324
                filtered.push_back(fi);
×
325
                continue;
326
            }
327
            double fi_min = static_cast<double>(fi->min_timestamp_us);
6✔
328
            double fi_max = static_cast<double>(fi->max_timestamp_us);
6✔
329
            if (fi_max < begin || fi_min > end) continue;
6!
330
            filtered.push_back(fi);
5!
331
        }
6✔
332
        target_files = std::move(filtered);
6✔
333
    }
6✔
334

335
    std::vector<std::string> collected_events;
6✔
336

337
    bool truncated = false;
6✔
338

339
    if (target_files.size() <= 1) {
6!
340
        for (auto* file_info : target_files) {
23✔
341
            if (limit > 0 &&
5!
342
                static_cast<int>(collected_events.size()) >= limit) {
343
                truncated = true;
344
                break;
345
            }
346
            if (file_info->uncompressed_size == 0 &&
5!
347
                file_info->num_checkpoints == 0)
348
                continue;
349

350
            ViewBuilderInput builder_input;
5✔
351
            builder_input.with_view(view)
10!
352
                .with_file_path(file_info->path)
5!
353
                .with_index_path(
5!
354
                    file_info->has_bloom_data ? file_info->index_path : "")
5!
355
                .with_uncompressed_size(file_info->uncompressed_size)
5!
356
                .with_num_checkpoints(file_info->num_checkpoints)
5!
357
                .with_bloom_cache(&index.bloom_cache())
5!
358
                .with_time_range(begin, end);
5!
359

360
            ViewBuilderUtility builder;
5!
361
            auto build_output = co_await builder.process(builder_input);
10!
362
            if (!build_output.success || !build_output.file_may_match) continue;
5!
363

364
            for (const auto& candidate : build_output.candidates) {
18✔
365
                if (limit > 0 &&
3!
366
                    static_cast<int>(collected_events.size()) >= limit) {
367
                    truncated = true;
368
                    break;
369
                }
370
                ViewReaderInput reader_input;
3✔
371
                reader_input.with_file_path(file_info->path)
3!
372
                    .with_index_path(file_info->index_path)
3!
373
                    .with_byte_range(candidate.start_byte, candidate.end_byte)
3!
374
                    .with_checkpoint_idx(candidate.checkpoint_idx)
3!
375
                    .with_view(view);
3!
376

377
                ViewReaderUtility reader;
3!
378
                auto gen = reader.process(reader_input);
3!
379
                while (auto batch = co_await gen.next()) {
24!
380
                    for (auto& event : batch->events) {
153✔
381
                        if (limit > 0 &&
150!
382
                            static_cast<int>(collected_events.size()) >=
383
                                limit) {
384
                            truncated = true;
385
                            break;
386
                        }
387
                        collected_events.emplace_back(event);
150!
388
                    }
150!
389
                    if (truncated) break;
3!
390
                }
6✔
391
            }
15✔
392
        }
17✔
393
    } else {
6✔
394
        std::size_t num_workers =
395
            std::min(index.max_concurrent(), target_files.size());
×
396
        auto* executor = Executor::current();
397

398
        auto file_chan = coro::make_channel<std::size_t>(num_workers * 2);
×
399
        auto collected_mutex = std::make_shared<std::mutex>();
×
400
        auto remaining = std::make_shared<std::atomic<int>>(
×
401
            limit > 0 ? limit : std::numeric_limits<int>::max());
×
402

403
        auto* target_files_ptr = &target_files;
404
        auto* collected_ptr = &collected_events;
405
        auto* view_ptr = &view;
406
        auto* bloom_cache_ptr = &index.bloom_cache();
×
407
        double t_begin = begin;
408
        double t_end = end;
409

410
        CoroScope scope(executor);
×
411

412
        scope.spawn([ch = file_chan->producer(), target_files_ptr](
×
413
                        CoroScope&) mutable -> coro::CoroTask<void> {
×
414
            auto guard = ch.guard();
×
415
            for (std::size_t i = 0; i < target_files_ptr->size(); ++i) {
×
416
                if (!co_await ch.send(i)) co_return;
×
417
            }
418
            co_return;
419
        });
×
420

421
        for (std::size_t w = 0; w < num_workers; ++w) {
×
422
            scope.spawn([file_chan, target_files_ptr, collected_mutex,
×
423
                         collected_ptr, view_ptr, bloom_cache_ptr, remaining,
424
                         t_begin, t_end](CoroScope&) -> coro::CoroTask<void> {
×
425
                while (auto fi_opt = co_await file_chan->receive()) {
×
426
                    if (remaining->load(std::memory_order_relaxed) <= 0)
×
427
                        co_return;
428
                    auto* file_info = (*target_files_ptr)[*fi_opt];
429

430
                    if (file_info->uncompressed_size == 0 &&
×
431
                        file_info->num_checkpoints == 0)
432
                        continue;
433

434
                    ViewBuilderInput builder_input;
435
                    builder_input.with_view(*view_ptr)
×
436
                        .with_file_path(file_info->path)
×
437
                        .with_index_path(file_info->has_bloom_data
×
438
                                             ? file_info->index_path
×
439
                                             : "")
×
440
                        .with_uncompressed_size(file_info->uncompressed_size)
×
441
                        .with_num_checkpoints(file_info->num_checkpoints)
×
442
                        .with_bloom_cache(bloom_cache_ptr)
×
443
                        .with_time_range(t_begin, t_end);
×
444

445
                    ViewBuilderUtility builder;
×
446
                    auto build_output = co_await builder.process(builder_input);
×
447
                    if (!build_output.success || !build_output.file_may_match)
×
448
                        continue;
449

450
                    for (const auto& candidate : build_output.candidates) {
×
451
                        if (remaining->load(std::memory_order_relaxed) <= 0)
×
452
                            break;
453

454
                        ViewReaderInput reader_input;
455
                        reader_input.with_file_path(file_info->path)
×
456
                            .with_index_path(file_info->index_path)
×
457
                            .with_byte_range(candidate.start_byte,
×
458
                                             candidate.end_byte)
459
                            .with_checkpoint_idx(candidate.checkpoint_idx)
×
460
                            .with_view(*view_ptr);
×
461

462
                        ViewReaderUtility reader;
×
463
                        auto gen = reader.process(reader_input);
×
464
                        while (auto batch = co_await gen.next()) {
×
465
                            if (!batch->events.empty()) {
×
466
                                std::lock_guard<std::mutex> lock(
×
467
                                    *collected_mutex);
468
                                for (auto& event : batch->events) {
×
469
                                    collected_ptr->emplace_back(event);
×
470
                                }
471
                                remaining->fetch_sub(
472
                                    static_cast<int>(batch->events.size()));
473
                            }
474
                        }
×
475
                    }
×
476
                }
×
477
                co_return;
478
            });
×
479
        }
480

481
        co_await scope.join();
×
482

483
        if (limit > 0 && static_cast<int>(collected_events.size()) > limit) {
×
484
            collected_events.resize(static_cast<std::size_t>(limit));
×
485
            truncated = true;
486
        }
487
    }
×
488

489
    // Apply normalization to event timestamps.
490
    if (normalize && global_min > 0) {
6!
491
        for (auto& event : collected_events) {
155✔
492
            event = normalize_event_ts(event, global_min);
150!
493
        }
150✔
494
    }
5✔
495

496
    // Use the original (normalized) begin/end for metadata.
497
    double meta_begin = original_begin;
6✔
498
    double meta_end = original_end;
6✔
499

500
    // Build response matching the Chrome Trace Event Format.
501
    // Pre-compute size to avoid repeated reallocations.
502
    std::size_t body_size = 256;
6✔
503
    for (const auto& ev : collected_events) body_size += ev.size() + 1;
156✔
504
    std::string body;
6✔
505
    body.reserve(body_size);
6!
506
    body += "{\"events\":[";
6!
507
    for (std::size_t i = 0; i < collected_events.size(); ++i) {
156✔
508
        if (i > 0) body += ',';
150!
509
        body += collected_events[i];  // Already JSON
150!
510
    }
150✔
511
    body += "],\"metadata\":{\"begin\":";
6!
512
    body += std::to_string(meta_begin);
6!
513
    body += ",\"end\":";
6!
514
    body += std::to_string(meta_end);
6!
515
    body += ",\"count\":";
6!
516
    body += std::to_string(collected_events.size());
6!
517
    body += ",\"limit\":";
6!
518
    body += std::to_string(limit);
6!
519
    body += ",\"truncated\":";
6!
520
    body += truncated ? "true" : "false";
6!
521
    body += ",\"ts_normalized\":";
6!
522
    body += (normalize && global_min > 0) ? "true" : "false";
6!
523
    body += ",\"global_min_timestamp_us\":";
6!
524
    body += std::to_string(index.global_min_timestamp_us());
6!
525
    body += "}}";
6!
526

527
    co_return HttpResponse::ok(body);
6!
528
}
66!
529

530
/// Register the visualization endpoints on @p router.
///
/// Installs a GET handler for "/api/v1/viz/events" that forwards to
/// handle_viz_events().  The handler keeps a pointer to @p index, so the
/// index must outlive the router's use of the handler.
void register_viz_api(Router& router, TraceIndex& index) {
    TraceIndex* const trace_index = &index;

    auto events_handler =
        [trace_index](const HttpRequest& req, const QueryParams& params)
        -> coro::CoroTask<HttpResponse> {
        co_return co_await handle_viz_events(req, params, *trace_index);
    };

    router.get("/api/v1/viz/events", std::move(events_handler));
}
540

541
}  // namespace dftracer::utils::server
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc