• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28693295402

04 Jul 2026 03:17AM UTC coverage: 52.408% (+0.1%) from 52.278%
28693295402

push

github

hariharan-devarajan
feat: silence noisy warnings on aarch64

37318 of 92666 branches covered (40.27%)

Branch coverage included in aggregate %.

33462 of 42389 relevant lines covered (78.94%)

20557.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.14
/src/dftracer/utils/server/trace_api.cpp
1
#include <dftracer/utils/core/common/filesystem.h>
#include <dftracer/utils/core/common/logging.h>
#include <dftracer/utils/core/common/transparent_string_hash.h>
#include <dftracer/utils/core/coro/channel.h>
#include <dftracer/utils/core/pipeline/executor.h>
#include <dftracer/utils/core/tasks/coro_scope.h>
#include <dftracer/utils/server/cursor.h>
#include <dftracer/utils/server/http_request.h>
#include <dftracer/utils/server/http_response.h>
#include <dftracer/utils/server/router.h>
#include <dftracer/utils/server/trace_api.h>
#include <dftracer/utils/server/trace_index.h>
#include <dftracer/utils/utilities/common/json/json_doc_guard.h>
#include <dftracer/utils/utilities/common/json/json_value.h>
#include <dftracer/utils/utilities/common/query/query.h>
#include <dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.h>
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
#include <dftracer/utils/utilities/composites/dft/statistics/shared_index_statistics_reader.h>
#include <dftracer/utils/utilities/composites/dft/statistics/statistics_aggregator_utility.h>
#include <dftracer/utils/utilities/composites/dft/statistics/statistics_query_utility.h>
#include <dftracer/utils/utilities/composites/dft/views/view_builder_utility.h>
#include <dftracer/utils/utilities/composites/dft/views/view_definition.h>
#include <dftracer/utils/utilities/composites/dft/views/view_reader_utility.h>
#include <dftracer/utils/utilities/fileio/lines/sources/async_streaming_gz_line_generator.h>

#include <algorithm>
#include <charconv>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <limits>
#include <memory>
#include <mutex>
#include <optional>
#include <span>
#include <string>
#include <string_view>
#include <system_error>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

34
namespace dftracer::utils::server {
35

36
using namespace dftracer::utils::utilities::composites::dft;
37
using namespace dftracer::utils::utilities::composites::dft::indexing;
38
using namespace dftracer::utils::utilities::composites::dft::statistics;
39
using namespace dftracer::utils::utilities::composites::dft::views;
40

41
// JSON-escape a string value for embedding between double quotes.
//
// Escapes '"' and '\\', uses the short escapes \b \f \n \r \t, and encodes
// every other control character (< 0x20) as \u00XX, so the result is always
// a valid JSON string body (JSON forbids raw control characters inside
// strings). Bytes >= 0x20 -- including UTF-8 multi-byte sequences -- are
// copied through unchanged.
static std::string json_escape(const std::string& s) {
    static constexpr char kHexDigits[] = "0123456789abcdef";
    std::string out;
    out.reserve(s.size() + 2);
    for (char c : s) {
        switch (c) {
            case '"':
                out += "\\\"";
                break;
            case '\\':
                out += "\\\\";
                break;
            case '\b':
                out += "\\b";
                break;
            case '\f':
                out += "\\f";
                break;
            case '\n':
                out += "\\n";
                break;
            case '\r':
                out += "\\r";
                break;
            case '\t':
                out += "\\t";
                break;
            default: {
                // Cast first: plain char may be signed, and bytes >= 0x80
                // must not be mistaken for control characters.
                const auto byte = static_cast<unsigned char>(c);
                if (byte < 0x20) {
                    out += "\\u00";
                    out += kHexDigits[byte >> 4];
                    out += kHexDigits[byte & 0x0F];
                } else {
                    out += c;
                }
                break;
            }
        }
    }
    return out;
}
69

70
// Names of the hash-metadata event types ("FH", "HH", "SH") that need smart
// filtering.
// NOTE(review): nothing in this file references this set -- confirm it is
// still needed before relying on it (or remove it).
static const std::unordered_set<std::string> HASH_METADATA_NAMES{
    "FH", "HH", "SH"};
73

74
using dftracer::utils::utilities::common::query::Query;
75

76
// --- GET /api/v1/files ---
77
static coro::CoroTask<HttpResponse> handle_files(const HttpRequest& /*req*/,
24!
78
                                                 const QueryParams& /*params*/,
79
                                                 TraceIndex& index) {
4!
80
    std::string body;
4✔
81
    body.reserve(128 * index.file_count() + 32);
4!
82
    body += "{\"files\":[";
4!
83
    bool first = true;
4✔
84
    for (const auto& f : index.files()) {
8!
85
        if (!first) body += ',';
4!
86
        first = false;
4✔
87
        body += "{\"path\":\"";
4!
88
        body += json_escape(f.path);
4!
89
        body += "\",\"has_bloom_data\":";
4!
90
        body += f.has_bloom_data ? "true" : "false";
4!
91
        body += ",\"has_checkpoint_index\":";
4!
92
        body += f.has_checkpoint_index ? "true" : "false";
4!
93
        body += '}';
4!
94
    }
4✔
95
    body += "],\"count\":";
4!
96
    body += std::to_string(index.file_count());
4!
97
    body += '}';
4!
98
    co_return HttpResponse::ok(body);
8!
99
}
12!
100

101
// --- GET /api/v1/files/info ---
102
static coro::CoroTask<HttpResponse> handle_file_info(const HttpRequest& /*req*/,
16!
103
                                                     const QueryParams& params,
104
                                                     TraceIndex& index) {
2!
105
    auto file_param = params.get("file");
2!
106
    if (file_param.empty()) {
2✔
107
        co_return HttpResponse::bad_request("Missing required parameter: file");
3!
108
    }
109

110
    std::string file_path(file_param);
1!
111
    auto* info = index.find_file(file_path);
1!
112
    if (!info) {
1!
113
        co_return HttpResponse::not_found();
×
114
    }
115

116
    std::string body;
1✔
117
    body.reserve(512);
1!
118
    body += "{\"path\":\"";
1!
119
    body += json_escape(info->path);
1!
120
    body += "\",\"has_bloom_data\":";
1!
121
    body += info->has_bloom_data ? "true" : "false";
1!
122
    body += ",\"has_checkpoint_index\":";
1!
123
    body += info->has_checkpoint_index ? "true" : "false";
1!
124
    body += ",\"size_mb\":";
1!
125
    body += std::to_string(info->size_mb);
1!
126
    body += ",\"compressed_size\":";
1!
127
    body += std::to_string(info->compressed_size);
1!
128
    body += ",\"num_lines\":";
1!
129
    body += std::to_string(info->num_lines);
1!
130
    body += ",\"num_checkpoints\":";
1!
131
    body += std::to_string(info->num_checkpoints);
1!
132
    body += ",\"uncompressed_size\":";
1!
133
    body += std::to_string(info->uncompressed_size);
1!
134

135
    body += '}';
1!
136
    co_return HttpResponse::ok(body);
1!
137
}
6!
138

139
// Splits a comma-separated list into its non-empty tokens, in order.
// Tokens are not trimmed: "a, b" yields {"a", " b"}; empty fields are
// dropped, so ",a,,b," yields {"a", "b"} and "" yields {}.
static std::vector<std::string> split_csv(std::string_view s) {
    std::vector<std::string> tokens;
    std::size_t field_start = 0;
    while (field_start <= s.size()) {
        std::size_t field_end = s.find(',', field_start);
        if (field_end == std::string_view::npos) field_end = s.size();
        if (field_end > field_start) {
            tokens.emplace_back(s.substr(field_start, field_end - field_start));
        }
        field_start = field_end + 1;
    }
    return tokens;
}
153

154
// Renders a Query-DSL membership test for `field`:
//   one value      -> field == "v"
//   otherwise      -> field in ["v1", "v2", ...]   (empty list -> field in [])
// Values are placed between double quotes verbatim, without escaping.
// NOTE(review): the values come from HTTP query parameters, so an embedded
// '"' changes the generated DSL -- confirm whether the Query parser supports
// an escape syntax and apply it here.
static std::string format_in_clause(const std::string& field,
                                    const std::vector<std::string>& vals) {
    if (vals.size() == 1) return field + " == \"" + vals[0] + "\"";

    std::string clause = field;
    clause += " in [";
    const char* separator = "";
    for (const auto& value : vals) {
        clause += separator;
        clause += "\"";
        clause += value;
        clause += "\"";
        separator = ", ";
    }
    clause += "]";
    return clause;
}
165

166
static std::optional<Query> build_query_from_params(const QueryParams& params) {
8✔
167
    std::string dsl;
8✔
168

169
    auto cat = params.get("cat");
8!
170
    if (!cat.empty()) {
8!
171
        auto vals = split_csv(cat);
×
172
        if (!vals.empty()) dsl += format_in_clause("cat", vals);
×
173
    }
×
174

175
    auto name = params.get("name");
8!
176
    if (!name.empty()) {
8!
177
        auto vals = split_csv(name);
×
178
        if (!vals.empty()) {
×
179
            if (!dsl.empty()) dsl += " and ";
×
180
            dsl += format_in_clause("name", vals);
×
181
        }
182
    }
×
183

184
    auto pid = params.get("pid");
8!
185
    if (!pid.empty()) {
8!
186
        if (!dsl.empty()) dsl += " and ";
×
187
        dsl += "pid == " + std::string(pid);
×
188
    }
189

190
    double ts_min = params.get_double("ts_min", 0);
8!
191
    double ts_max = params.get_double("ts_max", 0);
8!
192
    if (ts_min > 0) {
8!
193
        if (!dsl.empty()) dsl += " and ";
×
194
        dsl += "ts >= " + std::to_string(static_cast<uint64_t>(ts_min));
×
195
    }
196
    if (ts_max > 0) {
8!
197
        if (!dsl.empty()) dsl += " and ";
×
198
        dsl += "ts <= " + std::to_string(static_cast<uint64_t>(ts_max));
×
199
    }
200

201
    double dur_min = params.get_double("dur_min", 0);
8!
202
    double dur_max = params.get_double("dur_max", 0);
8!
203
    if (dur_min > 0) {
8!
204
        if (!dsl.empty()) dsl += " and ";
×
205
        dsl += "dur >= " + std::to_string(static_cast<uint64_t>(dur_min));
×
206
    }
207
    if (dur_max > 0) {
8!
208
        if (!dsl.empty()) dsl += " and ";
×
209
        dsl += "dur <= " + std::to_string(static_cast<uint64_t>(dur_max));
×
210
    }
211

212
    if (dsl.empty()) return std::nullopt;
8✔
213
    auto result = Query::from_string(dsl);
×
214
    if (!result) return std::nullopt;
×
215
    return std::move(*result);
×
216
}
8✔
217

218
// Builds the ad-hoc ViewDefinition used by the event endpoints. It is named
// "api_query" and carries the Query produced by build_query_from_params, or
// no query at all when that function returns std::nullopt.
static ViewDefinition build_view_from_params(const QueryParams& params) {
    ViewDefinition view;
    view.name = "api_query";
    view.description = "HTTP API query";

    if (auto filter = build_query_from_params(params); filter.has_value()) {
        view.with_query(std::move(filter.value()));
    }
    return view;
}
227

228
// ============================================================================
229
// Shared helpers for event streaming endpoints
230
// ============================================================================
231

232
static std::vector<const TraceIndex::FileInfo*> resolve_target_files(
4✔
233
    TraceIndex& index, const QueryParams& params, double ts_min = 0,
234
    double ts_max = 0) {
235
    auto files = collect_candidate_files(index, params);
4✔
236

237
    if (ts_min > 0 || ts_max > 0) {
4!
238
        std::vector<const TraceIndex::FileInfo*> filtered;
×
239
        filtered.reserve(files.size());
×
240
        for (auto* fi : files) {
×
241
            if (fi->min_timestamp_us == 0 && fi->max_timestamp_us == 0) {
×
242
                filtered.push_back(fi);
×
243
                continue;
×
244
            }
245
            double fi_min = static_cast<double>(fi->min_timestamp_us);
×
246
            double fi_max = static_cast<double>(fi->max_timestamp_us);
×
247
            if (fi_max < ts_min || (ts_max > 0 && fi_min > ts_max)) continue;
×
248
            filtered.push_back(fi);
×
249
        }
250
        files = std::move(filtered);
×
251
    }
×
252

253
    return files;
4✔
254
}
2!
255

256
using StreamChunk = HttpResponse::StreamChunk;
257

258
// Lazily streams the events of `files` that match `ev_view`, in file order.
//
// For each file, ViewBuilderUtility decides whether the file may match
// (using the index path only when the file has bloom data) and yields the
// byte-range candidates to read. Each candidate is decoded by
// ViewReaderUtility, and every non-empty batch is yielded as a StreamChunk
// of string_views into that batch, so the consumer must use a chunk before
// resuming the generator.
//
// `limit` > 0 caps the total number of events emitted (the last batch is
// truncated); `limit` <= 0 means unlimited. Once the cap is reached, no
// further batches, candidates or files are read. `query_opt` is currently
// unused; filtering comes from `ev_view`.
static coro::AsyncGenerator<StreamChunk> stream_events(
    std::vector<const TraceIndex::FileInfo*> files, ViewDefinition ev_view,
    std::optional<Query> /*query_opt*/, double ts_min, double ts_max,
    BloomFilterCache* bloom_cache, int limit) {
    int emitted = 0;

    for (auto* file_info : files) {
        if (limit > 0 && emitted >= limit) break;

        // Files with neither uncompressed data nor checkpoints are skipped.
        if (file_info->uncompressed_size == 0 &&
            file_info->num_checkpoints == 0)
            continue;

        ViewBuilderInput builder_input;
        builder_input.with_view(ev_view)
            .with_file_path(file_info->path)
            .with_index_path(file_info->has_bloom_data ? file_info->index_path
                                                       : "")
            .with_uncompressed_size(file_info->uncompressed_size)
            .with_num_checkpoints(file_info->num_checkpoints)
            .with_bloom_cache(bloom_cache)
            .with_time_range(ts_min, ts_max);

        ViewBuilderUtility builder;
        auto build_output = co_await builder.process(builder_input);
        if (!build_output || !build_output->file_may_match) continue;

        for (const auto& candidate : build_output->candidates) {
            if (limit > 0 && emitted >= limit) break;

            ViewReaderInput reader_input;
            reader_input.with_file_path(file_info->path)
                .with_index_path(file_info->index_path)
                .with_byte_range(candidate.start_byte, candidate.end_byte)
                .with_checkpoint_idx(candidate.checkpoint_idx)
                .with_view(ev_view);

            ViewReaderUtility reader;
            auto event_gen = reader.process(reader_input);
            while (auto batch = co_await event_gen.next()) {
                int count = std::min(
                    static_cast<int>(batch->events.size()),
                    limit > 0 ? limit - emitted
                              : static_cast<int>(batch->events.size()));
                if (count > 0) {
                    co_yield StreamChunk{std::span<const std::string_view>(
                        batch->events.data(), static_cast<std::size_t>(count))};
                    emitted += count;
                }
                // Stop pulling batches as soon as the cap is met; any further
                // batch would be decoded only to be discarded (count == 0).
                if (limit > 0 && emitted >= limit) break;
            }
        }
    }
}
311

312
// ============================================================================
313
// Event endpoints
314
// ============================================================================
315

316
// --- GET /api/v1/events ---
317
static coro::CoroTask<HttpResponse> handle_events(const HttpRequest& /*req*/,
6!
318
                                                  const QueryParams& params,
319
                                                  TraceIndex& index) {
1!
320
    int limit = params.get_int("limit", 1000);
1!
321
    if (limit <= 0) limit = 1000;
1!
322
    if (limit > 100000) limit = 100000;
1!
323

324
    double ts_min = params.get_double("ts_min", 0);
1!
325
    double ts_max = params.get_double("ts_max", 0);
1!
326
    auto files = resolve_target_files(index, params, ts_min, ts_max);
1!
327
    auto view = build_view_from_params(params);
1!
328
    auto query = build_query_from_params(params);
1!
329

330
    auto gen = std::make_unique<HttpResponse::StreamGenerator>(
2!
331
        stream_events(std::move(files), std::move(view), std::move(query),
1!
332
                      ts_min, ts_max, &index.bloom_cache(), limit));
1!
333

334
    auto resp = HttpResponse::streaming(std::move(gen));
1!
335
    resp.headers.push_back({"X-Limit", std::to_string(limit)});
1!
336
    co_return resp;
2✔
337
}
3!
338

339
// --- GET /api/v1/events/stream ---
340
static coro::CoroTask<HttpResponse> handle_events_stream(
6!
341
    const HttpRequest& /*req*/, const QueryParams& params, TraceIndex& index) {
1!
342
    double ts_min = params.get_double("ts_min", 0);
1!
343
    double ts_max = params.get_double("ts_max", 0);
1!
344
    auto files = resolve_target_files(index, params, ts_min, ts_max);
1!
345
    auto view = build_view_from_params(params);
1!
346
    auto query = build_query_from_params(params);
1!
347
    int limit = params.get_int("limit", 0);
1!
348

349
    auto gen = std::make_unique<HttpResponse::StreamGenerator>(
1!
350
        stream_events(std::move(files), std::move(view), std::move(query),
2!
351
                      ts_min, ts_max, &index.bloom_cache(), limit));
1✔
352

353
    co_return HttpResponse::streaming(std::move(gen));
2!
354
}
3!
355

356
// --- GET /api/v1/stats ---
357
static coro::CoroTask<HttpResponse> handle_stats(const HttpRequest& req,
6!
358
                                                 const QueryParams& /*params*/,
359
                                                 TraceIndex& index) {
1!
360
    static std::mutex cache_mutex;
1!
361
    static std::unordered_map<std::string, std::string,
1!
362
                              dftracer::utils::TransparentStringHash,
363
                              dftracer::utils::TransparentStringEqual>
364
        stats_cache;
1✔
365

366
    {
367
        std::lock_guard<std::mutex> lock(cache_mutex);
1!
368
        auto it = stats_cache.find(req.path);
1!
369
        if (it != stats_cache.end()) {
1!
370
            co_return HttpResponse::ok(it->second);
1!
371
        }
372
    }
1!
373

374
    std::vector<TraceStatistics> all_stats;
1✔
375

376
    // Group files by index_path
377
    std::unordered_map<std::string,
1✔
378
                       std::vector<std::pair<std::size_t, std::string>>>
379
        files_by_index;
1✔
380
    std::size_t file_idx = 0;
1✔
381
    for (const auto& file_info : index.files()) {
2✔
382
        if (!file_info.has_bloom_data) continue;
1!
383
        files_by_index[file_info.index_path].emplace_back(file_idx++,
1!
384
                                                          file_info.path);
1✔
385
    }
1!
386

387
    // Resolve each group and read statistics
388
    for (auto& [idx_path, files] : files_by_index) {
4!
389
        std::vector<std::string> file_paths;
3✔
390
        file_paths.reserve(files.size());
3!
391
        for (const auto& [_, path] : files) {
4✔
392
            file_paths.push_back(path);
1!
393
        }
1✔
394

395
        IndexResolverUtility resolver;
3!
396
        ResolverInput input;
3✔
397
        input.files = std::move(file_paths);
3✔
398
        input.require_checkpoints = false;
3✔
399

400
        auto result = co_await resolver.process(input);
4!
401

402
        if (result.cached.empty()) {
3!
403
            continue;
404
        }
405

406
        try {
407
            SharedIndexStatisticsReader reader;
3✔
408
            auto batch_rows = co_await reader.query(
4!
409
                result.index_path, std::move(result.cached),
3!
410
                StatisticsQueryType::SUMMARY);
411
            auto callback = [&all_stats](std::size_t /*file_index*/,
3✔
412
                                         TraceStatistics&& stats) {
1✔
413
                if (stats.success) {
2✔
414
                    all_stats.push_back(std::move(stats));
2✔
415
                }
1✔
416
            };
2✔
417
            SharedIndexStatisticsReader::process_batch_results(batch_rows,
1!
418
                                                               callback);
419
        } catch (const std::exception& e) {
1!
420
            DFTRACER_UTILS_LOG_WARN("Server stats batch read failed for %s: %s",
×
421
                                    idx_path.c_str(), e.what());
422
        }
×
423
    }
3!
424

425
    std::uint64_t total_events = 0;
1✔
426
    std::size_t file_count = all_stats.size();
1✔
427
    for (const auto& s : all_stats) {
2✔
428
        total_events += s.total_events();
1!
429
    }
1✔
430

431
    std::string body;
1✔
432
    body.reserve(256 * all_stats.size() + 64);
1!
433
    body += "{\"file_count\":";
1!
434
    body += std::to_string(file_count);
1!
435
    body += ",\"total_events\":";
1!
436
    body += std::to_string(total_events);
1!
437
    body += ",\"files\":[";
1!
438
    for (std::size_t i = 0; i < all_stats.size(); ++i) {
2✔
439
        if (i > 0) body += ',';
1!
440
        body += all_stats[i].to_json();
1!
441
    }
1✔
442
    body += "]}";
1!
443

444
    {
445
        std::lock_guard<std::mutex> lock(cache_mutex);
1!
446
        stats_cache.emplace(std::string(req.path), body);
1!
447
    }
1✔
448
    co_return HttpResponse::ok(body);
1!
449
}
11!
450

451
// --- GET /api/v1/info ---
452
static coro::CoroTask<HttpResponse> handle_info(const HttpRequest& /*req*/,
6!
453
                                                const QueryParams& /*params*/,
454
                                                TraceIndex& index) {
1!
455
    auto global_min = index.global_min_timestamp_us();
1!
456
    auto global_max = index.global_max_timestamp_us();
1!
457
    bool has_time_range =
2✔
458
        global_max > 0 &&
1!
459
        global_min != std::numeric_limits<std::uint64_t>::max();
1✔
460

461
    std::string body;
1✔
462
    body.reserve(256 * index.file_count() + 128);
1!
463
    body += "{\"file_count\":";
1!
464
    body += std::to_string(index.file_count());
1!
465

466
    if (has_time_range) {
1!
467
        body += ",\"time_range\":{\"min_timestamp_us\":";
1!
468
        body += std::to_string(global_min);
1!
469
        body += ",\"max_timestamp_us\":";
1!
470
        body += std::to_string(global_max);
1!
471
        body += "}";
1!
472
    }
1✔
473

474
    body += ",\"files\":[";
1!
475
    bool first = true;
1✔
476
    for (const auto& f : index.files()) {
2✔
477
        if (!first) body += ',';
1!
478
        first = false;
1✔
479
        body += "{\"path\":\"";
1!
480
        body += json_escape(f.path);
1!
481
        body += "\",\"has_bloom_data\":";
1!
482
        body += f.has_bloom_data ? "true" : "false";
1!
483
        body += ",\"has_checkpoint_index\":";
1!
484
        body += f.has_checkpoint_index ? "true" : "false";
1!
485
        if (f.min_timestamp_us > 0 || f.max_timestamp_us > 0) {
1!
486
            body += ",\"min_timestamp_us\":";
1!
487
            body += std::to_string(f.min_timestamp_us);
1!
488
            body += ",\"max_timestamp_us\":";
1!
489
            body += std::to_string(f.max_timestamp_us);
1!
490
        }
1✔
491
        body += '}';
1!
492
    }
1✔
493
    body += "]}";
1!
494
    co_return HttpResponse::ok(body);
2!
495
}
3!
496

497
// Registers the read-only trace endpoints on `router`, in this order:
//   GET /api/v1/files, /api/v1/files/info, /api/v1/events,
//       /api/v1/events/stream, /api/v1/stats, /api/v1/info
// Every route captures a pointer to `index`, so `index` must outlive all
// routes registered here.
void register_trace_api(Router& router, TraceIndex& index) {
    using Handler = coro::CoroTask<HttpResponse> (*)(
        const HttpRequest&, const QueryParams&, TraceIndex&);

    struct Route {
        const char* path;
        Handler handler;
    };

    static constexpr Route kRoutes[] = {
        {"/api/v1/files", &handle_files},
        {"/api/v1/files/info", &handle_file_info},
        {"/api/v1/events", &handle_events},
        {"/api/v1/events/stream", &handle_events_stream},
        {"/api/v1/stats", &handle_stats},
        {"/api/v1/info", &handle_info},
    };

    TraceIndex* const index_ptr = &index;
    for (const Route& route : kRoutes) {
        const Handler handler = route.handler;
        router.get(route.path,
                   [index_ptr, handler](const HttpRequest& req,
                                        const QueryParams& params)
                       -> coro::CoroTask<HttpResponse> {
                       co_return co_await handler(req, params, *index_ptr);
                   });
    }
}
542

543
}  // namespace dftracer::utils::server
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc