• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23529483807

25 Mar 2026 07:17AM UTC coverage: 48.515% (-1.6%) from 50.098%
23529483807

Pull #57

github

web-flow
Merge 5b1e117ad into 38f9f3616
Pull Request #57: feat(comparator): add pairwise traces comparator

18829 of 49412 branches covered (38.11%)

Branch coverage included in aggregate %.

1584 of 1933 new or added lines in 14 files covered. (81.95%)

3552 existing lines in 135 files now uncovered.

18474 of 27477 relevant lines covered (67.23%)

241072.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.11
/src/dftracer/utils/server/viz_api.cpp
1
#include <dftracer/utils/core/common/logging.h>
2
#include <dftracer/utils/core/coro/channel.h>
3
#include <dftracer/utils/core/pipeline/executor.h>
4
#include <dftracer/utils/core/tasks/coro_scope.h>
5
#include <dftracer/utils/server/http_request.h>
6
#include <dftracer/utils/server/http_response.h>
7
#include <dftracer/utils/server/router.h>
8
#include <dftracer/utils/server/trace_index.h>
9
#include <dftracer/utils/server/viz_api.h>
10
#include <dftracer/utils/utilities/common/json/json_doc_guard.h>
11
#include <dftracer/utils/utilities/common/json/json_value.h>
12
#include <dftracer/utils/utilities/common/query/query.h>
13
#include <dftracer/utils/utilities/composites/dft/views/view_builder_utility.h>
14
#include <dftracer/utils/utilities/composites/dft/views/view_definition.h>
15
#include <dftracer/utils/utilities/composites/dft/views/view_reader_utility.h>
16
#include <dftracer/utils/utilities/fileio/lines/sources/async_streaming_gz_line_generator.h>
17
#include <yyjson.h>
18

19
#include <atomic>
20
#include <cstddef>
21
#include <cstdint>
22
#include <limits>
23
#include <memory>
24
#include <mutex>
25
#include <string>
26
#include <unordered_map>
27
#include <unordered_set>
28
#include <vector>
29

30
namespace dftracer::utils::server {
31

32
using namespace dftracer::utils::utilities::composites::dft;
33
using namespace dftracer::utils::utilities::composites::dft::views;
34

35
using dftracer::utils::utilities::common::json::JsonDocGuard;
36
using dftracer::utils::utilities::common::json::JsonValue;
37
using dftracer::utils::utilities::common::query::Query;
38

39
static const std::unordered_set<std::string> HASH_METADATA_NAMES = {"FH", "HH",
222!
40
                                                                    "SH"};
222!
41

42
/// Normalize the "ts" field in a Chrome Trace Event JSON string by
43
/// subtracting an offset.  Returns the modified JSON.  Falls back to
44
/// the original string on parse failure.
45
static std::string normalize_event_ts(const std::string& event_json,
×
46
                                      std::uint64_t offset) {
47
    auto* doc = yyjson_read(event_json.c_str(), event_json.size(), 0);
×
48
    if (!doc) return event_json;
×
49

50
    auto* mdoc = yyjson_doc_mut_copy(doc, nullptr);
×
UNCOV
51
    yyjson_doc_free(doc);
×
52
    if (!mdoc) return event_json;
×
53

54
    auto* root = yyjson_mut_doc_get_root(mdoc);
×
55
    if (root) {
×
56
        auto* ts_val = yyjson_mut_obj_get(root, "ts");
×
57
        if (ts_val && yyjson_mut_is_uint(ts_val)) {
×
58
            std::uint64_t old_ts = yyjson_mut_get_uint(ts_val);
×
59
            std::uint64_t new_ts = old_ts >= offset ? old_ts - offset : 0;
×
UNCOV
60
            yyjson_mut_set_uint(ts_val, new_ts);
×
61
        } else if (ts_val && yyjson_mut_is_int(ts_val)) {
×
UNCOV
62
            auto old_ts =
×
63
                static_cast<std::uint64_t>(yyjson_mut_get_int(ts_val));
×
64
            std::uint64_t new_ts = old_ts >= offset ? old_ts - offset : 0;
×
UNCOV
65
            yyjson_mut_set_uint(ts_val, new_ts);
×
UNCOV
66
        }
×
UNCOV
67
    }
×
68

69
    std::size_t len = 0;
×
70
    char* json_str = yyjson_mut_write(mdoc, YYJSON_WRITE_NOFLAG, &len);
×
71
    yyjson_mut_doc_free(mdoc);
×
72

73
    if (json_str) {
×
74
        std::string result(json_str, len);
×
75
        free(json_str);
×
76
        return result;
×
77
    }
×
78
    return event_json;
×
UNCOV
79
}
×
80

81
/// Compute the minimum event duration threshold for a given summary level.
82
/// Level 1 = full detail, higher levels filter shorter events.
83
static double duration_threshold(double begin, double end, unsigned level,
6✔
84
                                 unsigned viewport_width = 1920) {
85
    if (level <= 1) return 0.0;
6!
86
    double range = end - begin;
×
UNCOV
87
    return range /
×
88
           (static_cast<double>(viewport_width) * static_cast<double>(level));
×
89
}
6✔
90

91
static std::string extract_json_value(yyjson_val* val) {
3✔
92
    if (yyjson_is_str(val)) return yyjson_get_str(val);
3✔
93
    if (yyjson_is_int(val)) return std::to_string(yyjson_get_int(val));
2!
94
    if (yyjson_is_uint(val)) return std::to_string(yyjson_get_uint(val));
×
95
    return {};
×
96
}
3✔
97

98
static void append_lane_clause(std::string& dsl, const char* field,
1✔
99
                               const std::string& val) {
100
    if (!dsl.empty()) dsl += " and ";
1!
101
    bool numeric =
1✔
102
        !val.empty() && std::all_of(val.begin(), val.end(),
1!
103
                                    [](char c) { return std::isdigit(c); });
1✔
104
    if (numeric) {
1!
105
        dsl += std::string(field) + " == " + val;
1!
106
    } else {
1✔
107
        dsl += std::string(field) + " == \"" + val + "\"";
×
108
    }
109
}
1✔
110

111
static void apply_lanes(std::string& dsl, std::string_view lanes_str) {
6✔
112
    if (lanes_str.empty()) return;
6✔
113

114
    std::string buf(lanes_str);
1✔
115
    auto* doc = yyjson_read(buf.c_str(), buf.size(), 0);
1!
116
    if (!doc) return;
1!
117
    auto doc_guard = std::unique_ptr<yyjson_doc, decltype(&yyjson_doc_free)>(
1✔
118
        doc, yyjson_doc_free);
1✔
119

120
    yyjson_val* root = yyjson_doc_get_root(doc);
1!
121
    if (!root) return;
1!
122

123
    if (yyjson_is_arr(root)) {
1!
124
        yyjson_val* item;
125
        yyjson_arr_iter iter;
126
        yyjson_arr_iter_init(root, &iter);
1!
127
        while ((item = yyjson_arr_iter_next(&iter)) != nullptr) {
2!
128
            if (!yyjson_is_obj(item)) continue;
1!
129
            auto* field_val = yyjson_obj_get(item, "field");
1!
130
            if (!field_val) field_val = yyjson_obj_get(item, "fields");
1!
131
            auto* value_val = yyjson_obj_get(item, "value");
1!
132
            if (!field_val || !value_val) continue;
1!
133
            const char* field = yyjson_get_str(field_val);
1!
134
            if (!field) continue;
1!
135
            auto val = extract_json_value(value_val);
1!
136
            if (!val.empty()) append_lane_clause(dsl, field, val);
1!
137
        }
1✔
138
    } else if (yyjson_is_obj(root)) {
1!
139
        auto* field_val = yyjson_obj_get(root, "field");
×
140
        if (!field_val) field_val = yyjson_obj_get(root, "fields");
×
141
        auto* value_val = yyjson_obj_get(root, "value");
×
142
        if (field_val && value_val) {
×
143
            const char* field = yyjson_get_str(field_val);
×
144
            if (field) {
×
145
                auto val = extract_json_value(value_val);
×
146
                if (!val.empty()) append_lane_clause(dsl, field, val);
×
147
            }
×
UNCOV
148
        }
×
UNCOV
149
    }
×
150
}
6!
151

152
static void apply_filters(std::string& dsl, std::string_view filters_str) {
6✔
153
    if (filters_str.empty()) return;
6✔
154

155
    std::string buf(filters_str);
2✔
156
    auto* doc = yyjson_read(buf.c_str(), buf.size(), 0);
2!
157
    if (!doc) return;
2!
158
    auto doc_guard = std::unique_ptr<yyjson_doc, decltype(&yyjson_doc_free)>(
2✔
159
        doc, yyjson_doc_free);
2✔
160

161
    yyjson_val* root = yyjson_doc_get_root(doc);
2✔
162
    if (!root || !yyjson_is_arr(root)) return;
2!
163

164
    yyjson_val* item;
165
    yyjson_arr_iter iter;
166
    yyjson_arr_iter_init(root, &iter);
2!
167
    while ((item = yyjson_arr_iter_next(&iter)) != nullptr) {
4!
168
        if (!yyjson_is_obj(item)) continue;
2!
169

170
        auto* field_val = yyjson_obj_get(item, "field");
2!
171
        auto* op_val = yyjson_obj_get(item, "op");
2!
172
        auto* value_val = yyjson_obj_get(item, "value");
2!
173
        if (!field_val || !op_val || !value_val) continue;
2!
174

175
        const char* field = yyjson_get_str(field_val);
2!
176
        const char* op = yyjson_get_str(op_val);
2!
177
        if (!field || !op) continue;
2!
178

179
        std::string val = extract_json_value(value_val);
2!
180
        if (val.empty()) continue;
2!
181

182
        std::string op_str(op);
2!
183
        std::string field_str(field);
2!
184
        if (field_str == "begin") field_str = "ts";
2!
185
        if (field_str == "end") field_str = "ts";
2!
186
        if (field_str == "duration") field_str = "dur";
2!
187

188
        std::string query_op;
2✔
189
        if (op_str == "=")
2✔
190
            query_op = "==";
1!
191
        else if (op_str == ">=")
1!
192
            query_op = ">=";
1!
193
        else if (op_str == "<=")
×
194
            query_op = "<=";
×
195
        else if (op_str == ">")
×
196
            query_op = ">";
×
197
        else if (op_str == "<")
×
198
            query_op = "<";
×
199
        else
200
            continue;
×
201

202
        if (!dsl.empty()) dsl += " and ";
2!
203
        bool numeric = !val.empty() && (std::isdigit(val[0]) || val[0] == '-');
4!
204
        if (numeric || query_op != "==") {
2!
205
            dsl += field_str + " " + query_op + " " + val;
2!
206
        } else {
2✔
207
            dsl += field_str + " " + query_op + " \"" + val + "\"";
×
208
        }
209
    }
2!
210
}
6✔
211

212
/// Direct-scan a small file without any sidecar index.
213
/// Streams via async_streaming_gz_lines(), parses JSON, applies
214
/// predicate filters, collects matching events as raw JSON strings.
215
static coro::CoroTask<void> direct_scan_events(
1,302!
216
    const TraceIndex::FileInfo* file_info, const Query* query,
217
    bool include_metadata, std::vector<std::string>* collected_events,
218
    std::uint64_t* total_scanned, std::uint64_t* total_matched, int limit) {
6!
219
    using dftracer::utils::utilities::fileio::lines::sources::
220
        async_streaming_gz_lines;
221

222
    try {
223
        auto gen = async_streaming_gz_lines(file_info->path);
6!
224

225
        std::unordered_map<std::string, std::string> pending_metadata;
6✔
226
        std::unordered_set<std::string> emitted_hashes;
6✔
227

228
        while (auto line = co_await gen.next()) {
1,278!
229
            if (limit > 0 &&
312!
UNCOV
230
                collected_events->size() >= static_cast<std::size_t>(limit)) {
×
UNCOV
231
                co_return;
×
232
            }
233
            if (line->content.empty()) continue;
312!
234

235
            JsonDocGuard guard{yyjson_read_opts(
624!
236
                const_cast<char*>(line->content.data()), line->content.size(),
312✔
237
                YYJSON_READ_NOFLAG, nullptr, nullptr)};
238
            if (!guard.doc) continue;
312✔
239

240
            yyjson_val* root = yyjson_doc_get_root(guard.doc);
300✔
241
            if (root && yyjson_is_obj(root)) {
300!
242
                JsonValue json(root);
300!
243
                // line->content is a string_view valid only for this
244
                // iteration.  All storage into collected_events and
245
                // pending_metadata must copy to owning std::string.
246
                std::string_view ph = json["ph"].get<std::string_view>();
300!
247

248
                if (ph == "M" && include_metadata) {
300!
UNCOV
249
                    std::string name_str = json["name"].get<std::string>();
×
250

UNCOV
251
                    if (HASH_METADATA_NAMES.count(name_str)) {
×
UNCOV
252
                        auto args = json["args"];
×
UNCOV
253
                        if (args.exists()) {
×
UNCOV
254
                            auto val = args["value"];
×
UNCOV
255
                            if (val.exists()) {
×
UNCOV
256
                                std::string hash_val = val.get<std::string>();
×
UNCOV
257
                                if (!emitted_hashes.count(hash_val)) {
×
UNCOV
258
                                    pending_metadata[hash_val] =
×
UNCOV
259
                                        std::string(line->content.data(),
×
UNCOV
260
                                                    line->content.size());
×
UNCOV
261
                                }
×
UNCOV
262
                            }
×
UNCOV
263
                        }
×
UNCOV
264
                    } else {
×
UNCOV
265
                        collected_events->emplace_back(line->content.data(),
×
UNCOV
266
                                                       line->content.size());
×
UNCOV
267
                        (*total_matched)++;
×
268
                    }
269
                } else if (ph != "M") {
300!
270
                    (*total_scanned)++;
300✔
271
                    if (!query || query->evaluate(json)) {
300!
272
                        // Flush referenced hash metadata first
UNCOV
273
                        if (include_metadata) {
×
UNCOV
274
                            auto args = json["args"];
×
UNCOV
275
                            if (args.exists()) {
×
276
                                static const char* hash_fields[] = {
277
                                    "hhash", "fhash", "shash"};
UNCOV
278
                                for (const char* field : hash_fields) {
×
UNCOV
279
                                    auto val = args[field];
×
UNCOV
280
                                    if (!val.exists()) continue;
×
UNCOV
281
                                    std::string hash_val =
×
UNCOV
282
                                        val.get<std::string>();
×
UNCOV
283
                                    if (emitted_hashes.count(hash_val))
×
UNCOV
284
                                        continue;
×
UNCOV
285
                                    auto it = pending_metadata.find(hash_val);
×
UNCOV
286
                                    if (it != pending_metadata.end()) {
×
UNCOV
287
                                        collected_events->push_back(
×
UNCOV
288
                                            std::move(it->second));
×
UNCOV
289
                                        (*total_matched)++;
×
UNCOV
290
                                        emitted_hashes.insert(hash_val);
×
UNCOV
291
                                        pending_metadata.erase(it);
×
UNCOV
292
                                    }
×
UNCOV
293
                                }
×
UNCOV
294
                            }
×
UNCOV
295
                        }
×
UNCOV
296
                        collected_events->emplace_back(line->content.data(),
×
UNCOV
297
                                                       line->content.size());
×
UNCOV
298
                        (*total_matched)++;
×
UNCOV
299
                    }
×
300
                }
300✔
301
            }
300✔
302
        }
318✔
303
    } catch (const std::exception& e) {
642!
UNCOV
304
        DFTRACER_UTILS_LOG_WARN("Direct scan failed for %s: %s",
×
305
                                file_info->path.c_str(), e.what());
UNCOV
306
    }
×
307

308
    co_return;
6✔
309
}
1,272✔
310

311
// --- GET /api/v1/viz/events ---
312
static coro::CoroTask<HttpResponse> handle_viz_events(
59!
313
    const HttpRequest& /*req*/, const QueryParams& params, TraceIndex& index) {
7!
314
    // Required: begin, end, summary
315
    if (!params.has("begin") || !params.has("end") || !params.has("summary")) {
7!
316
        co_return HttpResponse::bad_request(
8!
317
            "Missing required parameters: begin, end, summary");
1!
318
    }
319

320
    double begin = params.get_double("begin", 0);
6!
321
    double end = params.get_double("end", 0);
6!
322
    int summary = params.get_int("summary", 1);
6!
323
    if (summary < 1) summary = 1;
6!
324

325
    // Timestamp normalization: default ON, opt-out with ?ts_normalize=0
326
    auto ts_norm_param = params.get("ts_normalize");
6!
327
    bool normalize = ts_norm_param.empty() || ts_norm_param != "0";
6!
328

329
    std::uint64_t global_min = 0;
6✔
330
    if (normalize) {
6✔
331
        global_min = index.global_min_timestamp_us();
5!
332
        if (global_min == std::numeric_limits<std::uint64_t>::max()) {
5!
333
            global_min = 0;  // No valid bounds, skip normalization
5✔
334
        }
5✔
335
    }
5✔
336

337
    // When normalization is active the user sends normalized
338
    // begin/end values (relative to global_min).  De-normalize them
339
    // so the predicate filters against absolute timestamps.
340
    double original_begin = begin;
6✔
341
    double original_end = end;
6✔
342
    if (normalize && global_min > 0) {
6!
UNCOV
343
        begin += static_cast<double>(global_min);
×
UNCOV
344
        end += static_cast<double>(global_min);
×
UNCOV
345
    }
×
346

347
    double min_dur =
6✔
348
        duration_threshold(begin, end, static_cast<unsigned>(summary));
6!
349

350
    // Build view with time range and optional filters
351
    ViewDefinition view;
6✔
352
    view.name = "viz_query";
6!
353
    view.description = "Visualization query";
6!
354

355
    std::string dsl;
6✔
356
    dsl += "ts >= " + std::to_string(static_cast<uint64_t>(begin));
6!
357
    dsl += " and ts <= " + std::to_string(static_cast<uint64_t>(end));
6!
358
    if (min_dur > 0) {
6!
UNCOV
359
        dsl += " and dur >= " + std::to_string(static_cast<uint64_t>(min_dur));
×
UNCOV
360
    }
×
361

362
    apply_lanes(dsl, params.get("lanes"));
6!
363
    apply_filters(dsl, params.get("filters"));
6!
364

365
    auto pid = params.get("pid");
6!
366
    if (!pid.empty()) {
6!
UNCOV
367
        dsl += " and pid == " + std::string(pid);
×
UNCOV
368
    }
×
369

370
    auto tid = params.get("tid");
6!
371
    if (!tid.empty()) {
6!
UNCOV
372
        dsl += " and tid == " + std::string(tid);
×
UNCOV
373
    }
×
374

375
    auto cat = params.get("cat");
6!
376
    if (!cat.empty()) {
6!
UNCOV
377
        dsl += " and cat == \"" + std::string(cat) + "\"";
×
UNCOV
378
    }
×
379

380
    view.with_query(dsl);
6!
381

382
    // Optional limit: 0 (default) means no limit.
383
    int limit = params.get_int("limit", 0);
6!
384
    if (limit < 0) limit = 0;
6!
385

386
    // Determine files
387
    std::vector<const TraceIndex::FileInfo*> target_files;
6✔
388
    auto file_param = params.get("file");
6!
389
    if (!file_param.empty()) {
6!
UNCOV
390
        auto* f = index.find_file(std::string(file_param));
×
UNCOV
391
        if (f) target_files.push_back(f);
×
UNCOV
392
    } else {
×
393
        for (const auto& f : index.files()) {
12!
394
            target_files.push_back(&f);
6!
395
        }
6✔
396
    }
397

398
    // File-level time range skip: remove files whose cached time
399
    // bounds don't overlap the query window [begin, end].
400
    if (begin > 0 || end > 0) {
6!
401
        std::vector<const TraceIndex::FileInfo*> filtered;
6✔
402
        filtered.reserve(target_files.size());
6!
403
        for (auto* fi : target_files) {
12✔
404
            if (fi->is_small) {
6!
405
                filtered.push_back(fi);
6!
406
                continue;
6✔
407
            }
UNCOV
408
            if (fi->min_timestamp_us == 0 && fi->max_timestamp_us == 0) {
×
UNCOV
409
                filtered.push_back(fi);
×
UNCOV
410
                continue;
×
411
            }
UNCOV
412
            double fi_min = static_cast<double>(fi->min_timestamp_us);
×
UNCOV
413
            double fi_max = static_cast<double>(fi->max_timestamp_us);
×
UNCOV
414
            if (fi_max < begin || fi_min > end) continue;
×
UNCOV
415
            filtered.push_back(fi);
×
416
        }
6!
417
        target_files = std::move(filtered);
6✔
418
    }
6✔
419

420
    const Query* viz_query_ptr = view.query ? &*view.query : nullptr;
6!
421

422
    std::vector<std::string> collected_events;
6✔
423

424
    bool truncated = false;
6✔
425

426
    if (target_files.size() <= 1) {
6!
427
        for (auto* file_info : target_files) {
36✔
428
            if (limit > 0 &&
18!
UNCOV
429
                static_cast<int>(collected_events.size()) >= limit) {
×
UNCOV
430
                truncated = true;
×
UNCOV
431
                break;
×
432
            }
433
            if (file_info->is_small) {
18!
434
                std::uint64_t scanned = 0;
18✔
435
                std::uint64_t matched = 0;
18✔
436
                co_await direct_scan_events(
42!
437
                    file_info, viz_query_ptr, view.include_metadata,
18✔
438
                    &collected_events, &scanned, &matched, limit);
18✔
439
                if (limit > 0 &&
6!
UNCOV
440
                    static_cast<int>(collected_events.size()) >= limit)
×
UNCOV
441
                    truncated = true;
×
442
            } else {
6!
UNCOV
443
                if (file_info->uncompressed_size == 0 &&
×
UNCOV
444
                    file_info->num_checkpoints == 0)
×
UNCOV
445
                    continue;
×
446

UNCOV
447
                ViewBuilderInput builder_input;
×
UNCOV
448
                builder_input.with_view(view)
×
UNCOV
449
                    .with_file_path(file_info->path)
×
UNCOV
450
                    .with_idx_path(
×
UNCOV
451
                        file_info->has_bloom_data ? file_info->idx_path : "")
×
UNCOV
452
                    .with_uncompressed_size(file_info->uncompressed_size)
×
UNCOV
453
                    .with_num_checkpoints(file_info->num_checkpoints)
×
UNCOV
454
                    .with_bloom_cache(&index.bloom_cache())
×
UNCOV
455
                    .with_time_range(begin, end);
×
456

UNCOV
457
                ViewBuilderUtility builder;
×
UNCOV
458
                auto build_output = co_await builder.process(builder_input);
×
UNCOV
459
                if (!build_output.success || !build_output.file_may_match)
×
UNCOV
460
                    continue;
×
461

UNCOV
462
                for (const auto& candidate : build_output.candidates) {
×
UNCOV
463
                    if (limit > 0 &&
×
UNCOV
464
                        static_cast<int>(collected_events.size()) >= limit) {
×
UNCOV
465
                        truncated = true;
×
UNCOV
466
                        break;
×
467
                    }
UNCOV
468
                    ViewReaderInput reader_input;
×
UNCOV
469
                    reader_input.with_file_path(file_info->path)
×
UNCOV
470
                        .with_idx_path(file_info->idx_path)
×
UNCOV
471
                        .with_byte_range(candidate.start_byte,
×
UNCOV
472
                                         candidate.end_byte)
×
UNCOV
473
                        .with_checkpoint_idx(candidate.checkpoint_idx)
×
UNCOV
474
                        .with_view(view);
×
475

UNCOV
476
                    ViewReaderUtility reader;
×
UNCOV
477
                    auto gen = reader.process(reader_input);
×
UNCOV
478
                    while (auto batch = co_await gen.next()) {
×
UNCOV
479
                        for (auto& event : batch->events) {
×
UNCOV
480
                            if (limit > 0 &&
×
UNCOV
481
                                static_cast<int>(collected_events.size()) >=
×
UNCOV
482
                                    limit) {
×
UNCOV
483
                                truncated = true;
×
UNCOV
484
                                break;
×
485
                            }
UNCOV
486
                            collected_events.push_back(std::move(event));
×
UNCOV
487
                        }
×
UNCOV
488
                        if (truncated) break;
×
UNCOV
489
                    }
×
UNCOV
490
                }
×
UNCOV
491
            }
×
492
        }
6✔
493
    } else {
6✔
UNCOV
494
        std::size_t num_workers =
×
UNCOV
495
            std::min(index.max_concurrent(), target_files.size());
×
UNCOV
496
        auto* executor = Executor::current();
×
497

UNCOV
498
        auto file_chan = coro::make_channel<std::size_t>(num_workers * 2);
×
UNCOV
499
        auto collected_mutex = std::make_shared<std::mutex>();
×
UNCOV
500
        auto remaining = std::make_shared<std::atomic<int>>(
×
UNCOV
501
            limit > 0 ? limit : std::numeric_limits<int>::max());
×
502

UNCOV
503
        auto* target_files_ptr = &target_files;
×
UNCOV
504
        auto* collected_ptr = &collected_events;
×
UNCOV
505
        auto* view_ptr = &view;
×
UNCOV
506
        auto* bloom_cache_ptr = &index.bloom_cache();
×
UNCOV
507
        double t_begin = begin;
×
UNCOV
508
        double t_end = end;
×
509

UNCOV
510
        CoroScope scope(executor);
×
511

512
        scope.spawn([ch = file_chan->producer(), target_files_ptr](
×
UNCOV
513
                        CoroScope&) mutable -> coro::CoroTask<void> {
×
UNCOV
514
            auto guard = ch.guard();
×
UNCOV
515
            for (std::size_t i = 0; i < target_files_ptr->size(); ++i) {
×
UNCOV
516
                if (!co_await ch.send(i)) co_return;
×
UNCOV
517
            }
×
UNCOV
518
            co_return;
×
519
        });
×
520

UNCOV
521
        for (std::size_t w = 0; w < num_workers; ++w) {
×
522
            scope.spawn([file_chan, target_files_ptr, collected_mutex,
×
UNCOV
523
                         collected_ptr, viz_query_ptr, view_ptr,
×
UNCOV
524
                         bloom_cache_ptr, remaining, t_begin,
×
UNCOV
525
                         t_end](CoroScope&) -> coro::CoroTask<void> {
×
UNCOV
526
                while (auto fi_opt = co_await file_chan->receive()) {
×
UNCOV
527
                    if (remaining->load(std::memory_order_relaxed) <= 0)
×
UNCOV
528
                        co_return;
×
UNCOV
529
                    auto* file_info = (*target_files_ptr)[*fi_opt];
×
530

UNCOV
531
                    if (file_info->is_small) {
×
UNCOV
532
                        std::vector<std::string> local_events;
×
UNCOV
533
                        std::uint64_t local_scanned = 0;
×
UNCOV
534
                        std::uint64_t local_matched = 0;
×
UNCOV
535
                        int local_limit =
×
UNCOV
536
                            remaining->load(std::memory_order_relaxed);
×
UNCOV
537
                        if (local_limit <= 0) co_return;
×
UNCOV
538
                        co_await direct_scan_events(
×
UNCOV
539
                            file_info, viz_query_ptr,
×
UNCOV
540
                            view_ptr->include_metadata, &local_events,
×
UNCOV
541
                            &local_scanned, &local_matched, local_limit);
×
UNCOV
542
                        if (!local_events.empty()) {
×
UNCOV
543
                            std::lock_guard<std::mutex> lock(*collected_mutex);
×
UNCOV
544
                            for (auto& ev : local_events) {
×
UNCOV
545
                                collected_ptr->push_back(std::move(ev));
×
UNCOV
546
                            }
×
UNCOV
547
                            remaining->fetch_sub(
×
UNCOV
548
                                static_cast<int>(local_events.size()));
×
UNCOV
549
                        }
×
UNCOV
550
                    } else {
×
UNCOV
551
                        if (file_info->uncompressed_size == 0 &&
×
UNCOV
552
                            file_info->num_checkpoints == 0)
×
UNCOV
553
                            continue;
×
554

UNCOV
555
                        ViewBuilderInput builder_input;
×
UNCOV
556
                        builder_input.with_view(*view_ptr)
×
UNCOV
557
                            .with_file_path(file_info->path)
×
UNCOV
558
                            .with_idx_path(file_info->has_bloom_data
×
UNCOV
559
                                               ? file_info->idx_path
×
UNCOV
560
                                               : "")
×
UNCOV
561
                            .with_uncompressed_size(
×
UNCOV
562
                                file_info->uncompressed_size)
×
UNCOV
563
                            .with_num_checkpoints(file_info->num_checkpoints)
×
UNCOV
564
                            .with_bloom_cache(bloom_cache_ptr)
×
UNCOV
565
                            .with_time_range(t_begin, t_end);
×
566

UNCOV
567
                        ViewBuilderUtility builder;
×
UNCOV
568
                        auto build_output =
×
UNCOV
569
                            co_await builder.process(builder_input);
×
UNCOV
570
                        if (!build_output.success ||
×
UNCOV
571
                            !build_output.file_may_match)
×
UNCOV
572
                            continue;
×
573

UNCOV
574
                        for (const auto& candidate : build_output.candidates) {
×
UNCOV
575
                            if (remaining->load(std::memory_order_relaxed) <= 0)
×
UNCOV
576
                                break;
×
577

UNCOV
578
                            ViewReaderInput reader_input;
×
UNCOV
579
                            reader_input.with_file_path(file_info->path)
×
UNCOV
580
                                .with_idx_path(file_info->idx_path)
×
UNCOV
581
                                .with_byte_range(candidate.start_byte,
×
UNCOV
582
                                                 candidate.end_byte)
×
UNCOV
583
                                .with_checkpoint_idx(candidate.checkpoint_idx)
×
UNCOV
584
                                .with_view(*view_ptr);
×
585

UNCOV
586
                            ViewReaderUtility reader;
×
UNCOV
587
                            auto gen = reader.process(reader_input);
×
UNCOV
588
                            while (auto batch = co_await gen.next()) {
×
UNCOV
589
                                if (!batch->events.empty()) {
×
UNCOV
590
                                    std::lock_guard<std::mutex> lock(
×
UNCOV
591
                                        *collected_mutex);
×
UNCOV
592
                                    for (auto& event : batch->events) {
×
UNCOV
593
                                        collected_ptr->push_back(
×
UNCOV
594
                                            std::move(event));
×
UNCOV
595
                                    }
×
UNCOV
596
                                    remaining->fetch_sub(
×
UNCOV
597
                                        static_cast<int>(batch->events.size()));
×
UNCOV
598
                                }
×
UNCOV
599
                            }
×
UNCOV
600
                        }
×
UNCOV
601
                    }
×
UNCOV
602
                }
×
UNCOV
603
                co_return;
×
604
            });
×
UNCOV
605
        }
×
606

UNCOV
607
        co_await scope.join();
×
608

UNCOV
609
        if (limit > 0 && static_cast<int>(collected_events.size()) > limit) {
×
UNCOV
610
            collected_events.resize(static_cast<std::size_t>(limit));
×
UNCOV
611
            truncated = true;
×
UNCOV
612
        }
×
UNCOV
613
    }
×
614

615
    // Apply normalization to event timestamps.
616
    if (normalize && global_min > 0) {
6!
UNCOV
617
        for (auto& event : collected_events) {
×
UNCOV
618
            event = normalize_event_ts(event, global_min);
×
UNCOV
619
        }
×
UNCOV
620
    }
×
621

622
    // Use the original (normalized) begin/end for metadata.
623
    double meta_begin = original_begin;
6✔
624
    double meta_end = original_end;
6✔
625

626
    // Build response matching the Chrome Trace Event Format.
627
    // Pre-compute size to avoid repeated reallocations.
628
    std::size_t body_size = 256;
6✔
629
    for (const auto& ev : collected_events) body_size += ev.size() + 1;
6!
630
    std::string body;
6✔
631
    body.reserve(body_size);
6!
632
    body += "{\"events\":[";
6!
633
    for (std::size_t i = 0; i < collected_events.size(); ++i) {
6!
UNCOV
634
        if (i > 0) body += ',';
×
UNCOV
635
        body += collected_events[i];  // Already JSON
×
UNCOV
636
    }
×
637
    body += "],\"metadata\":{\"begin\":";
6!
638
    body += std::to_string(meta_begin);
6!
639
    body += ",\"end\":";
6!
640
    body += std::to_string(meta_end);
6!
641
    body += ",\"count\":";
6!
642
    body += std::to_string(collected_events.size());
6!
643
    body += ",\"limit\":";
6!
644
    body += std::to_string(limit);
6!
645
    body += ",\"truncated\":";
6!
646
    body += truncated ? "true" : "false";
6!
647
    body += ",\"ts_normalized\":";
6!
648
    body += (normalize && global_min > 0) ? "true" : "false";
6!
649
    body += ",\"global_min_timestamp_us\":";
6!
650
    body += std::to_string(index.global_min_timestamp_us());
6!
651
    body += "}}";
6!
652

653
    co_return HttpResponse::ok(body);
6!
654
}
30✔
655

656
void register_viz_api(Router& router, TraceIndex& index) {
2✔
657
    auto* index_ptr = &index;
2✔
658

659
    router.get(
2!
660
        "/api/v1/viz/events",
2✔
661
        [index_ptr](const HttpRequest& req,
51!
662
                    const QueryParams& params) -> coro::CoroTask<HttpResponse> {
7!
663
            co_return co_await handle_viz_events(req, params, *index_ptr);
35!
664
        });
14✔
665
}
2✔
666

667
}  // namespace dftracer::utils::server
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc