• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23730905027

30 Mar 2026 06:22AM UTC coverage: 51.451% (+0.4%) from 51.022%
23730905027

push

github

rayandrew
chore(docs)!: regenerate C++ API reference pages from Doxygen XML

- Add generate_api_index.py script for automated API doc generation
- Rename core_common.rst to core_infrastructure.rst
- Update all API reference pages with current class/function signatures
- Add doxygen group annotations to public headers

BREAKING CHANGE: API reference page structure reorganized

23019 of 57787 branches covered (39.83%)

Branch coverage included in aggregate %.

20057 of 25936 relevant lines covered (77.33%)

13268.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.83
/src/dftracer/utils/server/trace_api.cpp
1
#include <dftracer/utils/core/common/filesystem.h>
2
#include <dftracer/utils/core/common/logging.h>
3
#include <dftracer/utils/core/common/transparent_string_hash.h>
4
#include <dftracer/utils/core/coro/channel.h>
5
#include <dftracer/utils/core/pipeline/executor.h>
6
#include <dftracer/utils/core/tasks/coro_scope.h>
7
#include <dftracer/utils/server/cursor.h>
8
#include <dftracer/utils/server/http_request.h>
9
#include <dftracer/utils/server/http_response.h>
10
#include <dftracer/utils/server/router.h>
11
#include <dftracer/utils/server/trace_api.h>
12
#include <dftracer/utils/server/trace_index.h>
13
#include <dftracer/utils/utilities/common/json/json_doc_guard.h>
14
#include <dftracer/utils/utilities/common/json/json_value.h>
15
#include <dftracer/utils/utilities/common/query/query.h>
16
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
17
#include <dftracer/utils/utilities/composites/dft/statistics/statistics_aggregator_utility.h>
18
#include <dftracer/utils/utilities/composites/dft/statistics/statistics_query_utility.h>
19
#include <dftracer/utils/utilities/composites/dft/views/view_builder_utility.h>
20
#include <dftracer/utils/utilities/composites/dft/views/view_definition.h>
21
#include <dftracer/utils/utilities/composites/dft/views/view_reader_utility.h>
22
#include <dftracer/utils/utilities/fileio/lines/sources/async_streaming_gz_line_generator.h>
23
#include <yyjson.h>
24

25
#include <atomic>
26
#include <cstddef>
27
#include <limits>
28
#include <mutex>
29
#include <string>
30
#include <unordered_map>
31
#include <unordered_set>
32
#include <vector>
33

34
namespace dftracer::utils::server {
35

36
using namespace dftracer::utils::utilities::composites::dft;
37
using namespace dftracer::utils::utilities::composites::dft::indexing;
38
using namespace dftracer::utils::utilities::composites::dft::statistics;
39
using namespace dftracer::utils::utilities::composites::dft::views;
40

41
/// JSON-escape a string so it can be placed between double quotes.
///
/// Escapes the double quote, the backslash and every control character below
/// 0x20. The common control characters use their short forms (\n, \r, \t, \b,
/// \f); the remaining ones are written as \u00XX, because a JSON string may
/// not contain raw control characters. All other bytes (including UTF-8
/// multi-byte sequences) are copied through unchanged.
///
/// @param s  Raw string value.
/// @return   Escaped copy of @p s, without surrounding quotes.
static std::string json_escape(const std::string& s) {
    static constexpr char kHexDigits[] = "0123456789abcdef";

    std::string out;
    out.reserve(s.size() + 2);
    for (const char c : s) {
        switch (c) {
            case '"':
                out += "\\\"";
                break;
            case '\\':
                out += "\\\\";
                break;
            case '\n':
                out += "\\n";
                break;
            case '\r':
                out += "\\r";
                break;
            case '\t':
                out += "\\t";
                break;
            case '\b':
                out += "\\b";
                break;
            case '\f':
                out += "\\f";
                break;
            default: {
                // Compare as unsigned so bytes >= 0x80 are never mistaken for
                // control characters on platforms where char is signed.
                const auto byte = static_cast<unsigned char>(c);
                if (byte < 0x20) {
                    out += "\\u00";
                    out += kHexDigits[byte >> 4];
                    out += kHexDigits[byte & 0x0F];
                } else {
                    out += c;
                }
                break;
            }
        }
    }
    return out;
}
4!
69

70
using dftracer::utils::utilities::common::json::JsonValue;
71

72
// Names of the hash-carrying metadata records (FH, HH, SH) that receive
// special filtering instead of being passed through like other metadata.
static const std::unordered_set<std::string> HASH_METADATA_NAMES{
    "FH", "HH", "SH"};
226!
75

76
using dftracer::utils::utilities::common::json::JsonDocGuard;
77
using dftracer::utils::utilities::common::query::Query;
78

79
/// Direct-scan a small file without any sidecar index.
80
/// Streams via async_streaming_gz_lines(), parses JSON, applies
81
/// predicate filters, collects matching events as raw JSON strings.
82
static coro::CoroTask<void> direct_scan_events(
268!
83
    const TraceIndex::FileInfo* file_info, const Query* query,
84
    bool include_metadata, std::vector<std::string>* collected_events,
85
    std::uint64_t* total_scanned, std::uint64_t* total_matched, int limit) {
2!
86
    using dftracer::utils::utilities::fileio::lines::sources::
87
        async_streaming_gz_lines;
88

89
    try {
90
        auto gen = async_streaming_gz_lines(file_info->path);
2!
91

92
        std::unordered_map<std::string, std::string> pending_metadata;
2✔
93
        std::unordered_set<std::string> emitted_hashes;
2✔
94

95
        while (auto line = co_await gen.next()) {
262!
96
            if (limit > 0 &&
64✔
97
                collected_events->size() >= static_cast<std::size_t>(limit)) {
12✔
98
                co_return;
1✔
99
            }
100
            if (line->content.empty()) continue;
63!
101

102
            JsonDocGuard guard{yyjson_read_opts(
126!
103
                const_cast<char*>(line->content.data()), line->content.size(),
63✔
104
                YYJSON_READ_NOFLAG, nullptr, nullptr)};
105
            if (!guard.doc) continue;
63✔
106

107
            yyjson_val* root = yyjson_doc_get_root(guard.doc);
60!
108
            if (root && yyjson_is_obj(root)) {
60!
109
                JsonValue json(root);
60!
110
                // line->content is a string_view valid only for this
111
                // iteration.  All storage into collected_events and
112
                // pending_metadata must copy to owning std::string.
113
                std::string_view ph = json["ph"].get<std::string_view>();
60!
114

115
                if (ph == "M" && include_metadata) {
60!
116
                    std::string name_str = json["name"].get<std::string>();
×
117

118
                    if (HASH_METADATA_NAMES.count(name_str)) {
×
119
                        auto args = json["args"];
×
120
                        if (args.exists()) {
×
121
                            auto val = args["value"];
×
122
                            if (val.exists()) {
×
123
                                std::string hash_val = val.get<std::string>();
×
124
                                if (!emitted_hashes.count(hash_val)) {
×
125
                                    pending_metadata[hash_val] =
×
126
                                        std::string(line->content.data(),
×
127
                                                    line->content.size());
128
                                }
129
                            }
130
                        }
131
                    } else {
132
                        collected_events->emplace_back(line->content.data(),
×
133
                                                       line->content.size());
134
                        (*total_matched)++;
135
                    }
136
                } else if (ph != "M") {
60!
137
                    (*total_scanned)++;
60✔
138
                    if (!query || query->evaluate(json)) {
60!
139
                        // Flush referenced hash metadata first
140
                        if (include_metadata) {
60!
141
                            auto args = json["args"];
60!
142
                            if (args.exists()) {
60!
143
                                static const char* hash_fields[] = {
144
                                    "hhash", "fhash", "shash"};
145
                                for (const char* field : hash_fields) {
240✔
146
                                    auto val = args[field];
180!
147
                                    if (!val.exists()) continue;
180!
148
                                    std::string hash_val =
60✔
149
                                        val.get<std::string>();
60!
150
                                    if (emitted_hashes.count(hash_val))
60!
151
                                        continue;
152
                                    auto it = pending_metadata.find(hash_val);
60!
153
                                    if (it != pending_metadata.end()) {
60!
154
                                        collected_events->push_back(
×
155
                                            std::move(it->second));
×
156
                                        (*total_matched)++;
157
                                        emitted_hashes.insert(hash_val);
×
158
                                        pending_metadata.erase(it);
×
159
                                    }
160
                                }
180✔
161
                            }
60✔
162
                        }
60✔
163
                        collected_events->emplace_back(line->content.data(),
120!
164
                                                       line->content.size());
60✔
165
                        (*total_matched)++;
60✔
166
                    }
60✔
167
                }
60✔
168
            }
60✔
169
        }
65✔
170
    } catch (const std::exception& e) {
130!
171
        DFTRACER_UTILS_LOG_WARN("Direct scan failed for %s: %s",
×
172
                                file_info->path.c_str(), e.what());
173
    }
×
174

175
    co_return;
1✔
176
}
264!
177

178
// --- GET /api/v1/files ---
179
static coro::CoroTask<HttpResponse> handle_files(const HttpRequest& /*req*/,
12!
180
                                                 const QueryParams& /*params*/,
181
                                                 TraceIndex& index) {
2!
182
    std::string body;
2✔
183
    body.reserve(128 * index.file_count() + 32);
2!
184
    body += "{\"files\":[";
2!
185
    bool first = true;
2✔
186
    for (const auto& f : index.files()) {
4!
187
        if (!first) body += ',';
2!
188
        first = false;
2✔
189
        body += "{\"path\":\"";
2!
190
        body += json_escape(f.path);
2!
191
        body += "\",\"has_bloom_data\":";
2!
192
        body += f.has_bloom_data ? "true" : "false";
2!
193
        body += ",\"has_checkpoint_index\":";
2!
194
        body += f.has_checkpoint_index ? "true" : "false";
2!
195
        body += ",\"is_small\":";
2!
196
        body += f.is_small ? "true" : "false";
2!
197
        body += '}';
2!
198
    }
2✔
199
    body += "],\"count\":";
2!
200
    body += std::to_string(index.file_count());
2!
201
    body += '}';
2!
202
    co_return HttpResponse::ok(body);
4!
203
}
6!
204

205
// --- GET /api/v1/files/info ---
206
static coro::CoroTask<HttpResponse> handle_file_info(const HttpRequest& /*req*/,
16!
207
                                                     const QueryParams& params,
208
                                                     TraceIndex& index) {
2!
209
    auto file_param = params.get("file");
2!
210
    if (file_param.empty()) {
2✔
211
        co_return HttpResponse::bad_request("Missing required parameter: file");
3!
212
    }
213

214
    std::string file_path(file_param);
1!
215
    auto* info = index.find_file(file_path);
1!
216
    if (!info) {
1!
217
        co_return HttpResponse::not_found();
×
218
    }
219

220
    std::string body;
1✔
221
    body.reserve(512);
1!
222
    body += "{\"path\":\"";
1!
223
    body += json_escape(info->path);
1!
224
    body += "\",\"has_bloom_data\":";
1!
225
    body += info->has_bloom_data ? "true" : "false";
1!
226
    body += ",\"has_checkpoint_index\":";
1!
227
    body += info->has_checkpoint_index ? "true" : "false";
1!
228
    body += ",\"is_small\":";
1!
229
    body += info->is_small ? "true" : "false";
1!
230

231
    body += ",\"size_mb\":";
1!
232
    body += std::to_string(info->size_mb);
1!
233
    body += ",\"compressed_size\":";
1!
234
    body += std::to_string(info->compressed_size);
1!
235
    if (!info->is_small) {
1!
236
        body += ",\"num_lines\":";
×
237
        body += std::to_string(info->num_lines);
×
238
        body += ",\"num_checkpoints\":";
×
239
        body += std::to_string(info->num_checkpoints);
×
240
        body += ",\"uncompressed_size\":";
×
241
        body += std::to_string(info->uncompressed_size);
×
242
    }
243

244
    body += '}';
1!
245
    co_return HttpResponse::ok(body);
1!
246
}
6!
247

248
/// Split a comma-separated list into its non-empty items.
///
/// Empty items (leading, trailing or doubled commas) are skipped. Items are
/// not trimmed, so surrounding whitespace is preserved.
///
/// @param s  Text to split.
/// @return   The items in their original order; empty when @p s has none.
static std::vector<std::string> split_csv(std::string_view s) {
    std::vector<std::string> pieces;

    std::size_t begin = 0;
    while (begin <= s.size()) {
        const std::size_t comma = s.find(',', begin);
        const bool last_piece = (comma == std::string_view::npos);
        const std::size_t end = last_piece ? s.size() : comma;

        if (end > begin) {
            pieces.emplace_back(s.substr(begin, end - begin));
        }
        if (last_piece) break;
        begin = comma + 1;
    }
    return pieces;
}
×
262

263
/// Render a query-DSL clause matching @p field against one or more strings.
///
/// A single value yields `field == "v"`; any other count yields
/// `field in ["v1", "v2", ...]` (an empty list yields `field in []`).
///
/// NOTE(review): values are spliced between double quotes verbatim. Whether
/// the DSL has an escape syntax for quotes inside values is not visible from
/// here -- confirm before passing values that may contain `"`.
///
/// @param field  Field name, emitted as-is.
/// @param vals   Values to match.
/// @return       The clause text.
static std::string format_in_clause(const std::string& field,
                                    const std::vector<std::string>& vals) {
    const auto quoted = [](const std::string& value) {
        return '"' + value + '"';
    };

    if (vals.size() == 1) {
        return field + " == " + quoted(vals.front());
    }

    std::string clause = field;
    clause += " in [";
    const char* separator = "";
    for (const auto& value : vals) {
        clause += separator;
        clause += quoted(value);
        separator = ", ";
    }
    clause += ']';
    return clause;
}
×
274

275
static std::optional<Query> build_query_from_params(const QueryParams& params) {
8✔
276
    std::string dsl;
8✔
277

278
    auto cat = params.get("cat");
8!
279
    if (!cat.empty()) {
8!
280
        auto vals = split_csv(cat);
×
281
        if (!vals.empty()) dsl += format_in_clause("cat", vals);
×
282
    }
×
283

284
    auto name = params.get("name");
8!
285
    if (!name.empty()) {
8!
286
        auto vals = split_csv(name);
×
287
        if (!vals.empty()) {
×
288
            if (!dsl.empty()) dsl += " and ";
×
289
            dsl += format_in_clause("name", vals);
×
290
        }
291
    }
×
292

293
    auto pid = params.get("pid");
8!
294
    if (!pid.empty()) {
8!
295
        if (!dsl.empty()) dsl += " and ";
×
296
        dsl += "pid == " + std::string(pid);
×
297
    }
298

299
    double ts_min = params.get_double("ts_min", 0);
8!
300
    double ts_max = params.get_double("ts_max", 0);
8!
301
    if (ts_min > 0) {
8!
302
        if (!dsl.empty()) dsl += " and ";
×
303
        dsl += "ts >= " + std::to_string(static_cast<uint64_t>(ts_min));
×
304
    }
305
    if (ts_max > 0) {
8!
306
        if (!dsl.empty()) dsl += " and ";
×
307
        dsl += "ts <= " + std::to_string(static_cast<uint64_t>(ts_max));
×
308
    }
309

310
    double dur_min = params.get_double("dur_min", 0);
8!
311
    double dur_max = params.get_double("dur_max", 0);
8!
312
    if (dur_min > 0) {
8!
313
        if (!dsl.empty()) dsl += " and ";
×
314
        dsl += "dur >= " + std::to_string(static_cast<uint64_t>(dur_min));
×
315
    }
316
    if (dur_max > 0) {
8!
317
        if (!dsl.empty()) dsl += " and ";
×
318
        dsl += "dur <= " + std::to_string(static_cast<uint64_t>(dur_max));
×
319
    }
320

321
    if (dsl.empty()) return std::nullopt;
8✔
322
    auto result = Query::from_string(dsl);
×
323
    if (!result) return std::nullopt;
×
324
    return std::move(*result);
×
325
}
8✔
326

327
/// Build the ViewDefinition used by the event endpoints.
///
/// The view is named "api_query" and carries the query derived from
/// @p params when build_query_from_params() produces one.
static ViewDefinition build_view_from_params(const QueryParams& params) {
    ViewDefinition definition;
    definition.name = "api_query";
    definition.description = "HTTP API query";

    if (auto parsed = build_query_from_params(params); parsed) {
        definition.with_query(std::move(*parsed));
    }
    return definition;
}
4!
336

337
// ============================================================================
338
// Shared helpers for event streaming endpoints
339
// ============================================================================
340

341
static std::vector<const TraceIndex::FileInfo*> resolve_target_files(
4✔
342
    TraceIndex& index, const QueryParams& params, double ts_min = 0,
343
    double ts_max = 0) {
344
    std::vector<const TraceIndex::FileInfo*> files;
4✔
345
    auto file_param = params.get("file");
4!
346
    if (!file_param.empty()) {
4!
347
        auto* f = index.find_file(std::string(file_param));
×
348
        if (f) files.push_back(f);
×
349
    } else {
350
        for (const auto& f : index.files()) {
8✔
351
            files.push_back(&f);
4!
352
        }
353
    }
354

355
    if (ts_min > 0 || ts_max > 0) {
4!
356
        std::vector<const TraceIndex::FileInfo*> filtered;
×
357
        filtered.reserve(files.size());
×
358
        for (auto* fi : files) {
×
359
            if (fi->is_small) {
×
360
                filtered.push_back(fi);
×
361
                continue;
×
362
            }
363
            if (fi->min_timestamp_us == 0 && fi->max_timestamp_us == 0) {
×
364
                filtered.push_back(fi);
×
365
                continue;
×
366
            }
367
            double fi_min = static_cast<double>(fi->min_timestamp_us);
×
368
            double fi_max = static_cast<double>(fi->max_timestamp_us);
×
369
            if (fi_max < ts_min || (ts_max > 0 && fi_min > ts_max)) continue;
×
370
            filtered.push_back(fi);
×
371
        }
372
        files = std::move(filtered);
×
373
    }
×
374

375
    return files;
6✔
376
}
2!
377

378
using StreamChunk = HttpResponse::StreamChunk;
379

380
/// Stream matching events from @p files as chunks of raw JSON lines.
///
/// Small files are scanned directly with direct_scan_events() and yielded as
/// a single chunk. Other files go through ViewBuilderUtility to obtain
/// candidate byte ranges, and each candidate is read with ViewReaderUtility.
/// Files with neither an uncompressed size nor checkpoints are skipped, as
/// are files whose build step fails or reports that nothing can match.
///
/// @param files        Files to read, in order.
/// @param ev_view      View applied by the builder and the reader.
/// @param query_opt    Query used by the direct-scan path; empty = no filter.
/// @param ts_min       Lower time bound forwarded to the view builder.
/// @param ts_max       Upper time bound forwarded to the view builder.
/// @param bloom_cache  Bloom filter cache forwarded to the view builder.
/// @param limit        Maximum number of events to yield; <= 0 = unlimited.
///
/// Streaming stops as soon as @p limit events have been yielded; in
/// particular the reader of the current candidate is abandoned rather than
/// drained.
static coro::AsyncGenerator<StreamChunk> stream_events(
    std::vector<const TraceIndex::FileInfo*> files, ViewDefinition ev_view,
    std::optional<Query> query_opt, double ts_min, double ts_max,
    BloomFilterCache* bloom_cache, int limit) {
    int emitted = 0;
    const Query* query_ptr = query_opt ? &*query_opt : nullptr;

    for (auto* file_info : files) {
        if (limit > 0 && emitted >= limit) break;

        if (file_info->is_small) {
            std::vector<std::string> events;
            std::uint64_t scanned = 0;
            std::uint64_t matched = 0;
            co_await direct_scan_events(
                file_info, query_ptr, ev_view.include_metadata, &events,
                &scanned, &matched, limit > 0 ? limit - emitted : 0);

            // The scan may overshoot its limit, so cap again here. `events`
            // and `views` both stay alive across the co_yield below.
            std::vector<std::string_view> views;
            for (const auto& event : events) {
                if (limit > 0 && emitted >= limit) break;
                views.push_back(event);
                emitted++;
            }
            if (!views.empty()) {
                co_yield StreamChunk{views};
            }
            continue;
        }

        if (file_info->uncompressed_size == 0 &&
            file_info->num_checkpoints == 0)
            continue;

        ViewBuilderInput builder_input;
        builder_input.with_view(ev_view)
            .with_file_path(file_info->path)
            .with_idx_path(file_info->has_bloom_data ? file_info->idx_path : "")
            .with_uncompressed_size(file_info->uncompressed_size)
            .with_num_checkpoints(file_info->num_checkpoints)
            .with_bloom_cache(bloom_cache)
            .with_time_range(ts_min, ts_max);

        ViewBuilderUtility builder;
        auto build_output = co_await builder.process(builder_input);
        if (!build_output.success || !build_output.file_may_match) continue;

        for (const auto& candidate : build_output.candidates) {
            if (limit > 0 && emitted >= limit) break;

            ViewReaderInput reader_input;
            reader_input.with_file_path(file_info->path)
                .with_idx_path(file_info->idx_path)
                .with_byte_range(candidate.start_byte, candidate.end_byte)
                .with_checkpoint_idx(candidate.checkpoint_idx)
                .with_view(ev_view);

            ViewReaderUtility reader;
            auto event_gen = reader.process(reader_input);
            while (auto batch = co_await event_gen.next()) {
                const int available = static_cast<int>(batch->events.size());
                const int count =
                    limit > 0 ? std::min(available, limit - emitted)
                              : available;
                if (count > 0) {
                    co_yield StreamChunk{std::span<const std::string_view>(
                        batch->events.data(), static_cast<std::size_t>(count))};
                    emitted += count;
                }
                // Stop pulling batches once the limit is met; continuing
                // would read the rest of the range only to discard it.
                if (limit > 0 && emitted >= limit) break;
            }
        }
    }
}
18!
452

453
// ============================================================================
454
// Event endpoints
455
// ============================================================================
456

457
// --- GET /api/v1/events ---
458
static coro::CoroTask<HttpResponse> handle_events(const HttpRequest& /*req*/,
6!
459
                                                  const QueryParams& params,
460
                                                  TraceIndex& index) {
1!
461
    int limit = params.get_int("limit", 1000);
1!
462
    if (limit <= 0) limit = 1000;
1!
463
    if (limit > 100000) limit = 100000;
1!
464

465
    double ts_min = params.get_double("ts_min", 0);
1!
466
    double ts_max = params.get_double("ts_max", 0);
1!
467
    auto files = resolve_target_files(index, params, ts_min, ts_max);
1!
468
    auto view = build_view_from_params(params);
1!
469
    auto query = build_query_from_params(params);
1!
470

471
    auto gen = std::make_unique<HttpResponse::StreamGenerator>(
2!
472
        stream_events(std::move(files), std::move(view), std::move(query),
1!
473
                      ts_min, ts_max, &index.bloom_cache(), limit));
1!
474

475
    auto resp = HttpResponse::streaming(std::move(gen));
1!
476
    resp.headers.push_back({"X-Limit", std::to_string(limit)});
1!
477
    co_return resp;
2✔
478
}
3!
479

480
// --- GET /api/v1/events/stream ---
481
static coro::CoroTask<HttpResponse> handle_events_stream(
6!
482
    const HttpRequest& /*req*/, const QueryParams& params, TraceIndex& index) {
1!
483
    double ts_min = params.get_double("ts_min", 0);
1!
484
    double ts_max = params.get_double("ts_max", 0);
1!
485
    auto files = resolve_target_files(index, params, ts_min, ts_max);
1!
486
    auto view = build_view_from_params(params);
1!
487
    auto query = build_query_from_params(params);
1!
488
    int limit = params.get_int("limit", 0);
1!
489

490
    auto gen = std::make_unique<HttpResponse::StreamGenerator>(
1!
491
        stream_events(std::move(files), std::move(view), std::move(query),
2!
492
                      ts_min, ts_max, &index.bloom_cache(), limit));
1✔
493

494
    co_return HttpResponse::streaming(std::move(gen));
2!
495
}
3!
496

497
// --- GET /api/v1/stats ---
498
static coro::CoroTask<HttpResponse> handle_stats(const HttpRequest& req,
6!
499
                                                 const QueryParams& /*params*/,
500
                                                 TraceIndex& index) {
1!
501
    static std::mutex cache_mutex;
1!
502
    static std::unordered_map<std::string, std::string,
1!
503
                              dftracer::utils::TransparentStringHash,
504
                              dftracer::utils::TransparentStringEqual>
505
        stats_cache;
1✔
506

507
    {
508
        std::lock_guard<std::mutex> lock(cache_mutex);
1!
509
        auto it = stats_cache.find(req.path);
1!
510
        if (it != stats_cache.end()) {
1!
511
            co_return HttpResponse::ok(it->second);
1!
512
        }
513
    }
1!
514

515
    std::vector<TraceStatistics> all_stats;
1✔
516
    std::size_t skipped_small = 0;
1✔
517

518
    std::vector<const TraceIndex::FileInfo*> stat_files;
1✔
519
    for (const auto& file_info : index.files()) {
2✔
520
        if (file_info.is_small) {
1!
521
            skipped_small++;
1✔
522
            continue;
1✔
523
        }
524
        if (!file_info.has_bloom_data) continue;
×
525
        stat_files.push_back(&file_info);
×
526
    }
1!
527

528
    if (stat_files.size() <= 1) {
1!
529
        for (auto* file_info : stat_files) {
1!
530
            StatisticsAggregatorInput agg_input;
531
            agg_input.file_path = file_info->path;
×
532
            agg_input.idx_path = file_info->idx_path;
×
533
            agg_input.index_dir = index.index_dir();
×
534

535
            StatisticsAggregatorUtility aggregator;
×
536
            auto stats = co_await aggregator.process(agg_input);
×
537
            if (stats.success) {
×
538
                all_stats.push_back(std::move(stats));
×
539
            }
540
        }
×
541
    } else {
1✔
542
        std::size_t num_workers =
543
            std::min(index.max_concurrent(), stat_files.size());
×
544
        auto* executor = Executor::current();
545

546
        auto file_chan = coro::make_channel<std::size_t>(num_workers * 2);
×
547
        auto stats_mutex = std::make_shared<std::mutex>();
×
548
        auto* all_stats_ptr = &all_stats;
549
        auto* stat_files_ptr = &stat_files;
550
        std::string index_dir = index.index_dir();
×
551
        const auto* index_dir_ptr = &index_dir;
552

553
        CoroScope scope(executor);
×
554

555
        scope.spawn([ch = file_chan->producer(), stat_files_ptr](
×
556
                        CoroScope&) mutable -> coro::CoroTask<void> {
×
557
            auto guard = ch.guard();
×
558
            for (std::size_t i = 0; i < stat_files_ptr->size(); ++i) {
×
559
                if (!co_await ch.send(i)) co_return;
×
560
            }
561
            co_return;
562
        });
×
563

564
        for (std::size_t w = 0; w < num_workers; ++w) {
×
565
            scope.spawn([file_chan, stat_files_ptr, stats_mutex, all_stats_ptr,
×
566
                         index_dir_ptr](CoroScope&) -> coro::CoroTask<void> {
×
567
                while (auto fi_opt = co_await file_chan->receive()) {
×
568
                    auto* file_info = (*stat_files_ptr)[*fi_opt];
569

570
                    StatisticsAggregatorInput agg_input;
571
                    agg_input.file_path = file_info->path;
×
572
                    agg_input.idx_path = file_info->idx_path;
×
573
                    agg_input.index_dir = *index_dir_ptr;
×
574

575
                    StatisticsAggregatorUtility aggregator;
×
576
                    auto stats = co_await aggregator.process(agg_input);
×
577

578
                    if (stats.success) {
×
579
                        std::lock_guard<std::mutex> lock(*stats_mutex);
×
580
                        all_stats_ptr->push_back(std::move(stats));
×
581
                    }
582
                }
×
583
                co_return;
584
            });
×
585
        }
586

587
        co_await scope.join();
×
588
    }
×
589

590
    std::uint64_t total_events = 0;
1✔
591
    std::size_t file_count = all_stats.size();
1✔
592
    for (const auto& s : all_stats) {
1!
593
        total_events += s.total_events();
×
594
    }
595

596
    std::string body;
1✔
597
    body.reserve(256 * all_stats.size() + 64);
1!
598
    body += "{\"file_count\":";
1!
599
    body += std::to_string(file_count);
1!
600
    body += ",\"total_events\":";
1!
601
    body += std::to_string(total_events);
1!
602
    body += ",\"skipped_small_files\":";
1!
603
    body += std::to_string(skipped_small);
1!
604
    body += ",\"files\":[";
1!
605
    for (std::size_t i = 0; i < all_stats.size(); ++i) {
1!
606
        if (i > 0) body += ',';
×
607
        body += all_stats[i].to_json();
×
608
    }
609
    body += "]}";
1!
610

611
    {
612
        std::lock_guard<std::mutex> lock(cache_mutex);
1!
613
        stats_cache.emplace(std::string(req.path), body);
1!
614
    }
1✔
615
    co_return HttpResponse::ok(body);
1!
616
}
3!
617

618
// --- GET /api/v1/info ---
619
static coro::CoroTask<HttpResponse> handle_info(const HttpRequest& /*req*/,
8!
620
                                                const QueryParams& /*params*/,
621
                                                TraceIndex& index) {
1!
622
    auto global_min = index.global_min_timestamp_us();
3!
623
    auto global_max = index.global_max_timestamp_us();
3!
624
    bool has_time_range =
3✔
625
        global_max > 0 &&
3!
626
        global_min != std::numeric_limits<std::uint64_t>::max();
627

628
    std::string body;
3✔
629
    body.reserve(256 * index.file_count() + 128);
3✔
630
    body += "{\"file_count\":";
1!
631
    body += std::to_string(index.file_count());
1!
632

633
    if (has_time_range) {
1!
634
        body += ",\"time_range\":{\"min_timestamp_us\":";
×
635
        body += std::to_string(global_min);
×
636
        body += ",\"max_timestamp_us\":";
×
637
        body += std::to_string(global_max);
×
638
        body += "}";
×
639
    }
640

641
    body += ",\"files\":[";
1✔
642
    bool first = true;
3✔
643
    for (const auto& f : index.files()) {
4✔
644
        if (!first) body += ',';
3!
645
        first = false;
3✔
646
        body += "{\"path\":\"";
3✔
647
        body += json_escape(f.path);
1!
648
        body += "\",\"has_bloom_data\":";
1!
649
        body += f.has_bloom_data ? "true" : "false";
1!
650
        body += ",\"has_checkpoint_index\":";
1!
651
        body += f.has_checkpoint_index ? "true" : "false";
1!
652
        body += ",\"is_small\":";
1!
653
        body += f.is_small ? "true" : "false";
1!
654
        if (f.min_timestamp_us > 0 || f.max_timestamp_us > 0) {
1!
655
            body += ",\"min_timestamp_us\":";
1!
656
            body += std::to_string(f.min_timestamp_us);
×
657
            body += ",\"max_timestamp_us\":";
×
658
            body += std::to_string(f.max_timestamp_us);
×
659
        }
660
        body += '}';
2✔
661
    }
1✔
662
    body += "]}";
1!
663
    co_return HttpResponse::ok(body);
2!
664
}
11!
665

666
/// Register the trace REST endpoints on @p router.
///
/// Each route handler captures the address of @p index, so the index must
/// stay alive for as long as the router can dispatch to these routes.
///
/// Routes (all GET): /api/v1/files, /api/v1/files/info, /api/v1/events,
/// /api/v1/events/stream, /api/v1/stats, /api/v1/info.
void register_trace_api(Router& router, TraceIndex& index) {
    router.get("/api/v1/files",
               [trace_index = &index](const HttpRequest& request,
                                      const QueryParams& query_params)
                   -> coro::CoroTask<HttpResponse> {
                   co_return co_await handle_files(request, query_params,
                                                   *trace_index);
               });

    router.get("/api/v1/files/info",
               [trace_index = &index](const HttpRequest& request,
                                      const QueryParams& query_params)
                   -> coro::CoroTask<HttpResponse> {
                   co_return co_await handle_file_info(request, query_params,
                                                       *trace_index);
               });

    router.get("/api/v1/events",
               [trace_index = &index](const HttpRequest& request,
                                      const QueryParams& query_params)
                   -> coro::CoroTask<HttpResponse> {
                   co_return co_await handle_events(request, query_params,
                                                    *trace_index);
               });

    router.get("/api/v1/events/stream",
               [trace_index = &index](const HttpRequest& request,
                                      const QueryParams& query_params)
                   -> coro::CoroTask<HttpResponse> {
                   co_return co_await handle_events_stream(
                       request, query_params, *trace_index);
               });

    router.get("/api/v1/stats",
               [trace_index = &index](const HttpRequest& request,
                                      const QueryParams& query_params)
                   -> coro::CoroTask<HttpResponse> {
                   co_return co_await handle_stats(request, query_params,
                                                   *trace_index);
               });

    router.get("/api/v1/info",
               [trace_index = &index](const HttpRequest& request,
                                      const QueryParams& query_params)
                   -> coro::CoroTask<HttpResponse> {
                   co_return co_await handle_info(request, query_params,
                                                  *trace_index);
               });
}
4✔
711

712
}  // namespace dftracer::utils::server
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc