• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23531027933

25 Mar 2026 08:05AM UTC coverage: 48.592% (-1.5%) from 50.098%
23531027933

Pull #57

github

web-flow
Merge d1070e289 into 38f9f3616
Pull Request #57: feat(comparator): add pairwise traces comparator

18900 of 49456 branches covered (38.22%)

Branch coverage included in aggregate %.

1604 of 1954 new or added lines in 25 files covered. (82.09%)

3407 existing lines in 135 files now uncovered.

18487 of 27485 relevant lines covered (67.26%)

240991.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.03
/src/dftracer/utils/server/trace_api.cpp
1
#include <dftracer/utils/core/common/filesystem.h>
2
#include <dftracer/utils/core/common/logging.h>
3
#include <dftracer/utils/core/coro/channel.h>
4
#include <dftracer/utils/core/pipeline/executor.h>
5
#include <dftracer/utils/core/tasks/coro_scope.h>
6
#include <dftracer/utils/server/cursor.h>
7
#include <dftracer/utils/server/http_request.h>
8
#include <dftracer/utils/server/http_response.h>
9
#include <dftracer/utils/server/router.h>
10
#include <dftracer/utils/server/trace_api.h>
11
#include <dftracer/utils/server/trace_index.h>
12
#include <dftracer/utils/utilities/common/json/json_doc_guard.h>
13
#include <dftracer/utils/utilities/common/json/json_value.h>
14
#include <dftracer/utils/utilities/common/query/query.h>
15
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
16
#include <dftracer/utils/utilities/composites/dft/statistics/statistics_aggregator_utility.h>
17
#include <dftracer/utils/utilities/composites/dft/statistics/statistics_query_utility.h>
18
#include <dftracer/utils/utilities/composites/dft/views/view_builder_utility.h>
19
#include <dftracer/utils/utilities/composites/dft/views/view_definition.h>
20
#include <dftracer/utils/utilities/composites/dft/views/view_reader_utility.h>
21
#include <dftracer/utils/utilities/fileio/lines/sources/async_streaming_gz_line_generator.h>
22
#include <yyjson.h>
23

24
#include <atomic>
25
#include <cstddef>
26
#include <limits>
27
#include <mutex>
28
#include <string>
29
#include <unordered_map>
30
#include <unordered_set>
31
#include <vector>
32

33
namespace dftracer::utils::server {
34

35
using namespace dftracer::utils::utilities::composites::dft;
36
using namespace dftracer::utils::utilities::composites::dft::indexing;
37
using namespace dftracer::utils::utilities::composites::dft::statistics;
38
using namespace dftracer::utils::utilities::composites::dft::views;
39

40
// JSON-escape a string so it can be embedded between double quotes.
//
// Escapes '"' and '\\', uses the short escapes for \b, \f, \n, \r and \t, and
// writes every other control character below 0x20 as \u00XX (JSON requires all
// of U+0000..U+001F to be escaped inside strings). Bytes >= 0x20 are copied
// through unchanged; no UTF-8 validation is performed.
static std::string json_escape(const std::string& s) {
    static const char kHexDigits[] = "0123456789abcdef";

    std::string out;
    out.reserve(s.size() + 2);
    for (char c : s) {
        switch (c) {
            case '"':
                out += "\\\"";
                break;
            case '\\':
                out += "\\\\";
                break;
            case '\b':
                out += "\\b";
                break;
            case '\f':
                out += "\\f";
                break;
            case '\n':
                out += "\\n";
                break;
            case '\r':
                out += "\\r";
                break;
            case '\t':
                out += "\\t";
                break;
            default: {
                // Go through unsigned char so bytes >= 0x80 are never
                // mistaken for (negative) control characters.
                const auto byte = static_cast<unsigned char>(c);
                if (byte < 0x20) {
                    out += "\\u00";
                    out += kHexDigits[byte >> 4];
                    out += kHexDigits[byte & 0x0F];
                } else {
                    out += c;
                }
                break;
            }
        }
    }
    return out;
}
68

69
using dftracer::utils::utilities::common::json::JsonValue;
70

71
// Names of metadata events ("ph" == "M") that get deferred emission: FH, HH
// and SH. direct_scan_events() does not emit these records as soon as it sees
// them; it parks each one under its args.value hash and only emits it when a
// matching event references that hash through args.hhash / args.fhash /
// args.shash. Metadata records with any other name are emitted immediately.
72
static const std::unordered_set<std::string> HASH_METADATA_NAMES = {"FH", "HH",
222!
73
                                                                    "SH"};
222!
74

75
using dftracer::utils::utilities::common::json::JsonDocGuard;
76
using dftracer::utils::utilities::common::query::Query;
77

78
/// Direct-scan a small file without any sidecar index.
79
/// Streams via async_streaming_gz_lines(), parses JSON, applies
80
/// predicate filters, collects matching events as raw JSON strings.
81
static coro::CoroTask<void> direct_scan_events(
266!
82
    const TraceIndex::FileInfo* file_info, const Query* query,
83
    bool include_metadata, std::vector<std::string>* collected_events,
84
    std::uint64_t* total_scanned, std::uint64_t* total_matched, int limit) {
2!
85
    using dftracer::utils::utilities::fileio::lines::sources::
86
        async_streaming_gz_lines;
87

88
    try {
89
        auto gen = async_streaming_gz_lines(file_info->path);
2!
90

91
        std::unordered_map<std::string, std::string> pending_metadata;
2✔
92
        std::unordered_set<std::string> emitted_hashes;
2✔
93

94
        while (auto line = co_await gen.next()) {
262!
95
            if (limit > 0 &&
64✔
96
                collected_events->size() >= static_cast<std::size_t>(limit)) {
12✔
97
                co_return;
1✔
98
            }
99
            if (line->content.empty()) continue;
63!
100

101
            JsonDocGuard guard{yyjson_read_opts(
126!
102
                const_cast<char*>(line->content.data()), line->content.size(),
63✔
103
                YYJSON_READ_NOFLAG, nullptr, nullptr)};
104
            if (!guard.doc) continue;
63✔
105

106
            yyjson_val* root = yyjson_doc_get_root(guard.doc);
60!
107
            if (root && yyjson_is_obj(root)) {
60!
108
                JsonValue json(root);
60!
109
                // line->content is a string_view valid only for this
110
                // iteration.  All storage into collected_events and
111
                // pending_metadata must copy to owning std::string.
112
                std::string_view ph = json["ph"].get<std::string_view>();
60!
113

114
                if (ph == "M" && include_metadata) {
60!
UNCOV
115
                    std::string name_str = json["name"].get<std::string>();
×
116

UNCOV
117
                    if (HASH_METADATA_NAMES.count(name_str)) {
×
UNCOV
118
                        auto args = json["args"];
×
UNCOV
119
                        if (args.exists()) {
×
UNCOV
120
                            auto val = args["value"];
×
UNCOV
121
                            if (val.exists()) {
×
UNCOV
122
                                std::string hash_val = val.get<std::string>();
×
UNCOV
123
                                if (!emitted_hashes.count(hash_val)) {
×
UNCOV
124
                                    pending_metadata[hash_val] =
×
UNCOV
125
                                        std::string(line->content.data(),
×
UNCOV
126
                                                    line->content.size());
×
UNCOV
127
                                }
×
UNCOV
128
                            }
×
UNCOV
129
                        }
×
UNCOV
130
                    } else {
×
UNCOV
131
                        collected_events->emplace_back(line->content.data(),
×
UNCOV
132
                                                       line->content.size());
×
UNCOV
133
                        (*total_matched)++;
×
134
                    }
135
                } else if (ph != "M") {
60!
136
                    (*total_scanned)++;
60✔
137
                    if (!query || query->evaluate(json)) {
60!
138
                        // Flush referenced hash metadata first
139
                        if (include_metadata) {
60!
140
                            auto args = json["args"];
60!
141
                            if (args.exists()) {
60!
142
                                static const char* hash_fields[] = {
143
                                    "hhash", "fhash", "shash"};
144
                                for (const char* field : hash_fields) {
240✔
145
                                    auto val = args[field];
180!
146
                                    if (!val.exists()) continue;
180!
UNCOV
147
                                    std::string hash_val =
×
UNCOV
148
                                        val.get<std::string>();
×
UNCOV
149
                                    if (emitted_hashes.count(hash_val))
×
UNCOV
150
                                        continue;
×
UNCOV
151
                                    auto it = pending_metadata.find(hash_val);
×
UNCOV
152
                                    if (it != pending_metadata.end()) {
×
UNCOV
153
                                        collected_events->push_back(
×
UNCOV
154
                                            std::move(it->second));
×
UNCOV
155
                                        (*total_matched)++;
×
UNCOV
156
                                        emitted_hashes.insert(hash_val);
×
UNCOV
157
                                        pending_metadata.erase(it);
×
UNCOV
158
                                    }
×
159
                                }
180!
160
                            }
60✔
161
                        }
60✔
162
                        collected_events->emplace_back(line->content.data(),
120!
163
                                                       line->content.size());
60✔
164
                        (*total_matched)++;
60✔
165
                    }
60✔
166
                }
60✔
167
            }
60✔
168
        }
65✔
169
    } catch (const std::exception& e) {
130!
UNCOV
170
        DFTRACER_UTILS_LOG_WARN("Direct scan failed for %s: %s",
×
171
                                file_info->path.c_str(), e.what());
UNCOV
172
    }
×
173

174
    co_return;
1✔
175
}
260✔
176

177
// --- GET /api/v1/files ---
178
// Lists every file known to the index.
//
// Response body:
//   {"files":[{"path":...,"has_bloom_data":bool,
//              "has_checkpoint_index":bool,"is_small":bool}, ...],
//    "count":<index.file_count()>}
// The request and its query parameters are not consulted.
static coro::CoroTask<HttpResponse> handle_files(const HttpRequest& /*req*/,
                                                 const QueryParams& /*params*/,
                                                 TraceIndex& index) {
    const auto json_bool = [](bool flag) -> const char* {
        return flag ? "true" : "false";
    };

    std::string body;
    body.reserve(128 * index.file_count() + 32);
    body.append("{\"files\":[");

    // Empty before the first entry, "," before every later one.
    const char* separator = "";
    for (const auto& entry : index.files()) {
        body.append(separator);
        separator = ",";

        body.append("{\"path\":\"");
        body.append(json_escape(entry.path));
        body.append("\",\"has_bloom_data\":");
        body.append(json_bool(entry.has_bloom_data));
        body.append(",\"has_checkpoint_index\":");
        body.append(json_bool(entry.has_checkpoint_index));
        body.append(",\"is_small\":");
        body.append(json_bool(entry.is_small));
        body.push_back('}');
    }

    body.append("],\"count\":");
    body.append(std::to_string(index.file_count()));
    body.push_back('}');
    co_return HttpResponse::ok(body);
}
203

204
// --- GET /api/v1/files/info ---
205
// Describes a single indexed file selected by the `file` query parameter.
//
// Returns 400 when `file` is missing or empty and 404 when the index has no
// such file. Otherwise the body is a JSON object with path, has_bloom_data,
// has_checkpoint_index, is_small, size_mb and compressed_size; for files that
// are not marked small it also carries num_lines, num_checkpoints and
// uncompressed_size.
static coro::CoroTask<HttpResponse> handle_file_info(const HttpRequest& /*req*/,
                                                     const QueryParams& params,
                                                     TraceIndex& index) {
    auto requested = params.get("file");
    if (requested.empty()) {
        co_return HttpResponse::bad_request("Missing required parameter: file");
    }

    std::string lookup_key(requested);
    auto* entry = index.find_file(lookup_key);
    if (entry == nullptr) {
        co_return HttpResponse::not_found();
    }

    const auto json_bool = [](bool flag) -> const char* {
        return flag ? "true" : "false";
    };

    std::string body;
    body.reserve(512);
    body.append("{\"path\":\"");
    body.append(json_escape(entry->path));
    body.append("\",\"has_bloom_data\":");
    body.append(json_bool(entry->has_bloom_data));
    body.append(",\"has_checkpoint_index\":");
    body.append(json_bool(entry->has_checkpoint_index));
    body.append(",\"is_small\":");
    body.append(json_bool(entry->is_small));

    body.append(",\"size_mb\":");
    body.append(std::to_string(entry->size_mb));
    body.append(",\"compressed_size\":");
    body.append(std::to_string(entry->compressed_size));

    // These fields are only reported for files not flagged as small.
    if (!entry->is_small) {
        body.append(",\"num_lines\":");
        body.append(std::to_string(entry->num_lines));
        body.append(",\"num_checkpoints\":");
        body.append(std::to_string(entry->num_checkpoints));
        body.append(",\"uncompressed_size\":");
        body.append(std::to_string(entry->uncompressed_size));
    }

    body.push_back('}');
    co_return HttpResponse::ok(body);
}
246

247
// Splits a comma-separated list into its non-empty tokens.
//
// Empty tokens (leading, trailing or doubled commas) are dropped. Tokens are
// returned verbatim: surrounding whitespace is NOT trimmed.
static std::vector<std::string> split_csv(std::string_view s) {
    std::vector<std::string> result;
    std::size_t begin = 0;
    // `begin == s.size()` is still visited once so a trailing token ends
    // cleanly; the empty slice it yields is skipped below.
    while (begin <= s.size()) {
        std::size_t end = s.find(',', begin);
        if (end == std::string_view::npos) end = s.size();
        // Slice straight out of the view: one construction per token, no
        // per-character appends and no extra copy.
        if (end > begin) result.emplace_back(s.substr(begin, end - begin));
        begin = end + 1;
    }
    return result;
}
261

262
// Renders a query-DSL membership clause for `field`.
//
//   one value   ->  field == "v"
//   otherwise   ->  field in ["v1", "v2", ...]   (an empty list gives `in []`)
//
// NOTE(review): values are interpolated verbatim. A value containing '"' will
// produce a clause the DSL parser may reject or read differently; the DSL's
// escaping rules are not visible from this file, so none are applied here.
static std::string format_in_clause(const std::string& field,
                                    const std::vector<std::string>& vals) {
    // Size the buffer once instead of building a temporary per value.
    std::size_t needed = field.size() + 8;
    for (const auto& v : vals) needed += v.size() + 4;

    std::string s;
    s.reserve(needed);
    s += field;

    if (vals.size() == 1) {
        s += " == \"";
        s += vals.front();
        s += '"';
        return s;
    }

    s += " in [";
    for (std::size_t i = 0; i < vals.size(); ++i) {
        if (i > 0) s += ", ";
        s += '"';
        s += vals[i];
        s += '"';
    }
    s += ']';
    return s;
}
273

274
static std::optional<Query> build_query_from_params(const QueryParams& params) {
4✔
275
    std::string dsl;
4✔
276

277
    auto cat = params.get("cat");
4!
278
    if (!cat.empty()) {
4!
279
        auto vals = split_csv(cat);
×
280
        if (!vals.empty()) dsl += format_in_clause("cat", vals);
×
281
    }
×
282

283
    auto name = params.get("name");
4!
284
    if (!name.empty()) {
4!
285
        auto vals = split_csv(name);
×
286
        if (!vals.empty()) {
×
287
            if (!dsl.empty()) dsl += " and ";
×
288
            dsl += format_in_clause("name", vals);
×
UNCOV
289
        }
×
290
    }
×
291

292
    auto pid = params.get("pid");
4!
293
    if (!pid.empty()) {
4!
294
        if (!dsl.empty()) dsl += " and ";
×
295
        dsl += "pid == " + std::string(pid);
×
UNCOV
296
    }
×
297

298
    double ts_min = params.get_double("ts_min", 0);
4!
299
    double ts_max = params.get_double("ts_max", 0);
4!
300
    if (ts_min > 0) {
4!
301
        if (!dsl.empty()) dsl += " and ";
×
302
        dsl += "ts >= " + std::to_string(static_cast<uint64_t>(ts_min));
×
UNCOV
303
    }
×
304
    if (ts_max > 0) {
4!
305
        if (!dsl.empty()) dsl += " and ";
×
306
        dsl += "ts <= " + std::to_string(static_cast<uint64_t>(ts_max));
×
UNCOV
307
    }
×
308

309
    double dur_min = params.get_double("dur_min", 0);
4!
310
    double dur_max = params.get_double("dur_max", 0);
4!
311
    if (dur_min > 0) {
4!
312
        if (!dsl.empty()) dsl += " and ";
×
313
        dsl += "dur >= " + std::to_string(static_cast<uint64_t>(dur_min));
×
UNCOV
314
    }
×
315
    if (dur_max > 0) {
4!
316
        if (!dsl.empty()) dsl += " and ";
×
317
        dsl += "dur <= " + std::to_string(static_cast<uint64_t>(dur_max));
×
UNCOV
318
    }
×
319

320
    if (dsl.empty()) return std::nullopt;
4!
321
    auto result = Query::from_string(dsl);
×
322
    if (!result) return std::nullopt;
×
323
    return std::move(*result);
×
324
}
4✔
325

326
// Builds the ViewDefinition used for HTTP API queries: a view named
// "api_query" that carries the request's filter query, if the parameters
// produce one.
static ViewDefinition build_view_from_params(const QueryParams& params) {
    ViewDefinition definition;
    definition.name = "api_query";
    definition.description = "HTTP API query";

    if (auto parsed = build_query_from_params(params); parsed.has_value()) {
        definition.with_query(std::move(*parsed));
    }
    return definition;
}
335

336
// --- GET /api/v1/events ---
337
static coro::CoroTask<HttpResponse> handle_events(const HttpRequest& /*req*/,
9!
338
                                                  const QueryParams& params,
339
                                                  TraceIndex& index) {
1!
340
    int limit = params.get_int("limit", 1000);
1!
341
    if (limit <= 0) limit = 1000;
1!
342
    if (limit > 100000) limit = 100000;
1!
343

344
    auto view = build_view_from_params(params);
1!
345

346
    double query_ts_min = params.get_double("ts_min", 0);
1!
347
    double query_ts_max = params.get_double("ts_max", 0);
1!
348

349
    std::vector<const TraceIndex::FileInfo*> target_files;
1✔
350
    auto file_param = params.get("file");
1!
351
    if (!file_param.empty()) {
1!
UNCOV
352
        auto* f = index.find_file(std::string(file_param));
×
UNCOV
353
        if (f) target_files.push_back(f);
×
UNCOV
354
    } else {
×
355
        for (const auto& f : index.files()) {
2✔
356
            target_files.push_back(&f);
1!
357
        }
1✔
358
    }
359

360
    // File-level time range skip
361
    if (query_ts_min > 0 || query_ts_max > 0) {
1!
UNCOV
362
        std::vector<const TraceIndex::FileInfo*> filtered;
×
UNCOV
363
        filtered.reserve(target_files.size());
×
UNCOV
364
        for (auto* fi : target_files) {
×
UNCOV
365
            if (fi->is_small) {
×
UNCOV
366
                filtered.push_back(fi);
×
UNCOV
367
                continue;
×
368
            }
UNCOV
369
            if (fi->min_timestamp_us == 0 && fi->max_timestamp_us == 0) {
×
UNCOV
370
                filtered.push_back(fi);
×
UNCOV
371
                continue;
×
372
            }
UNCOV
373
            double fi_min = static_cast<double>(fi->min_timestamp_us);
×
UNCOV
374
            double fi_max = static_cast<double>(fi->max_timestamp_us);
×
UNCOV
375
            if (fi_max < query_ts_min ||
×
UNCOV
376
                (query_ts_max > 0 && fi_min > query_ts_max))
×
UNCOV
377
                continue;
×
UNCOV
378
            filtered.push_back(fi);
×
UNCOV
379
        }
×
UNCOV
380
        target_files = std::move(filtered);
×
UNCOV
381
    }
×
382

383
    auto query = build_query_from_params(params);
1!
384
    const Query* query_ptr = query ? &*query : nullptr;
1!
385

386
    std::vector<std::string> collected_events;
1✔
387
    std::uint64_t total_scanned = 0;
1✔
388
    std::uint64_t total_matched = 0;
1✔
389

390
    if (target_files.size() <= 1) {
1!
391
        bool limit_reached = false;
1✔
392
        for (auto* file_info : target_files) {
6✔
393
            if (limit_reached) break;
3!
394
            if (file_info->is_small) {
3!
395
                co_await direct_scan_events(
8!
396
                    file_info, query_ptr, view.include_metadata,
3✔
397
                    &collected_events, &total_scanned, &total_matched, limit);
3✔
398
                limit_reached =
1✔
399
                    collected_events.size() >= static_cast<std::size_t>(limit);
1✔
400
            } else {
1✔
UNCOV
401
                if (file_info->uncompressed_size == 0 &&
×
UNCOV
402
                    file_info->num_checkpoints == 0)
×
UNCOV
403
                    continue;
×
404

UNCOV
405
                ViewBuilderInput builder_input;
×
UNCOV
406
                builder_input.with_view(view)
×
UNCOV
407
                    .with_file_path(file_info->path)
×
UNCOV
408
                    .with_idx_path(
×
UNCOV
409
                        file_info->has_bloom_data ? file_info->idx_path : "")
×
UNCOV
410
                    .with_uncompressed_size(file_info->uncompressed_size)
×
UNCOV
411
                    .with_num_checkpoints(file_info->num_checkpoints)
×
UNCOV
412
                    .with_bloom_cache(&index.bloom_cache())
×
UNCOV
413
                    .with_time_range(query_ts_min, query_ts_max);
×
414

UNCOV
415
                ViewBuilderUtility builder;
×
UNCOV
416
                auto build_output = co_await builder.process(builder_input);
×
UNCOV
417
                if (!build_output.success || !build_output.file_may_match)
×
UNCOV
418
                    continue;
×
419

UNCOV
420
                for (const auto& candidate : build_output.candidates) {
×
UNCOV
421
                    if (limit_reached) break;
×
422

UNCOV
423
                    ViewReaderInput reader_input;
×
UNCOV
424
                    reader_input.with_file_path(file_info->path)
×
UNCOV
425
                        .with_idx_path(file_info->idx_path)
×
UNCOV
426
                        .with_byte_range(candidate.start_byte,
×
UNCOV
427
                                         candidate.end_byte)
×
UNCOV
428
                        .with_checkpoint_idx(candidate.checkpoint_idx)
×
UNCOV
429
                        .with_view(view);
×
430

UNCOV
431
                    ViewReaderUtility reader;
×
UNCOV
432
                    auto gen = reader.process(reader_input);
×
UNCOV
433
                    while (auto batch = co_await gen.next()) {
×
UNCOV
434
                        total_scanned += batch->events_scanned;
×
UNCOV
435
                        total_matched += batch->events_matched;
×
UNCOV
436
                        for (auto& event : batch->events) {
×
UNCOV
437
                            if (collected_events.size() >=
×
UNCOV
438
                                static_cast<std::size_t>(limit)) {
×
UNCOV
439
                                limit_reached = true;
×
UNCOV
440
                                break;
×
441
                            }
UNCOV
442
                            collected_events.push_back(std::move(event));
×
UNCOV
443
                        }
×
UNCOV
444
                        if (limit_reached) break;
×
UNCOV
445
                    }
×
UNCOV
446
                }
×
UNCOV
447
            }
×
448
        }
1✔
449
    } else {
3✔
UNCOV
450
        std::size_t num_workers =
×
UNCOV
451
            std::min(index.max_concurrent(), target_files.size());
×
UNCOV
452
        auto* executor = Executor::current();
×
453

UNCOV
454
        auto file_chan = coro::make_channel<std::size_t>(num_workers * 2);
×
UNCOV
455
        auto collected_mutex = std::make_shared<std::mutex>();
×
UNCOV
456
        auto remaining = std::make_shared<std::atomic<int>>(limit);
×
UNCOV
457
        auto scanned_atomic = std::make_shared<std::atomic<std::uint64_t>>(0);
×
UNCOV
458
        auto matched_atomic = std::make_shared<std::atomic<std::uint64_t>>(0);
×
459

UNCOV
460
        auto* target_files_ptr = &target_files;
×
UNCOV
461
        auto* collected_ptr = &collected_events;
×
UNCOV
462
        auto* view_ptr = &view;
×
UNCOV
463
        auto* bloom_cache_ptr = &index.bloom_cache();
×
UNCOV
464
        double ev_ts_min = query_ts_min;
×
UNCOV
465
        double ev_ts_max = query_ts_max;
×
466

UNCOV
467
        CoroScope scope(executor);
×
468

469
        scope.spawn([ch = file_chan->producer(), target_files_ptr](
×
UNCOV
470
                        CoroScope&) mutable -> coro::CoroTask<void> {
×
UNCOV
471
            auto guard = ch.guard();
×
UNCOV
472
            for (std::size_t i = 0; i < target_files_ptr->size(); ++i) {
×
UNCOV
473
                if (!co_await ch.send(i)) co_return;
×
UNCOV
474
            }
×
UNCOV
475
            co_return;
×
476
        });
×
477

UNCOV
478
        for (std::size_t w = 0; w < num_workers; ++w) {
×
479
            scope.spawn([file_chan, target_files_ptr, collected_mutex,
×
UNCOV
480
                         collected_ptr, remaining, scanned_atomic,
×
UNCOV
481
                         matched_atomic, query_ptr, view_ptr, bloom_cache_ptr,
×
UNCOV
482
                         ev_ts_min,
×
UNCOV
483
                         ev_ts_max](CoroScope&) -> coro::CoroTask<void> {
×
UNCOV
484
                while (auto fi_opt = co_await file_chan->receive()) {
×
UNCOV
485
                    if (remaining->load(std::memory_order_relaxed) <= 0)
×
UNCOV
486
                        co_return;
×
UNCOV
487
                    auto* file_info = (*target_files_ptr)[*fi_opt];
×
488

UNCOV
489
                    if (file_info->is_small) {
×
UNCOV
490
                        std::vector<std::string> local_events;
×
UNCOV
491
                        std::uint64_t local_scanned = 0;
×
UNCOV
492
                        std::uint64_t local_matched = 0;
×
UNCOV
493
                        int local_limit =
×
UNCOV
494
                            remaining->load(std::memory_order_relaxed);
×
UNCOV
495
                        if (local_limit <= 0) co_return;
×
UNCOV
496
                        co_await direct_scan_events(
×
UNCOV
497
                            file_info, query_ptr, view_ptr->include_metadata,
×
498
                            &local_events, &local_scanned, &local_matched,
UNCOV
499
                            local_limit);
×
UNCOV
500
                        scanned_atomic->fetch_add(local_scanned);
×
UNCOV
501
                        matched_atomic->fetch_add(local_matched);
×
UNCOV
502
                        if (!local_events.empty()) {
×
UNCOV
503
                            std::lock_guard<std::mutex> lock(*collected_mutex);
×
UNCOV
504
                            for (auto& ev : local_events) {
×
UNCOV
505
                                collected_ptr->push_back(std::move(ev));
×
UNCOV
506
                            }
×
UNCOV
507
                            remaining->fetch_sub(
×
UNCOV
508
                                static_cast<int>(local_events.size()));
×
UNCOV
509
                        }
×
UNCOV
510
                    } else {
×
UNCOV
511
                        if (file_info->uncompressed_size == 0 &&
×
UNCOV
512
                            file_info->num_checkpoints == 0)
×
UNCOV
513
                            continue;
×
514

UNCOV
515
                        ViewBuilderInput builder_input;
×
UNCOV
516
                        builder_input.with_view(*view_ptr)
×
UNCOV
517
                            .with_file_path(file_info->path)
×
UNCOV
518
                            .with_idx_path(file_info->has_bloom_data
×
UNCOV
519
                                               ? file_info->idx_path
×
UNCOV
520
                                               : "")
×
UNCOV
521
                            .with_uncompressed_size(
×
UNCOV
522
                                file_info->uncompressed_size)
×
UNCOV
523
                            .with_num_checkpoints(file_info->num_checkpoints)
×
UNCOV
524
                            .with_bloom_cache(bloom_cache_ptr)
×
UNCOV
525
                            .with_time_range(ev_ts_min, ev_ts_max);
×
526

UNCOV
527
                        ViewBuilderUtility builder;
×
UNCOV
528
                        auto build_output =
×
UNCOV
529
                            co_await builder.process(builder_input);
×
UNCOV
530
                        if (!build_output.success ||
×
UNCOV
531
                            !build_output.file_may_match)
×
UNCOV
532
                            continue;
×
533

UNCOV
534
                        for (const auto& candidate : build_output.candidates) {
×
UNCOV
535
                            if (remaining->load(std::memory_order_relaxed) <= 0)
×
UNCOV
536
                                break;
×
537

UNCOV
538
                            ViewReaderInput reader_input;
×
UNCOV
539
                            reader_input.with_file_path(file_info->path)
×
UNCOV
540
                                .with_idx_path(file_info->idx_path)
×
UNCOV
541
                                .with_byte_range(candidate.start_byte,
×
UNCOV
542
                                                 candidate.end_byte)
×
UNCOV
543
                                .with_checkpoint_idx(candidate.checkpoint_idx)
×
UNCOV
544
                                .with_view(*view_ptr);
×
545

UNCOV
546
                            ViewReaderUtility reader;
×
UNCOV
547
                            auto gen = reader.process(reader_input);
×
UNCOV
548
                            while (auto batch = co_await gen.next()) {
×
UNCOV
549
                                scanned_atomic->fetch_add(
×
UNCOV
550
                                    batch->events_scanned);
×
UNCOV
551
                                matched_atomic->fetch_add(
×
UNCOV
552
                                    batch->events_matched);
×
UNCOV
553
                                if (!batch->events.empty()) {
×
UNCOV
554
                                    std::lock_guard<std::mutex> lock(
×
UNCOV
555
                                        *collected_mutex);
×
UNCOV
556
                                    for (auto& event : batch->events) {
×
UNCOV
557
                                        collected_ptr->push_back(
×
UNCOV
558
                                            std::move(event));
×
UNCOV
559
                                    }
×
UNCOV
560
                                    remaining->fetch_sub(
×
UNCOV
561
                                        static_cast<int>(batch->events.size()));
×
UNCOV
562
                                }
×
UNCOV
563
                            }
×
UNCOV
564
                        }
×
UNCOV
565
                    }
×
UNCOV
566
                }
×
UNCOV
567
                co_return;
×
568
            });
×
UNCOV
569
        }
×
570

UNCOV
571
        co_await scope.join();
×
572

UNCOV
573
        total_scanned = scanned_atomic->load();
×
UNCOV
574
        total_matched = matched_atomic->load();
×
UNCOV
575
        if (collected_events.size() > static_cast<std::size_t>(limit)) {
×
UNCOV
576
            collected_events.resize(static_cast<std::size_t>(limit));
×
UNCOV
577
        }
×
UNCOV
578
    }
×
579

580
    std::size_t body_size = 64;
1✔
581
    for (const auto& ev : collected_events) body_size += ev.size() + 1;
11✔
582
    std::string body;
1✔
583
    body.reserve(body_size);
1!
584
    body += "{\"events\":[";
1!
585
    for (std::size_t i = 0; i < collected_events.size(); ++i) {
11✔
586
        if (i > 0) body += ',';
10!
587
        body += collected_events[i];
10!
588
    }
10✔
589
    body += "],\"total_scanned\":";
1!
590
    body += std::to_string(total_scanned);
1!
591
    body += ",\"total_matched\":";
1!
592
    body += std::to_string(total_matched);
1!
593
    body += ",\"count\":";
1!
594
    body += std::to_string(collected_events.size());
1!
595
    body += '}';
1!
596

597
    co_return HttpResponse::ok(body);
1!
598
}
5✔
599

600
// --- GET /api/v1/events/stream ---
601
// Returns all matching events as NDJSON (newline-delimited JSON).
602
static coro::CoroTask<HttpResponse> handle_events_stream(
9!
603
    const HttpRequest& /*req*/, const QueryParams& params, TraceIndex& index) {
1!
604
    auto view = build_view_from_params(params);
1!
605

606
    double stream_ts_min = params.get_double("ts_min", 0);
1!
607
    double stream_ts_max = params.get_double("ts_max", 0);
1!
608

609
    std::vector<const TraceIndex::FileInfo*> target_files;
1✔
610
    auto file_param = params.get("file");
1!
611
    if (!file_param.empty()) {
1!
UNCOV
612
        auto* f = index.find_file(std::string(file_param));
×
UNCOV
613
        if (f) target_files.push_back(f);
×
UNCOV
614
    } else {
×
615
        for (const auto& f : index.files()) {
2✔
616
            target_files.push_back(&f);
1!
617
        }
1✔
618
    }
619

620
    // File-level time range skip
621
    if (stream_ts_min > 0 || stream_ts_max > 0) {
1!
UNCOV
622
        std::vector<const TraceIndex::FileInfo*> filtered;
×
UNCOV
623
        filtered.reserve(target_files.size());
×
UNCOV
624
        for (auto* fi : target_files) {
×
UNCOV
625
            if (fi->is_small) {
×
UNCOV
626
                filtered.push_back(fi);
×
UNCOV
627
                continue;
×
628
            }
UNCOV
629
            if (fi->min_timestamp_us == 0 && fi->max_timestamp_us == 0) {
×
UNCOV
630
                filtered.push_back(fi);
×
UNCOV
631
                continue;
×
632
            }
UNCOV
633
            double fi_min = static_cast<double>(fi->min_timestamp_us);
×
UNCOV
634
            double fi_max = static_cast<double>(fi->max_timestamp_us);
×
UNCOV
635
            if (fi_max < stream_ts_min ||
×
UNCOV
636
                (stream_ts_max > 0 && fi_min > stream_ts_max))
×
UNCOV
637
                continue;
×
UNCOV
638
            filtered.push_back(fi);
×
UNCOV
639
        }
×
UNCOV
640
        target_files = std::move(filtered);
×
UNCOV
641
    }
×
642

643
    auto stream_query = build_query_from_params(params);
1!
644
    const Query* stream_query_ptr = stream_query ? &*stream_query : nullptr;
1!
645

646
    std::string ndjson_body;
1✔
647

648
    if (target_files.size() <= 1) {
1!
649
        ndjson_body.reserve(64 * 1024);
1!
650
        for (auto* file_info : target_files) {
6✔
651
            if (file_info->is_small) {
3!
652
                std::vector<std::string> events;
3✔
653
                std::uint64_t scanned = 0;
3✔
654
                std::uint64_t matched = 0;
3✔
655
                co_await direct_scan_events(file_info, stream_query_ptr,
8!
656
                                            view.include_metadata, &events,
3✔
657
                                            &scanned, &matched, 0);
658
                for (const auto& event : events) {
51✔
659
                    ndjson_body += event;
50!
660
                    ndjson_body += '\n';
50!
661
                }
50✔
662
            } else {
1!
UNCOV
663
                if (file_info->uncompressed_size == 0 &&
×
UNCOV
664
                    file_info->num_checkpoints == 0)
×
UNCOV
665
                    continue;
×
666

UNCOV
667
                ViewBuilderInput builder_input;
×
UNCOV
668
                builder_input.with_view(view)
×
UNCOV
669
                    .with_file_path(file_info->path)
×
UNCOV
670
                    .with_idx_path(
×
UNCOV
671
                        file_info->has_bloom_data ? file_info->idx_path : "")
×
UNCOV
672
                    .with_uncompressed_size(file_info->uncompressed_size)
×
UNCOV
673
                    .with_num_checkpoints(file_info->num_checkpoints)
×
UNCOV
674
                    .with_bloom_cache(&index.bloom_cache())
×
UNCOV
675
                    .with_time_range(stream_ts_min, stream_ts_max);
×
676

UNCOV
677
                ViewBuilderUtility builder;
×
UNCOV
678
                auto build_output = co_await builder.process(builder_input);
×
UNCOV
679
                if (!build_output.success || !build_output.file_may_match)
×
UNCOV
680
                    continue;
×
681

UNCOV
682
                for (const auto& candidate : build_output.candidates) {
×
UNCOV
683
                    ViewReaderInput reader_input;
×
UNCOV
684
                    reader_input.with_file_path(file_info->path)
×
UNCOV
685
                        .with_idx_path(file_info->idx_path)
×
UNCOV
686
                        .with_byte_range(candidate.start_byte,
×
UNCOV
687
                                         candidate.end_byte)
×
UNCOV
688
                        .with_checkpoint_idx(candidate.checkpoint_idx)
×
UNCOV
689
                        .with_view(view);
×
690

UNCOV
691
                    ViewReaderUtility reader;
×
UNCOV
692
                    auto gen = reader.process(reader_input);
×
UNCOV
693
                    while (auto batch = co_await gen.next()) {
×
UNCOV
694
                        for (const auto& event : batch->events) {
×
UNCOV
695
                            ndjson_body += event;
×
UNCOV
696
                            ndjson_body += '\n';
×
UNCOV
697
                        }
×
UNCOV
698
                    }
×
UNCOV
699
                }
×
UNCOV
700
            }
×
701
        }
1✔
702
    } else {
1✔
UNCOV
703
        std::size_t num_workers =
×
UNCOV
704
            std::min(index.max_concurrent(), target_files.size());
×
UNCOV
705
        auto* executor = Executor::current();
×
706

UNCOV
707
        auto file_chan = coro::make_channel<std::size_t>(num_workers * 2);
×
UNCOV
708
        auto body_mutex = std::make_shared<std::mutex>();
×
UNCOV
709
        auto* ndjson_ptr = &ndjson_body;
×
UNCOV
710
        auto* target_files_ptr = &target_files;
×
UNCOV
711
        auto* view_ptr = &view;
×
UNCOV
712
        auto* bloom_cache_ptr = &index.bloom_cache();
×
UNCOV
713
        double st_ts_min = stream_ts_min;
×
UNCOV
714
        double st_ts_max = stream_ts_max;
×
715

UNCOV
716
        ndjson_body.reserve(64 * 1024);
×
717

UNCOV
718
        CoroScope scope(executor);
×
719

720
        scope.spawn([ch = file_chan->producer(), target_files_ptr](
×
UNCOV
721
                        CoroScope&) mutable -> coro::CoroTask<void> {
×
UNCOV
722
            auto guard = ch.guard();
×
UNCOV
723
            for (std::size_t i = 0; i < target_files_ptr->size(); ++i) {
×
UNCOV
724
                if (!co_await ch.send(i)) co_return;
×
UNCOV
725
            }
×
UNCOV
726
            co_return;
×
727
        });
×
728

UNCOV
729
        for (std::size_t w = 0; w < num_workers; ++w) {
×
730
            scope.spawn([file_chan, target_files_ptr, body_mutex, ndjson_ptr,
×
UNCOV
731
                         stream_query_ptr, view_ptr, bloom_cache_ptr, st_ts_min,
×
UNCOV
732
                         st_ts_max](CoroScope&) -> coro::CoroTask<void> {
×
UNCOV
733
                while (auto fi_opt = co_await file_chan->receive()) {
×
UNCOV
734
                    auto* file_info = (*target_files_ptr)[*fi_opt];
×
UNCOV
735
                    std::string local_buf;
×
736

UNCOV
737
                    if (file_info->is_small) {
×
UNCOV
738
                        std::vector<std::string> events;
×
UNCOV
739
                        std::uint64_t scanned = 0;
×
UNCOV
740
                        std::uint64_t matched = 0;
×
UNCOV
741
                        co_await direct_scan_events(file_info, stream_query_ptr,
×
UNCOV
742
                                                    view_ptr->include_metadata,
×
743
                                                    &events, &scanned, &matched,
744
                                                    0);
UNCOV
745
                        for (const auto& event : events) {
×
UNCOV
746
                            local_buf += event;
×
UNCOV
747
                            local_buf += '\n';
×
UNCOV
748
                        }
×
UNCOV
749
                    } else {
×
UNCOV
750
                        if (file_info->uncompressed_size == 0 &&
×
UNCOV
751
                            file_info->num_checkpoints == 0)
×
UNCOV
752
                            continue;
×
753

UNCOV
754
                        ViewBuilderInput builder_input;
×
UNCOV
755
                        builder_input.with_view(*view_ptr)
×
UNCOV
756
                            .with_file_path(file_info->path)
×
UNCOV
757
                            .with_idx_path(file_info->has_bloom_data
×
UNCOV
758
                                               ? file_info->idx_path
×
UNCOV
759
                                               : "")
×
UNCOV
760
                            .with_uncompressed_size(
×
UNCOV
761
                                file_info->uncompressed_size)
×
UNCOV
762
                            .with_num_checkpoints(file_info->num_checkpoints)
×
UNCOV
763
                            .with_bloom_cache(bloom_cache_ptr)
×
UNCOV
764
                            .with_time_range(st_ts_min, st_ts_max);
×
765

UNCOV
766
                        ViewBuilderUtility builder;
×
UNCOV
767
                        auto build_output =
×
UNCOV
768
                            co_await builder.process(builder_input);
×
UNCOV
769
                        if (!build_output.success ||
×
UNCOV
770
                            !build_output.file_may_match)
×
UNCOV
771
                            continue;
×
772

UNCOV
773
                        for (const auto& candidate : build_output.candidates) {
×
UNCOV
774
                            ViewReaderInput reader_input;
×
UNCOV
775
                            reader_input.with_file_path(file_info->path)
×
UNCOV
776
                                .with_idx_path(file_info->idx_path)
×
UNCOV
777
                                .with_byte_range(candidate.start_byte,
×
UNCOV
778
                                                 candidate.end_byte)
×
UNCOV
779
                                .with_checkpoint_idx(candidate.checkpoint_idx)
×
UNCOV
780
                                .with_view(*view_ptr);
×
781

UNCOV
782
                            ViewReaderUtility reader;
×
UNCOV
783
                            auto gen = reader.process(reader_input);
×
UNCOV
784
                            while (auto batch = co_await gen.next()) {
×
UNCOV
785
                                for (const auto& event : batch->events) {
×
UNCOV
786
                                    local_buf += event;
×
UNCOV
787
                                    local_buf += '\n';
×
UNCOV
788
                                }
×
UNCOV
789
                            }
×
UNCOV
790
                        }
×
UNCOV
791
                    }
×
792

UNCOV
793
                    if (!local_buf.empty()) {
×
UNCOV
794
                        std::lock_guard<std::mutex> lock(*body_mutex);
×
UNCOV
795
                        ndjson_ptr->append(local_buf);
×
UNCOV
796
                    }
×
UNCOV
797
                }
×
UNCOV
798
                co_return;
×
799
            });
×
UNCOV
800
        }
×
801

UNCOV
802
        co_await scope.join();
×
UNCOV
803
    }
×
804

805
    co_return HttpResponse::ok(ndjson_body, "application/x-ndjson");
1!
806
}
5✔
807

808
// --- GET /api/v1/stats ---
809
// Aggregates per-file statistics and reports them as one JSON object:
//   {"file_count":N,"total_events":N,"skipped_small_files":N,"files":[...]}
//
// Files flagged `is_small` are only counted (as skipped); files without bloom
// data are ignored entirely. Each remaining file is run through
// StatisticsAggregatorUtility and kept only when the result reports success,
// so "file_count" is the number of successfully aggregated files.
//
// Zero or one eligible file is handled inline; otherwise a producer task
// feeds file indices through a channel to up to `index.max_concurrent()`
// worker tasks that push their results into a mutex-protected vector.
static coro::CoroTask<HttpResponse> handle_stats(const HttpRequest& /*req*/,
                                                 const QueryParams& /*params*/,
                                                 TraceIndex& index) {
    std::vector<TraceStatistics> results;
    std::size_t small_file_count = 0;

    // Partition the indexed files: count the small ones, keep the ones that
    // carry bloom data, drop the rest.
    std::vector<const TraceIndex::FileInfo*> eligible;
    for (const auto& entry : index.files()) {
        if (entry.is_small) {
            small_file_count++;
        } else if (entry.has_bloom_data) {
            eligible.push_back(&entry);
        }
    }

    if (eligible.size() > 1) {
        const std::size_t worker_count =
            std::min(index.max_concurrent(), eligible.size());
        auto* exec = Executor::current();

        auto index_chan = coro::make_channel<std::size_t>(worker_count * 2);
        auto results_mutex = std::make_shared<std::mutex>();
        // Raw pointers into this coroutine's locals; valid because the scope
        // is joined below before they are used again or destroyed.
        auto* results_out = &results;
        auto* eligible_in = &eligible;
        std::string idx_dir = index.index_dir();

        CoroScope scope(exec);

        // Producer: publish the position of every eligible file.
        scope.spawn([tx = index_chan->producer(), eligible_in](
                        CoroScope&) mutable -> coro::CoroTask<void> {
            auto guard = tx.guard();
            for (std::size_t pos = 0; pos < eligible_in->size(); ++pos) {
                if (!co_await tx.send(pos)) co_return;
            }
            co_return;
        });

        // Workers: aggregate one file at a time, publish successes.
        for (std::size_t worker = 0; worker < worker_count; ++worker) {
            scope.spawn([index_chan, eligible_in, results_mutex, results_out,
                         idx_dir](CoroScope&) -> coro::CoroTask<void> {
                while (auto next = co_await index_chan->receive()) {
                    auto* info = (*eligible_in)[*next];

                    StatisticsAggregatorInput request;
                    request.file_path = info->path;
                    request.idx_path = info->idx_path;
                    request.index_dir = idx_dir;

                    StatisticsAggregatorUtility utility;
                    auto outcome = co_await utility.process(request);

                    if (outcome.success) {
                        std::lock_guard<std::mutex> guard(*results_mutex);
                        results_out->push_back(std::move(outcome));
                    }
                }
                co_return;
            });
        }

        co_await scope.join();
    } else if (!eligible.empty()) {
        // Exactly one eligible file: no need for channels or workers.
        auto* info = eligible.front();

        StatisticsAggregatorInput request;
        request.file_path = info->path;
        request.idx_path = info->idx_path;
        request.index_dir = index.index_dir();

        StatisticsAggregatorUtility utility;
        auto outcome = co_await utility.process(request);
        if (outcome.success) {
            results.push_back(std::move(outcome));
        }
    }

    std::uint64_t event_total = 0;
    for (const auto& item : results) {
        event_total += item.total_events();
    }

    std::string body;
    body.reserve(256 * results.size() + 64);
    body += "{\"file_count\":";
    body += std::to_string(results.size());
    body += ",\"total_events\":";
    body += std::to_string(event_total);
    body += ",\"skipped_small_files\":";
    body += std::to_string(small_file_count);
    body += ",\"files\":[";
    bool need_comma = false;
    for (const auto& item : results) {
        if (need_comma) body += ',';
        need_comma = true;
        body += item.to_json();
    }
    body += "]}";

    co_return HttpResponse::ok(body);
}
1!
909

910
// --- GET /api/v1/info ---
911
// Describes the index as JSON:
//   {"file_count":N[,"time_range":{...}],"files":[{...},...]}
//
// "time_range" is emitted only when the global maximum timestamp is positive
// and the global minimum is not the uint64 maximum value. Each file object
// carries its JSON-escaped path and three flags; the per-file timestamp pair
// is added only when at least one of the two values is non-zero.
static coro::CoroTask<HttpResponse> handle_info(const HttpRequest& /*req*/,
                                                const QueryParams& /*params*/,
                                                TraceIndex& index) {
    auto earliest_us = index.global_min_timestamp_us();
    auto latest_us = index.global_max_timestamp_us();

    // Spell JSON booleans without repeating the ternary at every field.
    const auto json_bool = [](bool flag) -> const char* {
        return flag ? "true" : "false";
    };

    std::string body;
    body.reserve(256 * index.file_count() + 128);
    body += "{\"file_count\":";
    body += std::to_string(index.file_count());

    if (latest_us > 0 &&
        earliest_us != std::numeric_limits<std::uint64_t>::max()) {
        body += ",\"time_range\":{\"min_timestamp_us\":";
        body += std::to_string(earliest_us);
        body += ",\"max_timestamp_us\":";
        body += std::to_string(latest_us);
        body += "}";
    }

    body += ",\"files\":[";
    std::size_t emitted = 0;
    for (const auto& entry : index.files()) {
        if (emitted != 0) body += ',';
        ++emitted;

        body += "{\"path\":\"";
        body += json_escape(entry.path);
        body += "\",\"has_bloom_data\":";
        body += json_bool(entry.has_bloom_data);
        body += ",\"has_checkpoint_index\":";
        body += json_bool(entry.has_checkpoint_index);
        body += ",\"is_small\":";
        body += json_bool(entry.is_small);

        const bool has_timestamps =
            entry.min_timestamp_us > 0 || entry.max_timestamp_us > 0;
        if (has_timestamps) {
            body += ",\"min_timestamp_us\":";
            body += std::to_string(entry.min_timestamp_us);
            body += ",\"max_timestamp_us\":";
            body += std::to_string(entry.max_timestamp_us);
        }
        body += '}';
    }
    body += "]}";

    co_return HttpResponse::ok(body);
}
9✔
957

958
// Registers the trace REST endpoints on `router`.
//
// Every route forwards to the matching handle_* function, passing the request,
// the parsed query parameters and `index`. The handlers keep a pointer to
// `index`, so it must outlive the router's use of these routes.
void register_trace_api(Router& router, TraceIndex& index) {
    TraceIndex* const trace_index = &index;

    // Adapts a three-argument handler (request, params, index) to the
    // two-argument coroutine shape used for route callbacks by binding the
    // index pointer.
    const auto with_index = [trace_index](auto handler) {
        return [trace_index, handler](
                   const HttpRequest& req,
                   const QueryParams& params) -> coro::CoroTask<HttpResponse> {
            co_return co_await handler(req, params, *trace_index);
        };
    };

    router.get("/api/v1/files", with_index(&handle_files));
    router.get("/api/v1/files/info", with_index(&handle_file_info));
    router.get("/api/v1/events", with_index(&handle_events));
    router.get("/api/v1/events/stream", with_index(&handle_events_stream));
    router.get("/api/v1/stats", with_index(&handle_stats));
    router.get("/api/v1/info", with_index(&handle_info));
}
2✔
1003

1004
}  // namespace dftracer::utils::server
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc