• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28521653886

01 Jul 2026 01:36PM UTC coverage: 50.92% (-1.4%) from 52.278%
28521653886

Pull #83

github

web-flow
Merge 9bdedb1e9 into 2efed6649
Pull Request #83: refactor and improve code QoL

31893 of 80049 branches covered (39.84%)

Branch coverage included in aggregate %.

789 of 1613 new or added lines in 87 files covered. (48.92%)

5007 existing lines in 181 files now uncovered.

32812 of 47024 relevant lines covered (69.78%)

9905.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.21
/src/dftracer/utils/server/viz_api.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/common/to_chars.h>
3
#include <dftracer/utils/core/coro/channel.h>
4
#include <dftracer/utils/core/pipeline/executor.h>
5
#include <dftracer/utils/core/tasks/coro_scope.h>
6
#include <dftracer/utils/server/http_request.h>
7
#include <dftracer/utils/server/http_response.h>
8
#include <dftracer/utils/server/router.h>
9
#include <dftracer/utils/server/trace_index.h>
10
#include <dftracer/utils/server/viz_api.h>
11
#include <dftracer/utils/utilities/common/json/json_doc_guard.h>
12
#include <dftracer/utils/utilities/common/json/json_value.h>
13
#include <dftracer/utils/utilities/common/query/query.h>
14
#include <dftracer/utils/utilities/composites/dft/views/view_builder_utility.h>
15
#include <dftracer/utils/utilities/composites/dft/views/view_definition.h>
16
#include <dftracer/utils/utilities/composites/dft/views/view_reader_utility.h>
17
#include <dftracer/utils/utilities/fileio/lines/sources/async_streaming_gz_line_generator.h>
18
#include <simdjson.h>
19

20
#include <atomic>
21
#include <cstddef>
22
#include <cstdint>
23
#include <limits>
24
#include <memory>
25
#include <mutex>
26
#include <string>
27
#include <unordered_map>
28
#include <unordered_set>
29
#include <vector>
30

31
namespace dftracer::utils::server {
32

33
using namespace dftracer::utils::utilities::composites::dft;
34
using namespace dftracer::utils::utilities::composites::dft::views;
35

36
using dftracer::utils::utilities::common::json::JsonDocGuard;
37
using dftracer::utils::utilities::common::json::JsonValue;
38
using dftracer::utils::utilities::common::query::Query;
39

40
static const std::unordered_set<std::string> HASH_METADATA_NAMES = {"FH", "HH",
256!
41
                                                                    "SH"};
256!
42

43
/// Normalize the "ts" field in a Chrome Trace Event JSON string by
44
/// subtracting an offset.  Returns the modified JSON.  Falls back to
45
/// the original string on parse failure.
46
static std::string normalize_event_ts(const std::string& event_json,
150✔
47
                                      std::uint64_t offset) {
48
    thread_local simdjson::dom::parser tl_parser;
150✔
49
    auto result = tl_parser.parse(event_json);
150✔
50
    if (result.error()) return event_json;
150!
51

52
    auto root = result.value_unsafe();
150✔
53
    if (!root.is_object()) return event_json;
150!
54

55
    auto ts_result = root["ts"];
150✔
56
    if (ts_result.error()) return event_json;
150!
57

58
    std::uint64_t old_ts = 0;
150✔
59
    if (ts_result.is_uint64()) {
150!
60
        old_ts = ts_result.get_uint64().value_unsafe();
150✔
61
    } else if (ts_result.is_int64()) {
150!
62
        auto val = ts_result.get_int64().value_unsafe();
×
63
        old_ts = val >= 0 ? static_cast<std::uint64_t>(val) : 0;
×
UNCOV
64
    } else {
×
65
        return event_json;
×
66
    }
67

68
    std::uint64_t new_ts = old_ts >= offset ? old_ts - offset : 0;
150!
69

70
    // simdjson DOM is read-only, so we need to rebuild the JSON with the new ts
71
    // Find "ts": and replace the value
72
    std::string modified = event_json;
150✔
73
    auto pos = modified.find("\"ts\":");
150✔
74
    if (pos == std::string::npos) return event_json;
150!
75

76
    pos += 5;  // Skip past "ts":
150✔
77
    while (pos < modified.size() && std::isspace(modified[pos])) ++pos;
150!
78

79
    auto end_pos = pos;
150✔
80
    while (end_pos < modified.size() &&
1,800!
81
           (std::isdigit(modified[end_pos]) || modified[end_pos] == '-')) {
1,650!
82
        ++end_pos;
1,500✔
83
    }
84

85
    modified.replace(pos, end_pos - pos, std::to_string(new_ts));
150!
86
    return modified;
150✔
87
}
150✔
88

89
/// Compute the minimum event duration threshold for a given summary level.
90
/// Level 1 = full detail, higher levels filter shorter events.
91
static double duration_threshold(double begin, double end, unsigned level,
6✔
92
                                 unsigned viewport_width = 1920) {
93
    if (level <= 1) return 0.0;
6!
94
    double range = end - begin;
×
UNCOV
95
    return range /
×
96
           (static_cast<double>(viewport_width) * static_cast<double>(level));
×
97
}
6✔
98

99
static std::string extract_json_value(simdjson::dom::element val) {
3✔
100
    if (val.is_string()) {
3✔
101
        return std::string(val.get_string().value_unsafe());
1✔
102
    }
103
    if (val.is_int64()) {
2!
104
        return std::to_string(val.get_int64().value_unsafe());
2✔
105
    }
106
    if (val.is_uint64()) {
×
107
        return std::to_string(val.get_uint64().value_unsafe());
×
108
    }
109
    return {};
×
110
}
3✔
111

112
static void append_lane_clause(std::string& dsl, const char* field,
1✔
113
                               const std::string& val) {
114
    if (!dsl.empty()) dsl += " and ";
1!
115
    bool numeric =
1✔
116
        !val.empty() && std::all_of(val.begin(), val.end(),
1!
117
                                    [](char c) { return std::isdigit(c); });
1✔
118
    if (numeric) {
1!
119
        dsl += std::string(field) + " == " + val;
1!
120
    } else {
1✔
121
        dsl += std::string(field) + " == \"" + val + "\"";
×
122
    }
123
}
1✔
124

125
static void apply_lanes(std::string& dsl, std::string_view lanes_str) {
6✔
126
    if (lanes_str.empty()) return;
6✔
127

128
    thread_local simdjson::dom::parser tl_parser;
1!
129
    auto result = tl_parser.parse(lanes_str.data(), lanes_str.size());
1✔
130
    if (result.error()) return;
1!
131

132
    auto root = result.value_unsafe();
1✔
133

134
    if (root.is_array()) {
1!
135
        auto arr = root.get_array().value_unsafe();
1✔
136
        for (auto item : arr) {
2✔
137
            if (!item.is_object()) continue;
1!
138
            auto obj = item.get_object().value_unsafe();
1✔
139

140
            auto field_result = obj["field"];
1✔
141
            if (field_result.error()) field_result = obj["fields"];
1!
142
            auto value_result = obj["value"];
1✔
143
            if (field_result.error() || value_result.error()) continue;
1!
144

145
            if (!field_result.value_unsafe().is_string()) continue;
1!
146
            const char* field =
1✔
147
                field_result.value_unsafe().get_c_str().value_unsafe();
1✔
148
            auto val = extract_json_value(value_result.value_unsafe());
1✔
149
            if (!val.empty()) append_lane_clause(dsl, field, val);
1!
150
        }
1✔
151
    } else if (root.is_object()) {
1!
152
        auto obj = root.get_object().value_unsafe();
×
153

154
        auto field_result = obj["field"];
×
155
        if (field_result.error()) field_result = obj["fields"];
×
156
        auto value_result = obj["value"];
×
157

158
        if (!field_result.error() && !value_result.error()) {
×
159
            if (field_result.value_unsafe().is_string()) {
×
UNCOV
160
                const char* field =
×
161
                    field_result.value_unsafe().get_c_str().value_unsafe();
×
162
                auto val = extract_json_value(value_result.value_unsafe());
×
163
                if (!val.empty()) append_lane_clause(dsl, field, val);
×
164
            }
×
UNCOV
165
        }
×
UNCOV
166
    }
×
167
}
6✔
168

169
static void apply_filters(std::string& dsl, std::string_view filters_str) {
6✔
170
    if (filters_str.empty()) return;
6✔
171

172
    thread_local simdjson::dom::parser tl_parser;
2✔
173
    auto result = tl_parser.parse(filters_str.data(), filters_str.size());
2✔
174
    if (result.error()) return;
2!
175

176
    auto root = result.value_unsafe();
2✔
177
    if (!root.is_array()) return;
2!
178

179
    auto arr = root.get_array().value_unsafe();
2✔
180
    for (auto item : arr) {
4✔
181
        if (!item.is_object()) continue;
2!
182
        auto obj = item.get_object().value_unsafe();
2✔
183

184
        auto field_result = obj["field"];
2✔
185
        auto op_result = obj["op"];
2✔
186
        auto value_result = obj["value"];
2✔
187
        if (field_result.error() || op_result.error() || value_result.error())
2!
188
            continue;
×
189

190
        if (!field_result.value_unsafe().is_string() ||
2!
191
            !op_result.value_unsafe().is_string())
2✔
192
            continue;
×
193

194
        const char* field =
2✔
195
            field_result.value_unsafe().get_c_str().value_unsafe();
2✔
196
        const char* op = op_result.value_unsafe().get_c_str().value_unsafe();
2✔
197

198
        std::string val = extract_json_value(value_result.value_unsafe());
2✔
199
        if (val.empty()) continue;
2!
200

201
        std::string op_str(op);
2!
202
        std::string field_str(field);
2!
203
        if (field_str == "begin") field_str = "ts";
2!
204
        if (field_str == "end") field_str = "ts";
2!
205
        if (field_str == "duration") field_str = "dur";
2!
206

207
        std::string query_op;
2✔
208
        if (op_str == "=")
2✔
209
            query_op = "==";
1!
210
        else if (op_str == ">=")
1!
211
            query_op = ">=";
1!
212
        else if (op_str == "<=")
×
213
            query_op = "<=";
×
214
        else if (op_str == ">")
×
215
            query_op = ">";
×
216
        else if (op_str == "<")
×
217
            query_op = "<";
×
218
        else
219
            continue;
×
220

221
        if (!dsl.empty()) dsl += " and ";
2!
222
        bool numeric = !val.empty() && (std::isdigit(val[0]) || val[0] == '-');
4!
223
        if (numeric || query_op != "==") {
2!
224
            dsl += field_str + " " + query_op + " " + val;
2!
225
        } else {
2✔
226
            dsl += field_str + " " + query_op + " \"" + val + "\"";
×
227
        }
228
    }
2!
229
}
6✔
230

231
// --- GET /api/v1/viz/events ---
232
// Build the query view (time range + lane/filter/pid/tid/cat predicates) for a
233
// viz request from the parsed parameters.
234
static ViewDefinition build_viz_view(const QueryParams& params, double begin,
6✔
235
                                     double end, double min_dur) {
236
    ViewDefinition view;
6✔
237
    view.name = "viz_query";
6!
238
    view.description = "Visualization query";
6!
239

240
    // Reserve once and append in place: numbers go through to_chars into a
241
    // stack buffer (no per-number heap allocation, unlike std::to_string), and
242
    // literals/string_views are appended directly (no temporary
243
    // concatenations).
244
    std::string dsl;
6✔
245
    dsl.reserve(128);
6!
246
    char numbuf[20];  // max digits of a uint64_t
247
    auto append_u64 = [&](std::uint64_t v) {
18✔
248
        dsl.append(numbuf, to_chars_u64(numbuf, numbuf + sizeof(numbuf), v));
12✔
249
    };
12✔
250

251
    dsl += "ts >= ";
6!
252
    append_u64(static_cast<std::uint64_t>(begin));
6!
253
    dsl += " and ts <= ";
6!
254
    append_u64(static_cast<std::uint64_t>(end));
6!
255
    if (min_dur > 0) {
6!
NEW
256
        dsl += " and dur >= ";
×
NEW
257
        append_u64(static_cast<std::uint64_t>(min_dur));
×
UNCOV
258
    }
×
259

260
    apply_lanes(dsl, params.get("lanes"));
6!
261
    apply_filters(dsl, params.get("filters"));
6!
262

263
    auto pid = params.get("pid");
6!
264
    if (!pid.empty()) {
6!
NEW
265
        dsl += " and pid == ";
×
NEW
266
        dsl += pid;
×
UNCOV
267
    }
×
268

269
    auto tid = params.get("tid");
6!
270
    if (!tid.empty()) {
6!
NEW
271
        dsl += " and tid == ";
×
NEW
272
        dsl += tid;
×
UNCOV
273
    }
×
274

275
    auto cat = params.get("cat");
6!
276
    if (!cat.empty()) {
6!
NEW
277
        dsl += " and cat == \"";
×
NEW
278
        dsl += cat;
×
NEW
279
        dsl += '"';
×
UNCOV
280
    }
×
281

282
    view.with_query(dsl);
6!
283
    return view;
6✔
284
}
6!
285

286
// Select the files to scan: the explicit ?file= or all indexed files, then drop
287
// files whose cached time bounds don't overlap [begin, end]. Pure/synchronous.
288
static std::vector<const TraceIndex::FileInfo*> select_viz_target_files(
6✔
289
    TraceIndex& index, const QueryParams& params, double begin, double end) {
290
    std::vector<const TraceIndex::FileInfo*> target_files;
6✔
291
    auto file_param = params.get("file");
6!
292
    if (!file_param.empty()) {
6!
UNCOV
293
        auto* f = index.find_file(std::string(file_param));
×
UNCOV
294
        if (f) target_files.push_back(f);
×
UNCOV
295
    } else {
×
296
        for (const auto& f : index.files()) {
12!
297
            target_files.push_back(&f);
6!
298
        }
299
    }
300

301
    if (begin > 0 || end > 0) {
6!
302
        std::vector<const TraceIndex::FileInfo*> filtered;
6✔
303
        filtered.reserve(target_files.size());
6!
304
        for (auto* fi : target_files) {
12✔
305
            if (fi->min_timestamp_us == 0 && fi->max_timestamp_us == 0) {
6!
UNCOV
306
                filtered.push_back(fi);
×
UNCOV
307
                continue;
×
308
            }
309
            double fi_min = static_cast<double>(fi->min_timestamp_us);
6✔
310
            double fi_max = static_cast<double>(fi->max_timestamp_us);
6✔
311
            if (fi_max < begin || fi_min > end) continue;
6!
312
            filtered.push_back(fi);
5!
313
        }
314
        target_files = std::move(filtered);
6✔
315
    }
6✔
316
    return target_files;
6✔
317
}
6!
318

319
// Normalize event timestamps (when global_min > 0) and serialize the collected
320
// events plus metadata into the Chrome Trace Event Format body. `global_min` is
321
// the de-normalization base (already 0 unless normalization is active);
322
// `display_global_min` is the value reported in the metadata. Pure/synchronous.
323
static std::string build_viz_events_body(std::vector<std::string>& events,
6✔
324
                                         std::uint64_t global_min,
325
                                         double meta_begin, double meta_end,
326
                                         int limit, bool truncated,
327
                                         std::uint64_t display_global_min) {
328
    if (global_min > 0) {
6✔
329
        for (auto& event : events) {
155✔
330
            event = normalize_event_ts(event, global_min);
150✔
331
        }
332
    }
5✔
333

334
    // Pre-compute size to avoid repeated reallocations.
335
    std::size_t body_size = 256;
6✔
336
    for (const auto& ev : events) body_size += ev.size() + 1;
156✔
337
    std::string body;
6✔
338
    body.reserve(body_size);
6!
339
    body += "{\"events\":[";
6!
340
    for (std::size_t i = 0; i < events.size(); ++i) {
156✔
341
        if (i > 0) body += ',';
150!
342
        body += events[i];  // Already JSON
150!
343
    }
150✔
344
    body += "],\"metadata\":{\"begin\":";
6!
345
    body += std::to_string(meta_begin);
6!
346
    body += ",\"end\":";
6!
347
    body += std::to_string(meta_end);
6!
348
    body += ",\"count\":";
6!
349
    body += std::to_string(events.size());
6!
350
    body += ",\"limit\":";
6!
351
    body += std::to_string(limit);
6!
352
    body += ",\"truncated\":";
6!
353
    body += truncated ? "true" : "false";
6!
354
    body += ",\"ts_normalized\":";
6!
355
    body += (global_min > 0) ? "true" : "false";
6!
356
    body += ",\"global_min_timestamp_us\":";
6!
357
    body += std::to_string(display_global_min);
6!
358
    body += "}}";
6!
359
    return body;
6✔
360
}
6!
361

362
static coro::CoroTask<HttpResponse> handle_viz_events(
59!
363
    const HttpRequest& /*req*/, const QueryParams& params, TraceIndex& index) {
7!
364
    // Required: begin, end, summary
365
    if (!params.has("begin") || !params.has("end") || !params.has("summary")) {
7!
366
        co_return HttpResponse::bad_request(
8!
367
            "Missing required parameters: begin, end, summary");
1!
368
    }
369

370
    double begin = params.get_double("begin", 0);
6!
371
    double end = params.get_double("end", 0);
6!
372
    int summary = params.get_int("summary", 1);
6!
373
    if (summary < 1) summary = 1;
6!
374

375
    // Timestamp normalization: default ON, opt-out with ?ts_normalize=0
376
    auto ts_norm_param = params.get("ts_normalize");
6!
377
    bool normalize = ts_norm_param.empty() || ts_norm_param != "0";
6!
378

379
    std::uint64_t global_min = 0;
6✔
380
    if (normalize) {
6✔
381
        global_min = index.global_min_timestamp_us();
5!
382
        if (global_min == std::numeric_limits<std::uint64_t>::max()) {
5!
NEW
383
            global_min = 0;  // No valid bounds, skip normalization
×
NEW
384
        }
×
385
    }
5✔
386

387
    // When normalization is active the user sends normalized
388
    // begin/end values (relative to global_min).  De-normalize them
389
    // so the predicate filters against absolute timestamps.
390
    double original_begin = begin;
6✔
391
    double original_end = end;
6✔
392
    if (normalize && global_min > 0) {
6!
393
        begin += static_cast<double>(global_min);
5✔
394
        end += static_cast<double>(global_min);
5✔
395
    }
5✔
396

397
    double min_dur =
6✔
398
        duration_threshold(begin, end, static_cast<unsigned>(summary));
6!
399

400
    ViewDefinition view = build_viz_view(params, begin, end, min_dur);
6!
401

402
    // Optional limit: 0 (default) means no limit.
403
    int limit = params.get_int("limit", 0);
6!
404
    if (limit < 0) limit = 0;
6!
405

406
    std::vector<const TraceIndex::FileInfo*> target_files =
6✔
407
        select_viz_target_files(index, params, begin, end);
6!
408

409
    std::vector<std::string> collected_events;
6✔
410

411
    bool truncated = false;
6✔
412

413
    if (target_files.size() <= 1) {
6!
414
        for (auto* file_info : target_files) {
23✔
415
            if (limit > 0 &&
5!
UNCOV
416
                static_cast<int>(collected_events.size()) >= limit) {
×
UNCOV
417
                truncated = true;
×
UNCOV
418
                break;
×
419
            }
420
            if (file_info->uncompressed_size == 0 &&
5!
UNCOV
421
                file_info->num_checkpoints == 0)
×
UNCOV
422
                continue;
×
423

424
            ViewBuilderInput builder_input;
5✔
425
            builder_input.with_view(view)
10!
426
                .with_file_path(file_info->path)
5!
427
                .with_index_path(
5!
428
                    file_info->has_bloom_data ? file_info->index_path : "")
5!
429
                .with_uncompressed_size(file_info->uncompressed_size)
5!
430
                .with_num_checkpoints(file_info->num_checkpoints)
5!
431
                .with_bloom_cache(&index.bloom_cache())
5!
432
                .with_time_range(begin, end);
5!
433

434
            ViewBuilderUtility builder;
5!
435
            auto build_output = co_await builder.process(builder_input);
10!
436
            if (!build_output.success || !build_output.file_may_match) continue;
5!
437

438
            for (const auto& candidate : build_output.candidates) {
18✔
439
                if (limit > 0 &&
3!
UNCOV
440
                    static_cast<int>(collected_events.size()) >= limit) {
×
UNCOV
441
                    truncated = true;
×
UNCOV
442
                    break;
×
443
                }
444
                ViewReaderInput reader_input;
3✔
445
                reader_input.with_file_path(file_info->path)
3!
446
                    .with_index_path(file_info->index_path)
3!
447
                    .with_byte_range(candidate.start_byte, candidate.end_byte)
3!
448
                    .with_checkpoint_idx(candidate.checkpoint_idx)
3!
449
                    .with_view(view);
3!
450

451
                ViewReaderUtility reader;
3!
452
                auto gen = reader.process(reader_input);
3!
453
                while (auto batch = co_await gen.next()) {
24!
454
                    for (auto& event : batch->events) {
153✔
455
                        if (limit > 0 &&
150!
UNCOV
456
                            static_cast<int>(collected_events.size()) >=
×
UNCOV
457
                                limit) {
×
UNCOV
458
                            truncated = true;
×
UNCOV
459
                            break;
×
460
                        }
461
                        collected_events.emplace_back(event);
150!
462
                    }
150!
463
                    if (truncated) break;
3!
464
                }
6✔
465
            }
15✔
466
        }
17✔
467
    } else {
6✔
UNCOV
468
        std::size_t num_workers =
×
UNCOV
469
            std::min(index.max_concurrent(), target_files.size());
×
UNCOV
470
        auto* executor = Executor::current();
×
471

UNCOV
472
        auto file_chan = coro::make_channel<std::size_t>(num_workers * 2);
×
UNCOV
473
        auto collected_mutex = std::make_shared<std::mutex>();
×
UNCOV
474
        auto remaining = std::make_shared<std::atomic<int>>(
×
UNCOV
475
            limit > 0 ? limit : std::numeric_limits<int>::max());
×
476

UNCOV
477
        auto* target_files_ptr = &target_files;
×
UNCOV
478
        auto* collected_ptr = &collected_events;
×
UNCOV
479
        auto* view_ptr = &view;
×
UNCOV
480
        auto* bloom_cache_ptr = &index.bloom_cache();
×
UNCOV
481
        double t_begin = begin;
×
UNCOV
482
        double t_end = end;
×
483

UNCOV
484
        CoroScope scope(executor);
×
485

486
        scope.spawn([ch = file_chan->producer(), target_files_ptr](
×
UNCOV
487
                        CoroScope&) mutable -> coro::CoroTask<void> {
×
UNCOV
488
            auto guard = ch.guard();
×
UNCOV
489
            for (std::size_t i = 0; i < target_files_ptr->size(); ++i) {
×
UNCOV
490
                if (!co_await ch.send(i)) co_return;
×
UNCOV
491
            }
×
UNCOV
492
            co_return;
×
493
        });
×
494

UNCOV
495
        for (std::size_t w = 0; w < num_workers; ++w) {
×
496
            scope.spawn([file_chan, target_files_ptr, collected_mutex,
×
UNCOV
497
                         collected_ptr, view_ptr, bloom_cache_ptr, remaining,
×
UNCOV
498
                         t_begin, t_end](CoroScope&) -> coro::CoroTask<void> {
×
UNCOV
499
                while (auto fi_opt = co_await file_chan->receive()) {
×
UNCOV
500
                    if (remaining->load(std::memory_order_relaxed) <= 0)
×
UNCOV
501
                        co_return;
×
UNCOV
502
                    auto* file_info = (*target_files_ptr)[*fi_opt];
×
503

UNCOV
504
                    if (file_info->uncompressed_size == 0 &&
×
UNCOV
505
                        file_info->num_checkpoints == 0)
×
UNCOV
506
                        continue;
×
507

UNCOV
508
                    ViewBuilderInput builder_input;
×
UNCOV
509
                    builder_input.with_view(*view_ptr)
×
UNCOV
510
                        .with_file_path(file_info->path)
×
UNCOV
511
                        .with_index_path(file_info->has_bloom_data
×
UNCOV
512
                                             ? file_info->index_path
×
UNCOV
513
                                             : "")
×
UNCOV
514
                        .with_uncompressed_size(file_info->uncompressed_size)
×
UNCOV
515
                        .with_num_checkpoints(file_info->num_checkpoints)
×
UNCOV
516
                        .with_bloom_cache(bloom_cache_ptr)
×
UNCOV
517
                        .with_time_range(t_begin, t_end);
×
518

UNCOV
519
                    ViewBuilderUtility builder;
×
UNCOV
520
                    auto build_output = co_await builder.process(builder_input);
×
UNCOV
521
                    if (!build_output.success || !build_output.file_may_match)
×
UNCOV
522
                        continue;
×
523

UNCOV
524
                    for (const auto& candidate : build_output.candidates) {
×
UNCOV
525
                        if (remaining->load(std::memory_order_relaxed) <= 0)
×
UNCOV
526
                            break;
×
527

UNCOV
528
                        ViewReaderInput reader_input;
×
UNCOV
529
                        reader_input.with_file_path(file_info->path)
×
UNCOV
530
                            .with_index_path(file_info->index_path)
×
UNCOV
531
                            .with_byte_range(candidate.start_byte,
×
UNCOV
532
                                             candidate.end_byte)
×
UNCOV
533
                            .with_checkpoint_idx(candidate.checkpoint_idx)
×
UNCOV
534
                            .with_view(*view_ptr);
×
535

UNCOV
536
                        ViewReaderUtility reader;
×
UNCOV
537
                        auto gen = reader.process(reader_input);
×
UNCOV
538
                        while (auto batch = co_await gen.next()) {
×
UNCOV
539
                            if (!batch->events.empty()) {
×
UNCOV
540
                                std::lock_guard<std::mutex> lock(
×
UNCOV
541
                                    *collected_mutex);
×
UNCOV
542
                                for (auto& event : batch->events) {
×
UNCOV
543
                                    collected_ptr->emplace_back(event);
×
UNCOV
544
                                }
×
UNCOV
545
                                remaining->fetch_sub(
×
UNCOV
546
                                    static_cast<int>(batch->events.size()));
×
UNCOV
547
                            }
×
UNCOV
548
                        }
×
UNCOV
549
                    }
×
UNCOV
550
                }
×
UNCOV
551
                co_return;
×
552
            });
×
UNCOV
553
        }
×
554

UNCOV
555
        co_await scope.join();
×
556

UNCOV
557
        if (limit > 0 && static_cast<int>(collected_events.size()) > limit) {
×
UNCOV
558
            collected_events.resize(static_cast<std::size_t>(limit));
×
UNCOV
559
            truncated = true;
×
UNCOV
560
        }
×
UNCOV
561
    }
×
562

563
    std::string body = build_viz_events_body(
6!
564
        collected_events, global_min, original_begin, original_end, limit,
6✔
565
        truncated, index.global_min_timestamp_us());
6!
566
    co_return HttpResponse::ok(body);
6!
567
}
52✔
568

569
void register_viz_api(Router& router, TraceIndex& index) {
2✔
570
    auto* index_ptr = &index;
2✔
571

572
    router.get(
2!
573
        "/api/v1/viz/events",
2✔
574
        [index_ptr](const HttpRequest& req,
51!
575
                    const QueryParams& params) -> coro::CoroTask<HttpResponse> {
7!
576
            co_return co_await handle_viz_events(req, params, *index_ptr);
35!
577
        });
14✔
578
}
2✔
579

580
}  // namespace dftracer::utils::server
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc