• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28423703495

30 Jun 2026 05:59AM UTC coverage: 51.998% (-0.3%) from 52.278%
28423703495

Pull #83

github

web-flow
Merge fb542a938 into 2efed6649
Pull Request #83: refactor and improve code QoL

37282 of 93303 branches covered (39.96%)

Branch coverage included in aggregate %.

801 of 1525 new or added lines in 78 files covered. (52.52%)

98 existing lines in 37 files now uncovered.

33674 of 43157 relevant lines covered (78.03%)

20306.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.83
/src/dftracer/utils/server/viz_api.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/common/to_chars.h>
3
#include <dftracer/utils/core/coro/channel.h>
4
#include <dftracer/utils/core/pipeline/executor.h>
5
#include <dftracer/utils/core/tasks/coro_scope.h>
6
#include <dftracer/utils/server/http_request.h>
7
#include <dftracer/utils/server/http_response.h>
8
#include <dftracer/utils/server/router.h>
9
#include <dftracer/utils/server/trace_index.h>
10
#include <dftracer/utils/server/viz_api.h>
11
#include <dftracer/utils/utilities/common/json/json_doc_guard.h>
12
#include <dftracer/utils/utilities/common/json/json_value.h>
13
#include <dftracer/utils/utilities/common/query/query.h>
14
#include <dftracer/utils/utilities/composites/dft/views/view_builder_utility.h>
15
#include <dftracer/utils/utilities/composites/dft/views/view_definition.h>
16
#include <dftracer/utils/utilities/composites/dft/views/view_reader_utility.h>
17
#include <dftracer/utils/utilities/fileio/lines/sources/async_streaming_gz_line_generator.h>
18
#include <simdjson.h>
19

20
#include <atomic>
21
#include <cstddef>
22
#include <cstdint>
23
#include <limits>
24
#include <memory>
25
#include <mutex>
26
#include <string>
27
#include <unordered_map>
28
#include <unordered_set>
29
#include <vector>
30

31
namespace dftracer::utils::server {
32

33
using namespace dftracer::utils::utilities::composites::dft;
34
using namespace dftracer::utils::utilities::composites::dft::views;
35

36
using dftracer::utils::utilities::common::json::JsonDocGuard;
37
using dftracer::utils::utilities::common::json::JsonValue;
38
using dftracer::utils::utilities::common::query::Query;
39

40
/// Set containing the names "FH", "HH" and "SH".
/// NOTE(review): presumably the names of hash-related metadata events; no
/// code in the visible part of this file references it — confirm it is still
/// needed.
static const std::unordered_set<std::string> HASH_METADATA_NAMES{"FH", "HH",
                                                                 "SH"};
257!
42

43
/// Normalize the "ts" field in a Chrome Trace Event JSON string by
44
/// subtracting an offset.  Returns the modified JSON.  Falls back to
45
/// the original string on parse failure.
46
static std::string normalize_event_ts(const std::string& event_json,
300✔
47
                                      std::uint64_t offset) {
48
    thread_local simdjson::dom::parser tl_parser;
300✔
49
    auto result = tl_parser.parse(event_json);
300✔
50
    if (result.error()) return event_json;
300!
51

52
    auto root = result.value_unsafe();
300✔
53
    if (!root.is_object()) return event_json;
300!
54

55
    auto ts_result = root["ts"];
300✔
56
    if (ts_result.error()) return event_json;
300!
57

58
    std::uint64_t old_ts = 0;
300✔
59
    if (ts_result.is_uint64()) {
300!
60
        old_ts = ts_result.get_uint64().value_unsafe();
300✔
61
    } else if (ts_result.is_int64()) {
150!
62
        auto val = ts_result.get_int64().value_unsafe();
×
63
        old_ts = val >= 0 ? static_cast<std::uint64_t>(val) : 0;
×
64
    } else {
65
        return event_json;
×
66
    }
67

68
    std::uint64_t new_ts = old_ts >= offset ? old_ts - offset : 0;
300!
69

70
    // simdjson DOM is read-only, so we need to rebuild the JSON with the new ts
71
    // Find "ts": and replace the value
72
    std::string modified = event_json;
300!
73
    auto pos = modified.find("\"ts\":");
300✔
74
    if (pos == std::string::npos) return event_json;
300!
75

76
    pos += 5;  // Skip past "ts":
300✔
77
    while (pos < modified.size() && std::isspace(modified[pos])) ++pos;
300!
78

79
    auto end_pos = pos;
300✔
80
    while (end_pos < modified.size() &&
5,100✔
81
           (std::isdigit(modified[end_pos]) || modified[end_pos] == '-')) {
3,300!
82
        ++end_pos;
3,000✔
83
    }
84

85
    modified.replace(pos, end_pos - pos, std::to_string(new_ts));
300!
86
    return modified;
300✔
87
}
300✔
88

89
/// Compute the minimum event duration threshold for a given summary level.
///
/// @param begin           Start of the requested time range.
/// @param end             End of the requested time range.
/// @param level           Summary level; 0 and 1 mean full detail.
/// @param viewport_width  Divisor applied together with the level.
/// @return 0.0 for level <= 1, otherwise
///         (end - begin) / (viewport_width * level).
static double duration_threshold(double begin, double end, unsigned level,
                                 unsigned viewport_width = 1920) {
    const bool full_detail = level <= 1;
    if (full_detail) {
        return 0.0;
    }

    const double slots =
        static_cast<double>(viewport_width) * static_cast<double>(level);
    const double span = end - begin;
    return span / slots;
}
6✔
98

99
/// Convert a JSON scalar to the text used when building query clauses.
///
/// Strings are returned as-is and integers are rendered in decimal. Any other
/// element kind yields an empty string.
static std::string extract_json_value(simdjson::dom::element val) {
    std::string_view text;
    if (val.get_string().get(text) == simdjson::SUCCESS) {
        return std::string(text);
    }

    std::int64_t signed_value = 0;
    if (val.get_int64().get(signed_value) == simdjson::SUCCESS) {
        return std::to_string(signed_value);
    }

    std::uint64_t unsigned_value = 0;
    if (val.get_uint64().get(unsigned_value) == simdjson::SUCCESS) {
        return std::to_string(unsigned_value);
    }

    return std::string();
}
3✔
111

112
/// Append a `field == value` clause to a query DSL string.
///
/// A value made up only of decimal digits is emitted bare; anything else is
/// wrapped in double quotes. When `dsl` already has content the clause is
/// joined with " and ".
///
/// @param dsl    Query string built so far; modified in place.
/// @param field  NUL-terminated field name, copied verbatim.
/// @param val    Value to compare against, copied verbatim.
///
/// NOTE(review): `field` and `val` are not escaped, so a '"' inside `val`
/// ends the quoted literal early — confirm how the query parser treats this.
static void append_lane_clause(std::string& dsl, const char* field,
                               const std::string& val) {
    if (!dsl.empty()) dsl += " and ";

    // Take the character as unsigned char: std::isdigit is undefined for
    // negative values, which plain char can hold for non-ASCII bytes.
    const bool numeric =
        !val.empty() &&
        std::all_of(val.begin(), val.end(),
                    [](unsigned char c) { return std::isdigit(c) != 0; });

    dsl += field;
    dsl += " == ";
    if (numeric) {
        dsl += val;
    } else {
        dsl += '"';
        dsl += val;
        dsl += '"';
    }
}
2✔
124

125
static void apply_lanes(std::string& dsl, std::string_view lanes_str) {
12✔
126
    if (lanes_str.empty()) return;
12✔
127

128
    thread_local simdjson::dom::parser tl_parser;
2✔
129
    auto result = tl_parser.parse(lanes_str.data(), lanes_str.size());
2✔
130
    if (result.error()) return;
2!
131

132
    auto root = result.value_unsafe();
2✔
133

134
    if (root.is_array()) {
2!
135
        auto arr = root.get_array().value_unsafe();
2✔
136
        for (auto item : arr) {
4✔
137
            if (!item.is_object()) continue;
2✔
138
            auto obj = item.get_object().value_unsafe();
2✔
139

140
            auto field_result = obj["field"];
2✔
141
            if (field_result.error()) field_result = obj["fields"];
2✔
142
            auto value_result = obj["value"];
2✔
143
            if (field_result.error() || value_result.error()) continue;
2!
144

145
            if (!field_result.value_unsafe().is_string()) continue;
2✔
146
            const char* field =
1✔
147
                field_result.value_unsafe().get_c_str().value_unsafe();
2✔
148
            auto val = extract_json_value(value_result.value_unsafe());
2!
149
            if (!val.empty()) append_lane_clause(dsl, field, val);
2✔
150
        }
2✔
151
    } else if (root.is_object()) {
1!
152
        auto obj = root.get_object().value_unsafe();
×
153

154
        auto field_result = obj["field"];
×
155
        if (field_result.error()) field_result = obj["fields"];
×
156
        auto value_result = obj["value"];
×
157

158
        if (!field_result.error() && !value_result.error()) {
×
159
            if (field_result.value_unsafe().is_string()) {
×
160
                const char* field =
161
                    field_result.value_unsafe().get_c_str().value_unsafe();
×
162
                auto val = extract_json_value(value_result.value_unsafe());
×
163
                if (!val.empty()) append_lane_clause(dsl, field, val);
×
164
            }
×
165
        }
166
    }
167
}
6✔
168

169
static void apply_filters(std::string& dsl, std::string_view filters_str) {
12✔
170
    if (filters_str.empty()) return;
12✔
171

172
    thread_local simdjson::dom::parser tl_parser;
4✔
173
    auto result = tl_parser.parse(filters_str.data(), filters_str.size());
4✔
174
    if (result.error()) return;
4!
175

176
    auto root = result.value_unsafe();
4✔
177
    if (!root.is_array()) return;
4✔
178

179
    auto arr = root.get_array().value_unsafe();
4✔
180
    for (auto item : arr) {
8✔
181
        if (!item.is_object()) continue;
4✔
182
        auto obj = item.get_object().value_unsafe();
4✔
183

184
        auto field_result = obj["field"];
4✔
185
        auto op_result = obj["op"];
4✔
186
        auto value_result = obj["value"];
4✔
187
        if (field_result.error() || op_result.error() || value_result.error())
4!
188
            continue;
×
189

190
        if (!field_result.value_unsafe().is_string() ||
6!
191
            !op_result.value_unsafe().is_string())
4!
192
            continue;
×
193

194
        const char* field =
2✔
195
            field_result.value_unsafe().get_c_str().value_unsafe();
4✔
196
        const char* op = op_result.value_unsafe().get_c_str().value_unsafe();
4✔
197

198
        std::string val = extract_json_value(value_result.value_unsafe());
4!
199
        if (val.empty()) continue;
4!
200

201
        std::string op_str(op);
4✔
202
        std::string field_str(field);
4✔
203
        if (field_str == "begin") field_str = "ts";
4!
204
        if (field_str == "end") field_str = "ts";
4!
205
        if (field_str == "duration") field_str = "dur";
4!
206

207
        std::string query_op;
4✔
208
        if (op_str == "=")
4✔
209
            query_op = "==";
2!
210
        else if (op_str == ">=")
2!
211
            query_op = ">=";
2!
212
        else if (op_str == "<=")
×
213
            query_op = "<=";
×
214
        else if (op_str == ">")
×
215
            query_op = ">";
×
216
        else if (op_str == "<")
×
217
            query_op = "<";
×
218
        else
219
            continue;
×
220

221
        if (!dsl.empty()) dsl += " and ";
4!
222
        bool numeric = !val.empty() && (std::isdigit(val[0]) || val[0] == '-');
6!
223
        if (numeric || query_op != "==") {
4!
224
            dsl += field_str + " " + query_op + " " + val;
4!
225
        } else {
2✔
226
            dsl += field_str + " " + query_op + " \"" + val + "\"";
×
227
        }
228
    }
4!
229
}
6✔
230

231
// --- GET /api/v1/viz/events ---
232
// Build the query view (time range + lane/filter/pid/tid/cat predicates) for a
233
// viz request from the parsed parameters.
234
static ViewDefinition build_viz_view(const QueryParams& params, double begin,
12✔
235
                                     double end, double min_dur) {
236
    ViewDefinition view;
12✔
237
    view.name = "viz_query";
12!
238
    view.description = "Visualization query";
12!
239

240
    // Reserve once and append in place: numbers go through to_chars into a
241
    // stack buffer (no per-number heap allocation, unlike std::to_string), and
242
    // literals/string_views are appended directly (no temporary
243
    // concatenations).
244
    std::string dsl;
12✔
245
    dsl.reserve(128);
12!
246
    char numbuf[20];  // max digits of a uint64_t
247
    auto append_u64 = [&](std::uint64_t v) {
30✔
248
        dsl.append(numbuf, to_chars_u64(numbuf, numbuf + sizeof(numbuf), v));
24✔
249
    };
30✔
250

251
    dsl += "ts >= ";
12!
252
    append_u64(static_cast<std::uint64_t>(begin));
12!
253
    dsl += " and ts <= ";
12!
254
    append_u64(static_cast<std::uint64_t>(end));
12!
255
    if (min_dur > 0) {
12!
NEW
256
        dsl += " and dur >= ";
×
NEW
257
        append_u64(static_cast<std::uint64_t>(min_dur));
×
258
    }
259

260
    apply_lanes(dsl, params.get("lanes"));
12!
261
    apply_filters(dsl, params.get("filters"));
12!
262

263
    auto pid = params.get("pid");
12!
264
    if (!pid.empty()) {
12!
NEW
265
        dsl += " and pid == ";
×
NEW
266
        dsl += pid;
×
267
    }
268

269
    auto tid = params.get("tid");
12!
270
    if (!tid.empty()) {
12!
NEW
271
        dsl += " and tid == ";
×
NEW
272
        dsl += tid;
×
273
    }
274

275
    auto cat = params.get("cat");
12!
276
    if (!cat.empty()) {
12!
NEW
277
        dsl += " and cat == \"";
×
NEW
278
        dsl += cat;
×
NEW
279
        dsl += '"';
×
280
    }
281

282
    view.with_query(dsl);
12!
283
    return view;
18✔
284
}
12!
285

286
// Select the files to scan: the explicit ?file= or all indexed files, then drop
287
// files whose cached time bounds don't overlap [begin, end]. Pure/synchronous.
288
static std::vector<const TraceIndex::FileInfo*> select_viz_target_files(
12✔
289
    TraceIndex& index, const QueryParams& params, double begin, double end) {
290
    std::vector<const TraceIndex::FileInfo*> target_files;
12✔
291
    auto file_param = params.get("file");
12!
292
    if (!file_param.empty()) {
12!
UNCOV
293
        auto* f = index.find_file(std::string(file_param));
×
UNCOV
294
        if (f) target_files.push_back(f);
×
295
    } else {
296
        for (const auto& f : index.files()) {
24✔
297
            target_files.push_back(&f);
12!
298
        }
299
    }
300

301
    if (begin > 0 || end > 0) {
12!
302
        std::vector<const TraceIndex::FileInfo*> filtered;
12✔
303
        filtered.reserve(target_files.size());
12!
304
        for (auto* fi : target_files) {
24✔
305
            if (fi->min_timestamp_us == 0 && fi->max_timestamp_us == 0) {
12!
UNCOV
306
                filtered.push_back(fi);
×
UNCOV
307
                continue;
×
308
            }
309
            double fi_min = static_cast<double>(fi->min_timestamp_us);
12✔
310
            double fi_max = static_cast<double>(fi->max_timestamp_us);
12✔
311
            if (fi_max < begin || fi_min > end) continue;
12!
312
            filtered.push_back(fi);
10!
313
        }
314
        target_files = std::move(filtered);
12✔
315
    }
12✔
316
    return target_files;
18✔
317
}
6!
318

319
// Normalize event timestamps (when global_min > 0) and serialize the collected
320
// events plus metadata into the Chrome Trace Event Format body. `global_min` is
321
// the de-normalization base (already 0 unless normalization is active);
322
// `display_global_min` is the value reported in the metadata. Pure/synchronous.
323
static std::string build_viz_events_body(std::vector<std::string>& events,
12✔
324
                                         std::uint64_t global_min,
325
                                         double meta_begin, double meta_end,
326
                                         int limit, bool truncated,
327
                                         std::uint64_t display_global_min) {
328
    if (global_min > 0) {
12✔
329
        for (auto& event : events) {
310✔
330
            event = normalize_event_ts(event, global_min);
300!
331
        }
332
    }
5✔
333

334
    // Pre-compute size to avoid repeated reallocations.
335
    std::size_t body_size = 256;
12✔
336
    for (const auto& ev : events) body_size += ev.size() + 1;
312✔
337
    std::string body;
12✔
338
    body.reserve(body_size);
12!
339
    body += "{\"events\":[";
12!
340
    for (std::size_t i = 0; i < events.size(); ++i) {
312✔
341
        if (i > 0) body += ',';
300!
342
        body += events[i];  // Already JSON
300!
343
    }
150✔
344
    body += "],\"metadata\":{\"begin\":";
12!
345
    body += std::to_string(meta_begin);
12!
346
    body += ",\"end\":";
12!
347
    body += std::to_string(meta_end);
12!
348
    body += ",\"count\":";
12!
349
    body += std::to_string(events.size());
12!
350
    body += ",\"limit\":";
12!
351
    body += std::to_string(limit);
12!
352
    body += ",\"truncated\":";
12!
353
    body += truncated ? "true" : "false";
12!
354
    body += ",\"ts_normalized\":";
12!
355
    body += (global_min > 0) ? "true" : "false";
12!
356
    body += ",\"global_min_timestamp_us\":";
12!
357
    body += std::to_string(display_global_min);
12!
358
    body += "}}";
12!
359
    return body;
12✔
360
}
6!
361

362
/// Handle GET /api/v1/viz/events.
///
/// Required query parameters: begin, end, summary (a 400 response is returned
/// when any is missing). Optional: ts_normalize ("0" disables timestamp
/// normalization), limit (0 or negative = unlimited), plus everything read by
/// build_viz_view() and select_viz_target_files().
///
/// Flow:
///  1. With normalization active and a valid global minimum timestamp, begin
///     and end are shifted up by that minimum so the query runs against
///     absolute timestamps; the response metadata still reports the values as
///     sent.
///  2. The query view and the list of target files are built.
///  3. Zero or one target file is read inline. Several files are read by up to
///     index.max_concurrent() worker coroutines that take file indices from a
///     channel and append events to a shared vector under a mutex.
///  4. The events are serialized by build_viz_events_body().
///
/// With several files, `truncated` is reported only when more than `limit`
/// events were gathered, and which events survive the cut depends on worker
/// scheduling.
static coro::CoroTask<HttpResponse> handle_viz_events(
    const HttpRequest& /*req*/, const QueryParams& params, TraceIndex& index) {
    // Required: begin, end, summary
    if (!params.has("begin") || !params.has("end") || !params.has("summary")) {
        co_return HttpResponse::bad_request(
            "Missing required parameters: begin, end, summary");
    }

    double begin = params.get_double("begin", 0);
    double end = params.get_double("end", 0);
    int summary = params.get_int("summary", 1);
    if (summary < 1) summary = 1;

    // Timestamp normalization: default ON, opt-out with ?ts_normalize=0
    auto ts_norm_param = params.get("ts_normalize");
    bool normalize = ts_norm_param.empty() || ts_norm_param != "0";

    std::uint64_t global_min = 0;
    if (normalize) {
        global_min = index.global_min_timestamp_us();
        if (global_min == std::numeric_limits<std::uint64_t>::max()) {
            global_min = 0;  // No valid bounds, skip normalization
        }
    }

    // When normalization is active the user sends normalized
    // begin/end values (relative to global_min).  De-normalize them
    // so the predicate filters against absolute timestamps.
    double original_begin = begin;
    double original_end = end;
    if (normalize && global_min > 0) {
        begin += static_cast<double>(global_min);
        end += static_cast<double>(global_min);
    }

    double min_dur =
        duration_threshold(begin, end, static_cast<unsigned>(summary));

    ViewDefinition view = build_viz_view(params, begin, end, min_dur);

    // Optional limit: 0 (default) means no limit.
    int limit = params.get_int("limit", 0);
    if (limit < 0) limit = 0;

    std::vector<const TraceIndex::FileInfo*> target_files =
        select_viz_target_files(index, params, begin, end);

    std::vector<std::string> collected_events;

    bool truncated = false;

    if (target_files.size() <= 1) {
        // Sequential path: at most one file, read inline.
        for (auto* file_info : target_files) {
            if (limit > 0 &&
                static_cast<int>(collected_events.size()) >= limit) {
                truncated = true;
                break;
            }
            // Skip files the index has no size/checkpoint data for.
            if (file_info->uncompressed_size == 0 &&
                file_info->num_checkpoints == 0)
                continue;

            ViewBuilderInput builder_input;
            builder_input.with_view(view)
                .with_file_path(file_info->path)
                .with_index_path(
                    file_info->has_bloom_data ? file_info->index_path : "")
                .with_uncompressed_size(file_info->uncompressed_size)
                .with_num_checkpoints(file_info->num_checkpoints)
                .with_bloom_cache(&index.bloom_cache())
                .with_time_range(begin, end);

            ViewBuilderUtility builder;
            auto build_output = co_await builder.process(builder_input);
            if (!build_output.success || !build_output.file_may_match) continue;

            for (const auto& candidate : build_output.candidates) {
                if (limit > 0 &&
                    static_cast<int>(collected_events.size()) >= limit) {
                    truncated = true;
                    break;
                }
                ViewReaderInput reader_input;
                reader_input.with_file_path(file_info->path)
                    .with_index_path(file_info->index_path)
                    .with_byte_range(candidate.start_byte, candidate.end_byte)
                    .with_checkpoint_idx(candidate.checkpoint_idx)
                    .with_view(view);

                ViewReaderUtility reader;
                auto gen = reader.process(reader_input);
                while (auto batch = co_await gen.next()) {
                    for (auto& event : batch->events) {
                        if (limit > 0 &&
                            static_cast<int>(collected_events.size()) >=
                                limit) {
                            truncated = true;
                            break;
                        }
                        collected_events.emplace_back(event);
                    }
                    if (truncated) break;
                }
            }
        }
    } else {
        // Parallel path: a producer feeds file indices into a channel and
        // worker coroutines consume them.
        std::size_t num_workers =
            std::min(index.max_concurrent(), target_files.size());
        // Always keep one consumer; with zero workers nothing would drain
        // the channel the producer writes to.
        if (num_workers == 0) num_workers = 1;
        auto* executor = Executor::current();

        auto file_chan = coro::make_channel<std::size_t>(num_workers * 2);
        auto collected_mutex = std::make_shared<std::mutex>();
        // Event budget shared by all workers; it may go negative because a
        // whole batch is appended before the counter is checked again.
        auto remaining = std::make_shared<std::atomic<int>>(
            limit > 0 ? limit : std::numeric_limits<int>::max());

        // The spawned coroutines get raw pointers to locals of this frame;
        // that is safe because scope.join() is awaited before they go away.
        auto* target_files_ptr = &target_files;
        auto* collected_ptr = &collected_events;
        auto* view_ptr = &view;
        auto* bloom_cache_ptr = &index.bloom_cache();
        double t_begin = begin;
        double t_end = end;

        CoroScope scope(executor);

        scope.spawn([ch = file_chan->producer(), target_files_ptr](
                        CoroScope&) mutable -> coro::CoroTask<void> {
            auto guard = ch.guard();
            for (std::size_t i = 0; i < target_files_ptr->size(); ++i) {
                if (!co_await ch.send(i)) co_return;
            }
            co_return;
        });

        for (std::size_t w = 0; w < num_workers; ++w) {
            scope.spawn([file_chan, target_files_ptr, collected_mutex,
                         collected_ptr, view_ptr, bloom_cache_ptr, remaining,
                         t_begin, t_end](CoroScope&) -> coro::CoroTask<void> {
                while (auto fi_opt = co_await file_chan->receive()) {
                    if (remaining->load(std::memory_order_relaxed) <= 0)
                        co_return;
                    auto* file_info = (*target_files_ptr)[*fi_opt];

                    if (file_info->uncompressed_size == 0 &&
                        file_info->num_checkpoints == 0)
                        continue;

                    ViewBuilderInput builder_input;
                    builder_input.with_view(*view_ptr)
                        .with_file_path(file_info->path)
                        .with_index_path(file_info->has_bloom_data
                                             ? file_info->index_path
                                             : "")
                        .with_uncompressed_size(file_info->uncompressed_size)
                        .with_num_checkpoints(file_info->num_checkpoints)
                        .with_bloom_cache(bloom_cache_ptr)
                        .with_time_range(t_begin, t_end);

                    ViewBuilderUtility builder;
                    auto build_output = co_await builder.process(builder_input);
                    if (!build_output.success || !build_output.file_may_match)
                        continue;

                    for (const auto& candidate : build_output.candidates) {
                        if (remaining->load(std::memory_order_relaxed) <= 0)
                            break;

                        ViewReaderInput reader_input;
                        reader_input.with_file_path(file_info->path)
                            .with_index_path(file_info->index_path)
                            .with_byte_range(candidate.start_byte,
                                             candidate.end_byte)
                            .with_checkpoint_idx(candidate.checkpoint_idx)
                            .with_view(*view_ptr);

                        ViewReaderUtility reader;
                        auto gen = reader.process(reader_input);
                        while (auto batch = co_await gen.next()) {
                            if (!batch->events.empty()) {
                                // The lock is released before the next
                                // co_await.
                                std::lock_guard<std::mutex> lock(
                                    *collected_mutex);
                                for (auto& event : batch->events) {
                                    collected_ptr->emplace_back(event);
                                }
                                remaining->fetch_sub(
                                    static_cast<int>(batch->events.size()));
                            }
                            // Stop reading this candidate once the budget is
                            // spent; further events would be dropped by the
                            // resize below. The sequential path leaves the
                            // reader loop early in the same way.
                            if (remaining->load(std::memory_order_relaxed) <=
                                0)
                                break;
                        }
                    }
                }
                co_return;
            });
        }

        co_await scope.join();

        // Workers can overshoot the limit by up to a batch each; trim here.
        if (limit > 0 && static_cast<int>(collected_events.size()) > limit) {
            collected_events.resize(static_cast<std::size_t>(limit));
            truncated = true;
        }
    }

    std::string body = build_viz_events_body(
        collected_events, global_min, original_begin, original_end, limit,
        truncated, index.global_min_timestamp_us());
    co_return HttpResponse::ok(body);
}
66!
568

569
/// Register the visualization endpoints on `router`.
///
/// Installs the GET /api/v1/viz/events handler, which forwards each request to
/// handle_viz_events(). The handler keeps a pointer to `index`, so `index`
/// must outlive the router's use of it.
void register_viz_api(Router& router, TraceIndex& index) {
    router.get(
        "/api/v1/viz/events",
        [trace_index = &index](
            const HttpRequest& req,
            const QueryParams& params) -> coro::CoroTask<HttpResponse> {
            co_return co_await handle_viz_events(req, params, *trace_index);
        });
}
4✔
579

580
}  // namespace dftracer::utils::server
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc