• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26043728131

18 May 2026 03:37PM UTC coverage: 51.706% (-0.4%) from 52.076%
26043728131

push

github

hariharan-devarajan
feat(perf): performance improvements for parallel reading, indexing, and aggregation

Indexer
- Streaming parse-and-emit worker pipeline with bounded memory usage
- Concurrent SST artifact ingestion with staging support
- Gzip member slicing for parallel indexing
- Lazy decoding for compressed value counts
- Bypass DOM wrapper for indexer hot path (simdjson on_demand)
- Decoupled write workers from parse workers
- --rebuild-summaries flag and optimized root summary rebuild

Aggregator / MPI
- Task-based DAG execution for aggregator pipeline
- Shared staging for multi-node artifact relocation
- Per-node thread scaling to avoid oversubscription
- Unified distributed aggregation tracking, removed manifest consolidation
- Deterministic aggregation and intra-file parallelism

Trace reader / query
- Compiled predicate evaluation for AND-of-EQ queries
- Uniform-match shortcut for AND-of-EQ queries
- Line-range support for work items and checkpoint processing
- Optimized chunk pruning and checkpoint handling

Replay
- Pipelined replay with coroutines and channels
- JsonParser-based trace processing
- Optimized string handling and i/o buffering

Organize / writer / dft
- Parallel slice creation and merging in organize visitor
- Inline indexer in organize
- Gzip member tracking in writer
- Coroutine-based event dispatcher with extracted parse logic
- Batch flushing in organize visitor

Arrow / call_tree
- Optimized arrow conversion
- Arrow IPC support and improved save/load in call_tree

Build / infrastructure
- zlib-ng option, system simdjson fallback
- cgroup v1/v2 memory limit detection
- Auto-computed per-file memory estimates and batch sizes
- CI: perf branch trigger, formatting

Docs
- Rewritten indexer and trace reader API references

35907 of 90345 branches covered (39.74%)

Branch coverage included in aggregate %.

16869 of 21880 new or added lines in 137 files covered. (77.1%)

273 existing lines in 39 files now uncovered.

32021 of 41028 relevant lines covered (78.05%)

13164.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.2
/src/dftracer/utils/python/utilities/comparator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <dftracer/utils/core/common/filesystem.h>
3
#include <dftracer/utils/core/common/platform_compat.h>
4
#include <dftracer/utils/core/coro/channel.h>
5
#include <dftracer/utils/core/coro/task.h>
6
#include <dftracer/utils/core/coro/when_all.h>
7
#include <dftracer/utils/core/pipeline/pipeline.h>
8
#include <dftracer/utils/core/pipeline/pipeline_config.h>
9
#include <dftracer/utils/core/runtime.h>
10
#include <dftracer/utils/core/tasks/coro_scope.h>
11
#include <dftracer/utils/core/tasks/task.h>
12
#include <dftracer/utils/python/arrow_helpers.h>
13
#include <dftracer/utils/python/runtime.h>
14
#include <dftracer/utils/python/utilities/comparator.h>
15
#include <dftracer/utils/utilities/common/query/query.h>
16
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregators.h>
17
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_config.h>
18
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_result.h>
19
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_utility.h>
20
#include <dftracer/utils/utilities/composites/dft/comparator/tree_table_formatter.h>
21
#include <dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.h>
22
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
23
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
24
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
25
#include <dftracer/utils/utilities/indexer/index_builder_utility.h>
26
#include <dftracer/utils/utilities/indexer/internal/indexer.h>
27

28
#include <atomic>
29
#include <chrono>
30
#include <cstdio>
31
#include <ctime>
32
#include <string>
33
#include <vector>
34

35
using dftracer::utils::Runtime;
36
using dftracer::utils::coro::CoroTask;
37
using namespace dftracer::utils;
38
using namespace dftracer::utils::utilities;
39
using namespace dftracer::utils::utilities::composites::dft::aggregators;
40
using namespace dftracer::utils::utilities::composites::dft::comparator;
41

42
#include <dftracer/utils/core/common/config.h>
43
#ifdef DFTRACER_UTILS_ENABLE_ARROW
44
using dftracer::utils::python::arrow_result_to_table;
45
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
46
#endif
47

48
/**
 * Pick the Runtime a Comparator instance should run on.
 *
 * When no runtime object is stored on the instance, the result of
 * get_default_runtime() is returned; otherwise the Runtime held by the
 * stored RuntimeObject is returned.
 */
static Runtime *get_runtime(ComparatorObject *self) {
    if (!self->runtime_obj) {
        return get_default_runtime();
    }
    RuntimeObject *attached = (RuntimeObject *)self->runtime_obj;
    return attached->runtime.get();
}
13✔
53

54
/**
 * Deallocator: releases the (possibly NULL) runtime reference held by the
 * instance, then hands the object's memory back through the type's tp_free.
 */
static void Comparator_dealloc(ComparatorObject *self) {
    PyObject *const as_object = (PyObject *)self;
    Py_XDECREF(self->runtime_obj);
    Py_TYPE(as_object)->tp_free(as_object);
}
26✔
58

59
/**
 * Allocator: creates a ComparatorObject through the type's tp_alloc and
 * starts it with no runtime attached.
 *
 * Returns NULL when allocation fails. `args` and `kwds` are not inspected.
 */
static PyObject *Comparator_new(PyTypeObject *type, PyObject *args,
                                PyObject *kwds) {
    PyObject *allocated = type->tp_alloc(type, 0);
    if (allocated == NULL) {
        return NULL;
    }
    ((ComparatorObject *)allocated)->runtime_obj = NULL;
    return allocated;
}
67

68
static int Comparator_init(ComparatorObject *self, PyObject *args,
26✔
69
                           PyObject *kwds) {
70
    static const char *kwlist[] = {"runtime", NULL};
71
    PyObject *runtime_arg = NULL;
26✔
72

73
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
26!
74
                                     &runtime_arg)) {
75
        return -1;
×
76
    }
77

78
    if (runtime_arg && runtime_arg != Py_None) {
26!
79
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
80
            Py_INCREF(runtime_arg);
×
81
            self->runtime_obj = runtime_arg;
×
82
        } else {
83
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
84
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
×
85
                self->runtime_obj = native;
×
86
            } else {
87
                Py_XDECREF(native);
×
88
                PyErr_SetString(PyExc_TypeError,
×
89
                                "runtime must be a Runtime instance or None");
90
                return -1;
×
91
            }
92
        }
93
    }
94

95
    return 0;
26✔
96
}
13✔
97

98
// -----------------------------------------------------------------------
99
// Helpers
100
// -----------------------------------------------------------------------
101

102
/**
 * Value bundle holding the keyword arguments shared by the compare*()
 * entry points, converted to C++ types.
 *
 * A default-constructed instance has empty strings, a 5000 ms time
 * interval, a zero threshold, zero executor threads and no forced rebuild.
 */
struct ComparatorArgs {
    std::string baseline;
    std::string variant;
    std::string query;
    std::string group_by;
    std::string format;
    double time_interval_ms{5000.0};
    double threshold{0.0};
    std::size_t executor_threads{0};
    std::string baseline_index_dir;
    std::string variant_index_dir;
    bool force_rebuild{false};
    std::string config_path;
};
116

117
static int parse_comparator_args(PyObject *args, PyObject *kwds,
26✔
118
                                 ComparatorArgs &out) {
119
    static const char *kwlist[] = {"baseline",
120
                                   "variant",
121
                                   "query",
122
                                   "group_by",
123
                                   "format",
124
                                   "time_interval_ms",
125
                                   "threshold",
126
                                   "executor_threads",
127
                                   "baseline_index_dir",
128
                                   "variant_index_dir",
129
                                   "force_rebuild",
130
                                   "config",
131
                                   NULL};
132

133
    const char *baseline = NULL;
26✔
134
    const char *variant = NULL;
26✔
135
    const char *query = "";
26✔
136
    const char *group_by = "";
26✔
137
    const char *format = "table";
26✔
138
    double time_interval_ms = 5000.0;
26✔
139
    double threshold = 0.0;
26✔
140
    Py_ssize_t executor_threads = 0;
26✔
141
    const char *baseline_index_dir = "";
26✔
142
    const char *variant_index_dir = "";
26✔
143
    int force_rebuild = 0;
26✔
144
    const char *config = "";
26✔
145

146
    if (!PyArg_ParseTupleAndKeywords(
26!
147
            args, kwds, "ss|sssddnssps", (char **)kwlist, &baseline, &variant,
13✔
148
            &query, &group_by, &format, &time_interval_ms, &threshold,
149
            &executor_threads, &baseline_index_dir, &variant_index_dir,
150
            &force_rebuild, &config))
UNCOV
151
        return -1;
×
152

153
    out.baseline = baseline;
26!
154
    out.variant = variant;
26!
155
    out.query = query;
26!
156
    out.group_by = group_by;
26!
157
    out.format = format;
26!
158
    out.time_interval_ms = time_interval_ms;
26✔
159
    out.threshold = threshold;
26✔
160
    out.executor_threads = static_cast<std::size_t>(executor_threads);
26✔
161
    out.baseline_index_dir = baseline_index_dir;
26!
162
    out.variant_index_dir = variant_index_dir;
26!
163
    out.force_rebuild = force_rebuild != 0;
26✔
164
    out.config_path = config;
26!
165

166
    return 0;
26✔
167
}
13✔
168

169
namespace {
170

171
void flatten_nodes(const ComparisonNode &node,
26✔
172
                   std::vector<const ComparisonNode *> &out) {
173
    out.push_back(&node);
26!
174
    for (const auto &child : node.children) {
26!
175
        flatten_nodes(child, out);
×
176
    }
177
}
26✔
178

179
/**
 * Aggregate the events of `input_files` into a single EventAggregatorOutput.
 *
 * Builds a two-task Pipeline and executes it before returning:
 *   1. "StreamingAggregate": one producer coroutine per input file collects
 *      metadata, maps the file to chunks and sends them on a chunk channel;
 *      worker coroutines aggregate each chunk and forward the result on a
 *      result channel; a single merger coroutine folds the results into an
 *      EventAggregator.
 *   2. "Finalize": calls EventAggregator::finalize() once streaming is done.
 *
 * @param input_files       Trace files to aggregate.
 * @param agg_config        Aggregation settings passed to the chunk mapper.
 * @param query             Optional filter copied onto each mapper input.
 * @param index_dir         Passed to determine_index_path() for every file.
 * @param checkpoint_size   Forwarded to the metadata collector and mapper.
 * @param force_rebuild     Forwarded to the metadata collector.
 * @param executor_threads  Pipeline compute-thread setting and number of
 *                          aggregation workers (at least one worker is
 *                          always started).
 * @return The output of EventAggregator::finalize().
 *
 * Files whose metadata collection reports failure are skipped silently.
 */
CoroTask<EventAggregatorOutput> run_aggregation(
    std::vector<std::string> input_files, AggregationConfig agg_config,
    std::optional<common::query::Query> query, std::string index_dir,
    std::size_t checkpoint_size, bool force_rebuild,
    std::size_t executor_threads) {
    constexpr std::size_t CHUNK_SIZE_MB = 4;
    constexpr std::size_t BATCH_SIZE_MB = 4;

    // With zero workers nothing would ever receive from the chunk channel,
    // so producers could not hand off their chunks and no data would reach
    // the merger. Always run at least one aggregation worker.
    const std::size_t worker_count =
        executor_threads > 0 ? executor_threads : 1;

    auto pipeline_config = PipelineConfig()
                               .with_name("DFTracer Comparator Aggregation")
                               .with_compute_threads(executor_threads)
                               .with_watchdog(false);
    Pipeline pipeline(pipeline_config);

    EventAggregator merger;
    // Shared counter handing out a contiguous range of chunk indices to
    // each file producer.
    std::atomic<int> global_chunk_idx{0};

    auto streaming_task = make_task(
        [&](CoroScope &ctx) -> CoroTask<void> {
            auto chunk_chan = coro::make_channel<ChunkAggregatorInput>(0);
            auto result_chan = coro::make_channel<ChunkAggregationOutput>(2);

            co_await ctx.scope([&](CoroScope &scope) -> CoroTask<void> {
                // Stage 1: one producer per input file.
                for (const auto &file_path : input_files) {
                    auto *global_chunk_idx_ptr = &global_chunk_idx;
                    scope.spawn([file_path, ch = chunk_chan->producer(),
                                 index_dir, checkpoint_size, force_rebuild,
                                 agg_config, query, global_chunk_idx_ptr](
                                    CoroScope & /*fctx*/) mutable
                                    -> CoroTask<void> {
                        [[maybe_unused]] auto producer_guard = ch.guard();

                        std::string index_path =
                            composites::dft::internal::determine_index_path(
                                file_path, index_dir);

                        auto meta_input =
                            composites::dft::MetadataCollectorUtilityInput::
                                from_file(file_path)
                                    .with_checkpoint_size(checkpoint_size)
                                    .with_force_rebuild(force_rebuild)
                                    .with_index(index_path);
                        auto metadata =
                            co_await composites::dft::MetadataCollectorUtility{}
                                .process(meta_input);

                        if (!metadata.success) {
                            co_return;
                        }

                        FileChunkMapperUtility file_mapper;
                        auto mapper_input =
                            FileChunkMapperInput::from_metadata(metadata)
                                .with_config(agg_config)
                                .with_checkpoint_size(checkpoint_size)
                                .with_target_chunk_size(CHUNK_SIZE_MB)
                                .with_batch_size(BATCH_SIZE_MB * 1024 * 1024);
                        mapper_input.query = query;
                        auto file_chunks =
                            co_await file_mapper.process(mapper_input);

                        // Reserve a block of global indices for this file's
                        // chunks, then number them consecutively.
                        int start_idx = global_chunk_idx_ptr->fetch_add(
                            static_cast<int>(file_chunks.size()));
                        for (int i = 0;
                             i < static_cast<int>(file_chunks.size()); ++i) {
                            file_chunks[i].chunk_index = start_idx + i;
                        }

                        for (auto &chunk : file_chunks) {
                            // A false send result ends this producer early.
                            if (!co_await ch.send(std::move(chunk))) {
                                co_return;
                            }
                        }
                        co_return;
                    });
                }

                // Stage 2: aggregation workers draining the chunk channel.
                for (std::size_t w = 0; w < worker_count; ++w) {
                    (void)w;
                    scope.spawn([chunk_chan, rp = result_chan->producer(),
                                 result_chan](
                                    CoroScope &wctx) mutable -> CoroTask<void> {
                        [[maybe_unused]] auto producer_guard = rp.guard();
                        while (auto input = co_await wctx.receive(chunk_chan)) {
                            ChunkAggregatorUtility agg;
                            auto output = co_await agg.process(*input);
                            if (!co_await result_chan->send(
                                    std::move(output))) {
                                co_return;
                            }
                        }
                        co_return;
                    });
                }

                // Stage 3: single merger folding chunk results together.
                auto *merger_ptr = &merger;
                scope.spawn([result_chan,
                             merger_ptr](CoroScope &mctx) -> CoroTask<void> {
                    while (auto output = co_await mctx.receive(result_chan)) {
                        merger_ptr->merge_chunk(std::move(*output));
                    }
                    co_return;
                });

                co_return;
            });

            co_return;
        },
        "StreamingAggregate");

    EventAggregatorOutput result;
    auto post_task = make_task(
        [&](CoroScope & /*ctx*/) -> CoroTask<bool> {
            result = merger.finalize();
            co_return result.success;
        },
        "Finalize");

    post_task->depends_on(streaming_task);
    pipeline.set_source(streaming_task);
    pipeline.set_destination(post_task);
    pipeline.execute();

    co_return result;
}
78!
305

306
}  // namespace
307

308
static int run_comparison_pipeline(ComparatorObject *self,
26✔
309
                                   const ComparatorArgs &cargs,
310
                                   ComparisonOutput &output,
311
                                   std::string &error_msg) {
312
    ComparatorArgs args_copy = cargs;
26!
313
    auto *output_ptr = &output;
26✔
314

315
    Py_BEGIN_ALLOW_THREADS try {
26!
316
        ComparisonConfig config;
26!
317
        if (!args_copy.config_path.empty()) {
26✔
318
            std::string parse_error;
×
319
            auto parsed = ComparisonConfig::from_json_file(
×
320
                args_copy.config_path, parse_error);
×
321
            if (!parsed) {
×
322
                error_msg = "Config error: " + parse_error;
×
323
                goto done;
×
324
            }
325
            config = std::move(*parsed);
×
326
        } else {
×
327
            config = ComparisonConfig::from_cli(
39!
328
                args_copy.baseline, args_copy.variant, args_copy.query,
13✔
329
                args_copy.group_by);
26✔
330
        }
331

332
        config.format = args_copy.format;
26!
333
        config.no_color = true;
26✔
334
        if (args_copy.executor_threads > 0)
26✔
335
            config.executor_threads = args_copy.executor_threads;
×
336
        if (!args_copy.baseline_index_dir.empty())
26!
NEW
337
            config.baseline_index_dir = args_copy.baseline_index_dir;
×
338
        if (!args_copy.variant_index_dir.empty())
26!
NEW
339
            config.variant_index_dir = args_copy.variant_index_dir;
×
340
        if (args_copy.force_rebuild)
26✔
341
            config.force_rebuild = args_copy.force_rebuild;
×
342
        if (args_copy.threshold > 0.0)
26✔
343
            config.defaults.threshold_pct = args_copy.threshold;
×
344
        if (args_copy.time_interval_ms > 0.0)
26✔
345
            config.defaults.time_interval_ms = args_copy.time_interval_ms;
26✔
346

347
        config.resolve();
26!
348

349
        if (config.executor_threads == 0) {
26!
350
            config.executor_threads = dftracer_utils_hardware_concurrency();
26!
351
        }
13✔
352
        if (config.checkpoint_size == 0) {
26✔
353
            config.checkpoint_size =
26✔
354
                indexer::internal::Indexer::DEFAULT_CHECKPOINT_SIZE;
355
        }
13✔
356

357
        using composites::dft::indexing::IndexResolverUtility;
358
        using composites::dft::indexing::ResolverInput;
359
        using indexer::IndexBatchBuilderUtility;
360
        using indexer::IndexBuildBatchConfig;
361

362
        Runtime *rt = get_runtime(self);
26!
363

364
        auto *error_msg_ptr = &error_msg;
26✔
365
        auto task = [config, output_ptr, error_msg_ptr,
169!
366
                     rt]() -> CoroTask<void> {
26!
367
            auto resolve_and_build =
39✔
368
                [&config](
169!
369
                    CoroScope &scope, const std::string &path,
370
                    const std::string &index_dir,
371
                    std::vector<std::string> &out_files) -> CoroTask<void> {
13!
372
                IndexResolverUtility resolver;
13!
373
                ResolverInput resolve_input;
13✔
374
                resolve_input.index_dir = index_dir;
13!
375
                resolve_input.require_checkpoints = !config.force_rebuild;
13✔
376
                if (fs::is_regular_file(path)) {
13!
377
                    resolve_input.files = {path};
12!
378
                } else {
12✔
379
                    resolve_input.directory = path;
1!
380
                }
381

382
                auto result = co_await resolver.process(resolve_input);
39!
383
                out_files = std::move(result.all_files);
39✔
384

385
                if (out_files.empty() || result.needs_checkpoint.empty()) {
39✔
386
                    co_return;
52✔
387
                }
388

389
                auto batch_cfg = std::make_shared<IndexBuildBatchConfig>();
39!
390
                batch_cfg->file_paths.reserve(result.needs_checkpoint.size());
39✔
391
                for (const auto &item : result.needs_checkpoint) {
27✔
392
                    batch_cfg->file_paths.push_back(item.file_path);
14!
393
                }
14✔
394
                batch_cfg->index_dir = index_dir;
13✔
395
                batch_cfg->checkpoint_size = config.checkpoint_size;
39✔
396
                batch_cfg->parallelism = config.executor_threads;
39✔
397
                batch_cfg->force_rebuild = config.force_rebuild;
39✔
398
                batch_cfg->use_batch_write = true;
39✔
399
                batch_cfg->rebuild_root_summaries = true;
39✔
400

401
                co_await IndexBatchBuilderUtility::process(
91!
402
                    &scope, std::move(batch_cfg));
39✔
403
            };
169!
404

405
            std::vector<std::string> baseline_files;
39✔
406
            std::vector<std::string> variant_files;
39✔
407

408
            bool shared_index =
78✔
409
                composites::dft::internal::determine_index_path(
117!
410
                    config.baseline, config.baseline_index_dir) ==
78✔
411
                composites::dft::internal::determine_index_path(
78!
412
                    config.variant, config.variant_index_dir);
39✔
413

414
            co_await run_coro_scope(
52!
415
                rt->executor(), [&](CoroScope &scope) -> CoroTask<void> {
169!
416
                    if (shared_index) {
39!
417
                        co_await resolve_and_build(scope, config.baseline,
104!
418
                                                   config.baseline_index_dir,
39✔
419
                                                   baseline_files);
39✔
420
                        if (config.baseline == config.variant) {
13!
421
                            variant_files = baseline_files;
13!
422
                        } else {
13✔
423
                            co_await resolve_and_build(scope, config.variant,
×
424
                                                       config.variant_index_dir,
425
                                                       variant_files);
426
                        }
427
                    } else {
13✔
NEW
428
                        scope.spawn([&](CoroScope &s) -> CoroTask<void> {
×
429
                            co_await resolve_and_build(
×
430
                                s, config.baseline, config.baseline_index_dir,
431
                                baseline_files);
NEW
432
                        });
×
433
                        co_await resolve_and_build(scope, config.variant,
×
434
                                                   config.variant_index_dir,
435
                                                   variant_files);
436
                    }
437
                });
52!
438

439
            if (baseline_files.empty()) {
39!
440
                *error_msg_ptr =
441
                    "No trace files found in baseline: " + config.baseline;
×
442
                co_return;
443
            }
444
            if (variant_files.empty()) {
39!
445
                *error_msg_ptr =
446
                    "No trace files found in variant: " + config.variant;
×
447
                co_return;
448
            }
449

450
            output_ptr->baseline_path = config.baseline;
39✔
451
            output_ptr->variant_path = config.variant;
13✔
452
            output_ptr->baseline_file_count = baseline_files.size();
39✔
453
            output_ptr->variant_file_count = variant_files.size();
39✔
454

455
            auto start_time = std::chrono::high_resolution_clock::now();
39✔
456

457
            for (auto &node : config.nodes) {
52!
458
                std::vector<const ComparisonNode *> visitors;
39✔
459
                flatten_nodes(node, visitors);
39!
460

461
                std::vector<ComparisonVisitorPair> pairs;
39✔
462
                pairs.reserve(visitors.size());
39!
463

464
                for (const auto *visitor : visitors) {
78!
465
                    std::optional<common::query::Query> query;
39✔
466
                    if (!visitor->composed_query.empty()) {
39✔
467
                        auto result = common::query::Query::from_string(
26!
468
                            visitor->composed_query);
13✔
469
                        if (!result) {
13!
470
                            *error_msg_ptr = "Invalid query for node '" +
×
471
                                             visitor->name +
×
472
                                             "': " + result.error().format();
×
473
                            co_return;
474
                        }
475
                        query = std::move(*result);
13!
476
                    }
13!
477

478
                    AggregationConfig agg_cfg;
39!
479
                    agg_cfg.time_interval_us = static_cast<std::uint64_t>(
39✔
480
                        config.defaults.time_interval_ms * 1000.0);
39✔
481
                    agg_cfg.extra_group_keys = {};
39✔
482
                    agg_cfg.compute_statistics = true;
13✔
483
                    agg_cfg.compute_percentiles = true;
13✔
484
                    agg_cfg.percentiles = visitor->resolved_percentiles;
13✔
485
                    agg_cfg.sketch_accuracy = 0.01;
39✔
486
                    agg_cfg.track_process_parents = false;
39✔
487

488
                    auto [base_result, var_result] = co_await coro::when_all(
117!
489
                        run_aggregation(
78!
490
                            baseline_files, agg_cfg, query,
39!
491
                            config.baseline_index_dir, config.checkpoint_size,
39!
492
                            config.force_rebuild, config.executor_threads),
39✔
493
                        run_aggregation(
78!
494
                            variant_files, agg_cfg, query,
39!
495
                            config.variant_index_dir, config.checkpoint_size,
39!
496
                            config.force_rebuild, config.executor_threads));
39✔
497

498
                    if (pairs.empty()) {
13!
499
                        output_ptr->baseline_meta = extract_metadata(
26!
500
                            base_result.aggregations, baseline_files.size());
13✔
501
                        output_ptr->variant_meta = extract_metadata(
26!
502
                            var_result.aggregations, variant_files.size());
13✔
503
                    }
13✔
504

505
                    ComparisonVisitorPair pair;
13✔
506
                    pair.baseline = std::move(base_result);
13✔
507
                    pair.variant = std::move(var_result);
13✔
508
                    pair.node = *visitor;
13!
509
                    pairs.push_back(std::move(pair));
13!
510
                }
13!
511

512
                ComparisonUtilityInput cmp_input;
39✔
513
                cmp_input.visitors = std::move(pairs);
39✔
514
                cmp_input.root_node = node;
39!
515
                cmp_input.baseline_file_count = baseline_files.size();
39✔
516
                cmp_input.variant_file_count = variant_files.size();
39✔
517

518
                ComparisonUtility cmp;
39!
519
                auto cmp_output = co_await cmp.process(cmp_input);
52!
520
                output_ptr->nodes.push_back(std::move(cmp_output.result));
13!
521
            }
13!
522

523
            // Inject metadata rows into root SUMMARY.
524
            auto meta_rows = build_metadata_metrics(output_ptr->baseline_meta,
26!
525
                                                    output_ptr->variant_meta);
13✔
526
            for (auto &node : output_ptr->nodes) {
26✔
527
                node.summary.metrics.insert(node.summary.metrics.begin(),
39!
528
                                            meta_rows.begin(), meta_rows.end());
26✔
529
            }
13✔
530

531
            auto end_time = std::chrono::high_resolution_clock::now();
13✔
532
            std::chrono::duration<double, std::milli> duration =
13✔
533
                end_time - start_time;
13!
534
            output_ptr->execution_time_ms = duration.count();
13!
535
        };
338!
536

537
        rt->submit(task(), "comparator").get();
26!
538
    } catch (const std::exception &e) {
26!
UNCOV
539
        error_msg = e.what();
×
540
    }
13!
541
done:
13✔
542
    Py_END_ALLOW_THREADS
26!
543

544
        return error_msg.empty()
26✔
545
        ? 0
13!
546
        : -1;
13✔
547
}
26✔
548

549
// -----------------------------------------------------------------------
550
// compare() -- returns ArrowTable
551
// -----------------------------------------------------------------------
552

553
/**
 * compare(...): parse the call arguments, run the comparison pipeline and
 * return the result converted through to_arrow() / arrow_result_to_table().
 *
 * Returns NULL with a Python exception set when argument parsing fails,
 * when the pipeline reports an error (RuntimeError carrying its message),
 * when the Arrow conversion is invalid, or when the module was built
 * without DFTRACER_UTILS_ENABLE_ARROW.
 */
static PyObject *Comparator_compare(ComparatorObject *self, PyObject *args,
                                    PyObject *kwds) {
    ComparatorArgs parsed_args;
    const int parse_status = parse_comparator_args(args, kwds, parsed_args);
    if (parse_status < 0) {
        return NULL;
    }

    ComparisonOutput comparison;
    std::string failure;
    const int run_status =
        run_comparison_pipeline(self, parsed_args, comparison, failure);
    if (run_status < 0) {
        PyErr_SetString(PyExc_RuntimeError, failure.c_str());
        return NULL;
    }

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    auto exported = comparison.to_arrow();
    if (exported.valid()) {
        return arrow_result_to_table(std::move(exported));
    }
    PyErr_SetString(PyExc_RuntimeError,
                    "Failed to convert comparison output "
                    "to Arrow");
    return NULL;
#else
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return NULL;
#endif
}
12✔
580

581
// -----------------------------------------------------------------------
582
// compare_json() -- returns JSON string
583
// -----------------------------------------------------------------------
584

585
/**
 * compare_json(...): parse the call arguments, run the comparison pipeline
 * and return TreeTableFormatter::render_json() of the result as a Python
 * str.
 *
 * Returns NULL with a Python exception set when argument parsing fails or
 * the pipeline reports an error (RuntimeError carrying its message).
 */
static PyObject *Comparator_compare_json(ComparatorObject *self, PyObject *args,
                                         PyObject *kwds) {
    ComparatorArgs parsed_args;
    const int parse_status = parse_comparator_args(args, kwds, parsed_args);
    if (parse_status < 0) {
        return NULL;
    }

    ComparisonOutput comparison;
    std::string failure;
    const int run_status =
        run_comparison_pipeline(self, parsed_args, comparison, failure);
    if (run_status < 0) {
        PyErr_SetString(PyExc_RuntimeError, failure.c_str());
        return NULL;
    }

    TreeTableFormatter json_formatter;
    const std::string rendered = json_formatter.render_json(comparison);
    const Py_ssize_t rendered_len = (Py_ssize_t)rendered.size();
    return PyUnicode_FromStringAndSize(rendered.data(), rendered_len);
}
8✔
601

602
// -----------------------------------------------------------------------
603
// compare_table() -- returns formatted table string
604
// -----------------------------------------------------------------------
605

606
/**
 * compare_table(...): parse the call arguments, run the comparison pipeline
 * and return the result rendered by TreeTableFormatter (color and unicode
 * disabled) as a Python str.
 *
 * Returns NULL with a Python exception set when argument parsing fails,
 * when the pipeline reports an error, or when the table cannot be rendered
 * into the in-memory stream.
 */
static PyObject *Comparator_compare_table(ComparatorObject *self,
                                          PyObject *args, PyObject *kwds) {
    ComparatorArgs cargs;
    if (parse_comparator_args(args, kwds, cargs) < 0) return NULL;

    ComparisonOutput output;
    std::string error_msg;
    if (run_comparison_pipeline(self, cargs, output, error_msg) < 0) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

    FormatterOptions fmt_opts;
    fmt_opts.use_color = false;
    fmt_opts.use_unicode = false;
    TreeTableFormatter formatter(fmt_opts);

    // Render into an in-memory stream; open_memstream() grows `buf` as the
    // formatter writes and publishes the final pointer/size on close.
    char *buf = NULL;
    std::size_t buf_size = 0;
    FILE *memstream = open_memstream(&buf, &buf_size);
    if (!memstream) {
        PyErr_SetString(PyExc_RuntimeError, "Failed to create memory stream");
        return NULL;
    }

    // A C++ exception must not unwind through this C-API entry point, and
    // the stream/buffer must be released on every path.
    std::string render_error;
    bool rendered = true;
    try {
        formatter.render(memstream, output);
    } catch (const std::exception &e) {
        rendered = false;
        render_error = e.what();
    } catch (...) {
        rendered = false;
    }

    // fclose() flushes the stream, so no separate fflush() is needed.
    const bool closed = (fclose(memstream) == 0);

    if (!rendered || !closed || !buf) {
        free(buf);
        if (render_error.empty()) {
            render_error = "Failed to render comparison table";
        }
        PyErr_SetString(PyExc_RuntimeError, render_error.c_str());
        return NULL;
    }

    PyObject *result = PyUnicode_FromStringAndSize(buf, (Py_ssize_t)buf_size);
    free(buf);
    return result;
}
6✔
640

641
// -----------------------------------------------------------------------
642
// __call__ delegates to compare()
643
// -----------------------------------------------------------------------
644

645
// tp_call slot: calling a ComparatorUtility instance forwards the positional
// and keyword arguments unchanged to Comparator_compare().
static PyObject *Comparator_call(PyObject *self, PyObject *args,
                                 PyObject *kwds) {
    ComparatorObject *comparator = (ComparatorObject *)self;
    return Comparator_compare(comparator, args, kwds);
}
649

650
// -----------------------------------------------------------------------
651
// Method table
652
// -----------------------------------------------------------------------
653

654
// Docstring for the "compare" method. The text before the "--" line is the
// call signature; the remainder is the description shown to users.
static const char *COMPARE_DOC =
    /* signature */
    "compare(baseline, variant, query='', group_by='',\n"
    "        format='table', time_interval_ms=5000.0,\n"
    "        threshold=0.0, executor_threads=0,\n"
    "        index_dir='', force_rebuild=False, config='')\n"
    "--\n\n"
    /* summary */
    "Run comparison pipeline, return materialized ArrowTable.\n\n"
    /* parameters */
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms (default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    index_dir (str): Directory for .dftindex stores.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n\n"
    /* return value */
    "Returns:\n"
    "    ArrowTable: Comparison results.\n";
679

680
// Docstring for the "compare_json" method. The text before the "--" line is
// the call signature; the remainder is the description shown to users.
static const char *COMPARE_JSON_DOC =
    /* signature */
    "compare_json(baseline, variant, query='', group_by='',\n"
    "             format='table', time_interval_ms=5000.0,\n"
    "             threshold=0.0, executor_threads=0,\n"
    "             index_dir='', force_rebuild=False, config='')\n"
    "--\n\n"
    /* summary */
    "Run comparison pipeline, return JSON string.\n\n"
    /* parameters */
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms (default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    index_dir (str): Directory for .dftindex stores.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n\n"
    /* return value */
    "Returns:\n"
    "    str: JSON representation of comparison results.\n";
706

707
// Docstring for the "compare_table" method. The text before the "--" line is
// the call signature; the remainder is the description shown to users.
static const char *COMPARE_TABLE_DOC =
    /* signature */
    "compare_table(baseline, variant, query='', group_by='',\n"
    "              format='table', time_interval_ms=5000.0,\n"
    "              threshold=0.0, executor_threads=0,\n"
    "              index_dir='', force_rebuild=False, config='')\n"
    "--\n\n"
    /* summary */
    "Run comparison pipeline, return formatted table string.\n\n"
    /* parameters */
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms (default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    index_dir (str): Directory for .dftindex stores.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n\n"
    /* return value */
    "Returns:\n"
    "    str: Formatted ASCII table of comparison results.\n";
733

734
// Methods exposed on ComparatorUtility instances. Every entry accepts both
// positional and keyword arguments; the table ends with an all-zero sentinel.
static PyMethodDef Comparator_methods[] = {
    {"compare",
     (PyCFunction)Comparator_compare,
     METH_VARARGS | METH_KEYWORDS,
     COMPARE_DOC},
    {"compare_json",
     (PyCFunction)Comparator_compare_json,
     METH_VARARGS | METH_KEYWORDS,
     COMPARE_JSON_DOC},
    {"compare_table",
     (PyCFunction)Comparator_compare_table,
     METH_VARARGS | METH_KEYWORDS,
     COMPARE_TABLE_DOC},
    {NULL, NULL, 0, NULL},  // sentinel
};
1✔
742

743
// Type object for dftracer_utils_ext.ComparatorUtility.
// Slots are given positionally; unused slots are left as 0. Instances are
// callable (tp_call) and the type may be subclassed (Py_TPFLAGS_BASETYPE).
PyTypeObject ComparatorType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext.ComparatorUtility",   /* tp_name */
    sizeof(ComparatorObject),                 /* tp_basicsize */
    0,                                        /* tp_itemsize */
    (destructor)Comparator_dealloc,           /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    0,                                        /* tp_getattr */
    0,                                        /* tp_setattr */
    0,                                        /* tp_as_async */
    0,                                        /* tp_repr */
    0,                                        /* tp_as_number */
    0,                                        /* tp_as_sequence */
    0,                                        /* tp_as_mapping */
    0,                                        /* tp_hash */
    Comparator_call,                          /* tp_call */
    0,                                        /* tp_str */
    0,                                        /* tp_getattro */
    0,                                        /* tp_setattro */
    0,                                        /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    /* tp_doc: constructor signature, then a summary of the public methods */
    "ComparatorUtility(runtime: Runtime | None = None)\n"
    "--\n\n"
    "Compare DFTracer trace metrics between baseline and variant.\n\n"
    "Args:\n"
    "    runtime (Runtime or None): Runtime for thread pool control.\n"
    "        If None, uses the default global Runtime.\n\n"
    "compare(baseline, variant, ...) -> ArrowTable\n"
    "    Run comparison and return a materialized Arrow table.\n\n"
    "compare_json(baseline, variant, ...) -> str\n"
    "    Run comparison and return JSON string.\n\n"
    "compare_table(baseline, variant, ...) -> str\n"
    "    Run comparison and return formatted table string.\n",
    0,                                        /* tp_traverse */
    0,                                        /* tp_clear */
    0,                                        /* tp_richcompare */
    0,                                        /* tp_weaklistoffset */
    0,                                        /* tp_iter */
    0,                                        /* tp_iternext */
    Comparator_methods,                       /* tp_methods */
    0,                                        /* tp_members */
    0,                                        /* tp_getset */
    0,                                        /* tp_base */
    0,                                        /* tp_dict */
    0,                                        /* tp_descr_get */
    0,                                        /* tp_descr_set */
    0,                                        /* tp_dictoffset */
    (initproc)Comparator_init,                /* tp_init */
    0,                                        /* tp_alloc */
    Comparator_new,                           /* tp_new */
};
798

799
// Finalize ComparatorType and register it on module `m` under the name
// "ComparatorUtility". Returns 0 on success and -1 on failure.
int init_comparator(PyObject *m) {
    if (PyType_Ready(&ComparatorType) < 0) {
        return -1;
    }

    PyObject *type_obj = (PyObject *)&ComparatorType;

    // Take the reference that PyModule_AddObject() consumes on success.
    Py_INCREF(type_obj);
    if (PyModule_AddObject(m, "ComparatorUtility", type_obj) >= 0) {
        return 0;
    }

    // Registration failed: the reference was not consumed, so give it back.
    Py_DECREF(type_obj);
    // NOTE(review): this also drops a reference to the caller's module object;
    // confirm the caller does not release `m` again on a -1 return.
    Py_DECREF(m);
    return -1;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc