• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26932094552

04 Jun 2026 05:08AM UTC coverage: 49.905% (-2.3%) from 52.184%
26932094552

push

github

hariharan-devarajan
chore(utils): add portable to_chars_double fallback for macOS and update zstd handling

- Introduce to_chars_double wrapper that falls back to snprintf on macOS < 13.3
- Force CPM-built zstd on Apple to avoid deployment target mismatches
- Update version patch to 8

16076 of 43875 branches covered (36.64%)

Branch coverage included in aggregate %.

0 of 3 new or added lines in 1 file covered. (0.0%)

660 existing lines in 103 files now uncovered.

21461 of 31342 relevant lines covered (68.47%)

13056.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

29.62
/src/dftracer/utils/server/trace_api.cpp
1
#include <dftracer/utils/core/common/filesystem.h>
2
#include <dftracer/utils/core/common/logging.h>
3
#include <dftracer/utils/core/common/transparent_string_hash.h>
4
#include <dftracer/utils/core/coro/channel.h>
5
#include <dftracer/utils/core/pipeline/executor.h>
6
#include <dftracer/utils/core/tasks/coro_scope.h>
7
#include <dftracer/utils/server/cursor.h>
8
#include <dftracer/utils/server/http_request.h>
9
#include <dftracer/utils/server/http_response.h>
10
#include <dftracer/utils/server/router.h>
11
#include <dftracer/utils/server/trace_api.h>
12
#include <dftracer/utils/server/trace_index.h>
13
#include <dftracer/utils/utilities/common/json/json_doc_guard.h>
14
#include <dftracer/utils/utilities/common/json/json_value.h>
15
#include <dftracer/utils/utilities/common/query/query.h>
16
#include <dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.h>
17
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
18
#include <dftracer/utils/utilities/composites/dft/statistics/shared_index_statistics_reader.h>
19
#include <dftracer/utils/utilities/composites/dft/statistics/statistics_aggregator_utility.h>
20
#include <dftracer/utils/utilities/composites/dft/statistics/statistics_query_utility.h>
21
#include <dftracer/utils/utilities/composites/dft/views/view_builder_utility.h>
22
#include <dftracer/utils/utilities/composites/dft/views/view_definition.h>
23
#include <dftracer/utils/utilities/composites/dft/views/view_reader_utility.h>
24
#include <dftracer/utils/utilities/fileio/lines/sources/async_streaming_gz_line_generator.h>
25

26
#include <atomic>
27
#include <cstddef>
28
#include <limits>
29
#include <mutex>
30
#include <string>
31
#include <unordered_map>
32
#include <unordered_set>
33
#include <vector>
34

35
namespace dftracer::utils::server {
36

37
using namespace dftracer::utils::utilities::composites::dft;
38
using namespace dftracer::utils::utilities::composites::dft::indexing;
39
using namespace dftracer::utils::utilities::composites::dft::statistics;
40
using namespace dftracer::utils::utilities::composites::dft::views;
41

42
// JSON-escape a string value so it can be placed between double quotes in a
// JSON document.
//
// Escapes the quote and backslash characters, uses the short escapes for
// backspace, form feed, newline, carriage return and tab, and emits every
// other byte below 0x20 as \u00XX. All remaining bytes (including bytes
// >= 0x80, e.g. UTF-8 sequences) are copied through unchanged.
//
// @param s  Raw string to escape.
// @return   Escaped copy of `s`, without surrounding quotes.
static std::string json_escape(const std::string& s) {
    static constexpr char kHexDigits[] = "0123456789abcdef";

    std::string out;
    out.reserve(s.size() + 2);
    for (const char c : s) {
        switch (c) {
            case '"':
                out += "\\\"";
                break;
            case '\\':
                out += "\\\\";
                break;
            case '\b':
                out += "\\b";
                break;
            case '\f':
                out += "\\f";
                break;
            case '\n':
                out += "\\n";
                break;
            case '\r':
                out += "\\r";
                break;
            case '\t':
                out += "\\t";
                break;
            default: {
                // Compare as unsigned so that bytes >= 0x80 are never
                // mistaken for control characters when `char` is signed.
                const auto byte = static_cast<unsigned char>(c);
                if (byte < 0x20) {
                    // A raw control character is not allowed inside a JSON
                    // string; emit the generic \u00XX form instead.
                    out += "\\u00";
                    out += kHexDigits[byte >> 4];
                    out += kHexDigits[byte & 0x0F];
                } else {
                    out += c;
                }
                break;
            }
        }
    }
    return out;
}
×
70

71
using dftracer::utils::utilities::common::json::JsonValue;
72

73
// Metadata record names treated as hash metadata: "FH", "HH" and "SH".
// Per the original note, these are the types that need smart filtering.
static const std::unordered_set<std::string> HASH_METADATA_NAMES{
    "FH", "HH", "SH"};
76

77
using dftracer::utils::utilities::common::json::JsonDocGuard;
78
using dftracer::utils::utilities::common::query::Query;
79

80
// --- GET /api/v1/files ---
81
static coro::CoroTask<HttpResponse> handle_files(const HttpRequest& /*req*/,
2!
82
                                                 const QueryParams& /*params*/,
83
                                                 TraceIndex& index) {
84
    std::string body;
85
    body.reserve(128 * index.file_count() + 32);
86
    body += "{\"files\":[";
87
    bool first = true;
88
    for (const auto& f : index.files()) {
89
        if (!first) body += ',';
90
        first = false;
91
        body += "{\"path\":\"";
92
        body += json_escape(f.path);
93
        body += "\",\"has_bloom_data\":";
94
        body += f.has_bloom_data ? "true" : "false";
95
        body += ",\"has_checkpoint_index\":";
96
        body += f.has_checkpoint_index ? "true" : "false";
97
        body += '}';
98
    }
99
    body += "],\"count\":";
100
    body += std::to_string(index.file_count());
101
    body += '}';
102
    co_return HttpResponse::ok(body);
103
}
4!
104

105
// --- GET /api/v1/files/info ---
106
static coro::CoroTask<HttpResponse> handle_file_info(const HttpRequest& /*req*/,
2!
107
                                                     const QueryParams& params,
108
                                                     TraceIndex& index) {
109
    auto file_param = params.get("file");
110
    if (file_param.empty()) {
111
        co_return HttpResponse::bad_request("Missing required parameter: file");
112
    }
113

114
    std::string file_path(file_param);
115
    auto* info = index.find_file(file_path);
116
    if (!info) {
117
        co_return HttpResponse::not_found();
118
    }
119

120
    std::string body;
121
    body.reserve(512);
122
    body += "{\"path\":\"";
123
    body += json_escape(info->path);
124
    body += "\",\"has_bloom_data\":";
125
    body += info->has_bloom_data ? "true" : "false";
126
    body += ",\"has_checkpoint_index\":";
127
    body += info->has_checkpoint_index ? "true" : "false";
128
    body += ",\"size_mb\":";
129
    body += std::to_string(info->size_mb);
130
    body += ",\"compressed_size\":";
131
    body += std::to_string(info->compressed_size);
132
    body += ",\"num_lines\":";
133
    body += std::to_string(info->num_lines);
134
    body += ",\"num_checkpoints\":";
135
    body += std::to_string(info->num_checkpoints);
136
    body += ",\"uncompressed_size\":";
137
    body += std::to_string(info->uncompressed_size);
138

139
    body += '}';
140
    co_return HttpResponse::ok(body);
141
}
4!
142

143
// Splits `s` on ',' and returns the non-empty pieces in order.
// Empty pieces (leading, trailing or consecutive commas) are dropped; no
// whitespace trimming is performed.
static std::vector<std::string> split_csv(std::string_view s) {
    std::vector<std::string> pieces;

    std::size_t start = 0;
    while (start <= s.size()) {
        const std::size_t comma = s.find(',', start);
        const bool last = (comma == std::string_view::npos);
        const std::size_t stop = last ? s.size() : comma;

        if (stop > start) {
            pieces.emplace_back(s.substr(start, stop - start));
        }
        if (last) break;
        start = comma + 1;
    }
    return pieces;
}
×
157

158
// Builds a query-DSL clause that matches `field` against `vals`.
//   - exactly one value:  field == "v"
//   - otherwise:          field in ["v1", "v2", ...]   (empty list -> field in [])
//
// NOTE(review): values are wrapped in double quotes verbatim; a value that
// itself contains '"' would change the shape of the clause. Confirm whether
// the DSL has an escape syntax that should be applied here.
static std::string format_in_clause(const std::string& field,
                                    const std::vector<std::string>& vals) {
    const auto quoted = [](const std::string& value) {
        return "\"" + value + "\"";
    };

    if (vals.size() == 1) {
        return field + " == " + quoted(vals.front());
    }

    std::string clause = field + " in [";
    const char* separator = "";
    for (const auto& value : vals) {
        clause += separator;
        clause += quoted(value);
        separator = ", ";
    }
    clause += ']';
    return clause;
}
×
169

170
static std::optional<Query> build_query_from_params(const QueryParams& params) {
4✔
171
    std::string dsl;
4✔
172

173
    auto cat = params.get("cat");
4!
174
    if (!cat.empty()) {
4!
175
        auto vals = split_csv(cat);
×
176
        if (!vals.empty()) dsl += format_in_clause("cat", vals);
×
177
    }
×
178

179
    auto name = params.get("name");
4!
180
    if (!name.empty()) {
4!
181
        auto vals = split_csv(name);
×
182
        if (!vals.empty()) {
×
183
            if (!dsl.empty()) dsl += " and ";
×
184
            dsl += format_in_clause("name", vals);
×
185
        }
186
    }
×
187

188
    auto pid = params.get("pid");
4!
189
    if (!pid.empty()) {
4!
190
        if (!dsl.empty()) dsl += " and ";
×
191
        dsl += "pid == " + std::string(pid);
×
192
    }
193

194
    double ts_min = params.get_double("ts_min", 0);
4!
195
    double ts_max = params.get_double("ts_max", 0);
4!
196
    if (ts_min > 0) {
4!
197
        if (!dsl.empty()) dsl += " and ";
×
198
        dsl += "ts >= " + std::to_string(static_cast<uint64_t>(ts_min));
×
199
    }
200
    if (ts_max > 0) {
4!
201
        if (!dsl.empty()) dsl += " and ";
×
202
        dsl += "ts <= " + std::to_string(static_cast<uint64_t>(ts_max));
×
203
    }
204

205
    double dur_min = params.get_double("dur_min", 0);
4!
206
    double dur_max = params.get_double("dur_max", 0);
4!
207
    if (dur_min > 0) {
4!
208
        if (!dsl.empty()) dsl += " and ";
×
209
        dsl += "dur >= " + std::to_string(static_cast<uint64_t>(dur_min));
×
210
    }
211
    if (dur_max > 0) {
4!
212
        if (!dsl.empty()) dsl += " and ";
×
213
        dsl += "dur <= " + std::to_string(static_cast<uint64_t>(dur_max));
×
214
    }
215

216
    if (dsl.empty()) return std::nullopt;
4!
217
    auto result = Query::from_string(dsl);
×
218
    if (!result) return std::nullopt;
×
219
    return std::move(*result);
×
220
}
4✔
221

222
// Creates the view used by the event endpoints: named "api_query", described
// as "HTTP API query", and carrying the query built from `params` when
// build_query_from_params yields one.
static ViewDefinition build_view_from_params(const QueryParams& params) {
    ViewDefinition definition;
    definition.name = "api_query";
    definition.description = "HTTP API query";

    if (auto filter = build_query_from_params(params)) {
        definition.with_query(std::move(*filter));
    }
    return definition;
}
2✔
231

232
// ============================================================================
233
// Shared helpers for event streaming endpoints
234
// ============================================================================
235

236
static std::vector<const TraceIndex::FileInfo*> resolve_target_files(
2✔
237
    TraceIndex& index, const QueryParams& params, double ts_min = 0,
238
    double ts_max = 0) {
239
    std::vector<const TraceIndex::FileInfo*> files;
2✔
240
    auto file_param = params.get("file");
2!
241
    if (!file_param.empty()) {
2!
242
        auto* f = index.find_file(std::string(file_param));
×
243
        if (f) files.push_back(f);
×
244
    } else {
245
        for (const auto& f : index.files()) {
4✔
246
            files.push_back(&f);
2!
247
        }
248
    }
249

250
    if (ts_min > 0 || ts_max > 0) {
2!
251
        std::vector<const TraceIndex::FileInfo*> filtered;
×
252
        filtered.reserve(files.size());
×
253
        for (auto* fi : files) {
×
254
            if (fi->min_timestamp_us == 0 && fi->max_timestamp_us == 0) {
×
255
                filtered.push_back(fi);
×
256
                continue;
×
257
            }
258
            double fi_min = static_cast<double>(fi->min_timestamp_us);
×
259
            double fi_max = static_cast<double>(fi->max_timestamp_us);
×
260
            if (fi_max < ts_min || (ts_max > 0 && fi_min > ts_max)) continue;
×
261
            filtered.push_back(fi);
×
262
        }
263
        files = std::move(filtered);
×
264
    }
×
265

266
    return files;
4✔
UNCOV
267
}
×
268

269
using StreamChunk = HttpResponse::StreamChunk;
270

271
// Streams events from `files` as StreamChunk values.
//
// For each file a ViewBuilderUtility produces candidate byte ranges, and each
// candidate is read through a ViewReaderUtility whose batches are yielded as
// spans of string_views.
//
// @param files        Files to read, in order. Files reporting both
//                     uncompressed_size == 0 and num_checkpoints == 0 are
//                     skipped.
// @param ev_view      View applied by both the builder and the reader.
// @param (unnamed)    Optional query; accepted for call compatibility but not
//                     used by this function.
// @param ts_min/ts_max Time range handed to the view builder.
// @param bloom_cache  Bloom filter cache handed to the view builder.
// @param limit        Maximum number of events to yield; values <= 0 mean
//                     "no limit".
//
// Once the limit has been reached the generator stops pulling further batches
// from the current reader instead of draining it, so no read work is spent on
// events that would be discarded. The emitted-event counter is only
// maintained when a limit is active, so an unlimited stream cannot overflow
// it.
static coro::AsyncGenerator<StreamChunk> stream_events(
    std::vector<const TraceIndex::FileInfo*> files, ViewDefinition ev_view,
    std::optional<Query> /*query_opt*/, double ts_min, double ts_max,
    BloomFilterCache* bloom_cache, int limit) {
    const bool capped = limit > 0;
    int emitted = 0;  // Only updated when `capped`; then always <= limit.

    for (auto* file_info : files) {
        if (capped && emitted >= limit) break;

        if (file_info->uncompressed_size == 0 &&
            file_info->num_checkpoints == 0)
            continue;

        ViewBuilderInput builder_input;
        builder_input.with_view(ev_view)
            .with_file_path(file_info->path)
            // The builder only receives an index path when the file has
            // bloom data.
            .with_index_path(file_info->has_bloom_data ? file_info->index_path
                                                       : "")
            .with_uncompressed_size(file_info->uncompressed_size)
            .with_num_checkpoints(file_info->num_checkpoints)
            .with_bloom_cache(bloom_cache)
            .with_time_range(ts_min, ts_max);

        ViewBuilderUtility builder;
        auto build_output = co_await builder.process(builder_input);
        if (!build_output.success || !build_output.file_may_match) continue;

        for (const auto& candidate : build_output.candidates) {
            if (capped && emitted >= limit) break;

            ViewReaderInput reader_input;
            reader_input.with_file_path(file_info->path)
                .with_index_path(file_info->index_path)
                .with_byte_range(candidate.start_byte, candidate.end_byte)
                .with_checkpoint_idx(candidate.checkpoint_idx)
                .with_view(ev_view);

            ViewReaderUtility reader;
            auto event_gen = reader.process(reader_input);
            while (auto batch = co_await event_gen.next()) {
                const std::size_t available = batch->events.size();
                std::size_t take = available;
                if (capped) {
                    // emitted < limit holds here, so the difference is > 0.
                    const auto remaining =
                        static_cast<std::size_t>(limit - emitted);
                    take = std::min(available, remaining);
                }

                if (take > 0) {
                    co_yield StreamChunk{std::span<const std::string_view>(
                        batch->events.data(), take)};
                    // take <= limit - emitted, so this fits in an int.
                    if (capped) emitted += static_cast<int>(take);
                }

                // Stop reading this candidate as soon as the cap is hit.
                if (capped && emitted >= limit) break;
            }
        }
    }
}
4!
324

325
// ============================================================================
326
// Event endpoints
327
// ============================================================================
328

329
// --- GET /api/v1/events ---
330
static coro::CoroTask<HttpResponse> handle_events(const HttpRequest& /*req*/,
1!
331
                                                  const QueryParams& params,
332
                                                  TraceIndex& index) {
333
    int limit = params.get_int("limit", 1000);
334
    if (limit <= 0) limit = 1000;
335
    if (limit > 100000) limit = 100000;
336

337
    double ts_min = params.get_double("ts_min", 0);
338
    double ts_max = params.get_double("ts_max", 0);
339
    auto files = resolve_target_files(index, params, ts_min, ts_max);
340
    auto view = build_view_from_params(params);
341
    auto query = build_query_from_params(params);
342

343
    auto gen = std::make_unique<HttpResponse::StreamGenerator>(
344
        stream_events(std::move(files), std::move(view), std::move(query),
345
                      ts_min, ts_max, &index.bloom_cache(), limit));
346

347
    auto resp = HttpResponse::streaming(std::move(gen));
348
    resp.headers.push_back({"X-Limit", std::to_string(limit)});
349
    co_return resp;
350
}
2!
351

352
// --- GET /api/v1/events/stream ---
353
static coro::CoroTask<HttpResponse> handle_events_stream(
1!
354
    const HttpRequest& /*req*/, const QueryParams& params, TraceIndex& index) {
355
    double ts_min = params.get_double("ts_min", 0);
356
    double ts_max = params.get_double("ts_max", 0);
357
    auto files = resolve_target_files(index, params, ts_min, ts_max);
358
    auto view = build_view_from_params(params);
359
    auto query = build_query_from_params(params);
360
    int limit = params.get_int("limit", 0);
361

362
    auto gen = std::make_unique<HttpResponse::StreamGenerator>(
363
        stream_events(std::move(files), std::move(view), std::move(query),
364
                      ts_min, ts_max, &index.bloom_cache(), limit));
365

366
    co_return HttpResponse::streaming(std::move(gen));
367
}
2!
368

369
// --- GET /api/v1/stats ---
370
static coro::CoroTask<HttpResponse> handle_stats(const HttpRequest& req,
1!
371
                                                 const QueryParams& /*params*/,
372
                                                 TraceIndex& index) {
373
    static std::mutex cache_mutex;
374
    static std::unordered_map<std::string, std::string,
375
                              dftracer::utils::TransparentStringHash,
376
                              dftracer::utils::TransparentStringEqual>
377
        stats_cache;
378

379
    {
380
        std::lock_guard<std::mutex> lock(cache_mutex);
381
        auto it = stats_cache.find(req.path);
382
        if (it != stats_cache.end()) {
383
            co_return HttpResponse::ok(it->second);
384
        }
385
    }
386

387
    std::vector<TraceStatistics> all_stats;
388

389
    // Group files by index_path
390
    std::unordered_map<std::string,
391
                       std::vector<std::pair<std::size_t, std::string>>>
392
        files_by_index;
393
    std::size_t file_idx = 0;
394
    for (const auto& file_info : index.files()) {
395
        if (!file_info.has_bloom_data) continue;
396
        files_by_index[file_info.index_path].emplace_back(file_idx++,
397
                                                          file_info.path);
398
    }
399

400
    // Resolve each group and read statistics
401
    for (auto& [idx_path, files] : files_by_index) {
402
        std::vector<std::string> file_paths;
403
        file_paths.reserve(files.size());
404
        for (const auto& [_, path] : files) {
405
            file_paths.push_back(path);
406
        }
407

408
        IndexResolverUtility resolver;
409
        ResolverInput input;
410
        input.files = std::move(file_paths);
411
        input.require_checkpoints = false;
412

413
        auto result = co_await resolver.process(input);
414

415
        if (result.cached.empty()) {
416
            continue;
417
        }
418

419
        try {
420
            SharedIndexStatisticsReader reader;
421
            auto batch_rows = co_await reader.query(
422
                result.index_path, std::move(result.cached),
423
                StatisticsQueryType::SUMMARY);
424
            auto callback = [&all_stats](std::size_t /*file_index*/,
1✔
425
                                         TraceStatistics&& stats) {
1✔
426
                if (stats.success) {
1!
427
                    all_stats.push_back(std::move(stats));
1✔
428
                }
429
            };
1✔
430
            SharedIndexStatisticsReader::process_batch_results(batch_rows,
431
                                                               callback);
432
        } catch (const std::exception& e) {
433
            DFTRACER_UTILS_LOG_WARN("Server stats batch read failed for %s: %s",
434
                                    idx_path.c_str(), e.what());
435
        }
436
    }
437

438
    std::uint64_t total_events = 0;
439
    std::size_t file_count = all_stats.size();
440
    for (const auto& s : all_stats) {
441
        total_events += s.total_events();
442
    }
443

444
    std::string body;
445
    body.reserve(256 * all_stats.size() + 64);
446
    body += "{\"file_count\":";
447
    body += std::to_string(file_count);
448
    body += ",\"total_events\":";
449
    body += std::to_string(total_events);
450
    body += ",\"files\":[";
451
    for (std::size_t i = 0; i < all_stats.size(); ++i) {
452
        if (i > 0) body += ',';
453
        body += all_stats[i].to_json();
454
    }
455
    body += "]}";
456

457
    {
458
        std::lock_guard<std::mutex> lock(cache_mutex);
459
        stats_cache.emplace(std::string(req.path), body);
460
    }
461
    co_return HttpResponse::ok(body);
462
}
2!
463

464
// --- GET /api/v1/info ---
465
static coro::CoroTask<HttpResponse> handle_info(const HttpRequest& /*req*/,
1!
466
                                                const QueryParams& /*params*/,
467
                                                TraceIndex& index) {
468
    auto global_min = index.global_min_timestamp_us();
469
    auto global_max = index.global_max_timestamp_us();
470
    bool has_time_range =
471
        global_max > 0 &&
472
        global_min != std::numeric_limits<std::uint64_t>::max();
473

474
    std::string body;
475
    body.reserve(256 * index.file_count() + 128);
476
    body += "{\"file_count\":";
477
    body += std::to_string(index.file_count());
478

479
    if (has_time_range) {
480
        body += ",\"time_range\":{\"min_timestamp_us\":";
481
        body += std::to_string(global_min);
482
        body += ",\"max_timestamp_us\":";
483
        body += std::to_string(global_max);
484
        body += "}";
485
    }
486

487
    body += ",\"files\":[";
488
    bool first = true;
489
    for (const auto& f : index.files()) {
490
        if (!first) body += ',';
491
        first = false;
492
        body += "{\"path\":\"";
493
        body += json_escape(f.path);
494
        body += "\",\"has_bloom_data\":";
495
        body += f.has_bloom_data ? "true" : "false";
496
        body += ",\"has_checkpoint_index\":";
497
        body += f.has_checkpoint_index ? "true" : "false";
498
        if (f.min_timestamp_us > 0 || f.max_timestamp_us > 0) {
499
            body += ",\"min_timestamp_us\":";
500
            body += std::to_string(f.min_timestamp_us);
501
            body += ",\"max_timestamp_us\":";
502
            body += std::to_string(f.max_timestamp_us);
503
        }
504
        body += '}';
505
    }
506
    body += "]}";
507
    co_return HttpResponse::ok(body);
508
}
2!
509

510
// Registers the trace HTTP API on `router`. Every route is a GET handler that
// forwards the request and its query parameters to the matching handle_*
// coroutine together with `index`.
//
// The handlers keep a pointer to `index`, so `index` must remain valid for as
// long as `router` can dispatch to these routes.
void register_trace_api(Router& router, TraceIndex& index) {
    using Endpoint = coro::CoroTask<HttpResponse> (*)(
        const HttpRequest&, const QueryParams&, TraceIndex&);

    TraceIndex* const trace_index = &index;

    // Wraps an endpoint into the two-argument handler shape used by
    // router.get, supplying the captured index as the third argument.
    const auto bind = [trace_index](Endpoint endpoint) {
        return [trace_index, endpoint](
                   const HttpRequest& req,
                   const QueryParams& params) -> coro::CoroTask<HttpResponse> {
            co_return co_await endpoint(req, params, *trace_index);
        };
    };

    router.get("/api/v1/files", bind(&handle_files));
    router.get("/api/v1/files/info", bind(&handle_file_info));
    router.get("/api/v1/events", bind(&handle_events));
    router.get("/api/v1/events/stream", bind(&handle_events_stream));
    router.get("/api/v1/stats", bind(&handle_stats));
    router.get("/api/v1/info", bind(&handle_info));
}
2✔
555

556
}  // namespace dftracer::utils::server
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc