• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28693295402

04 Jul 2026 03:17AM UTC coverage: 52.408% (+0.1%) from 52.278%
28693295402

push

github

hariharan-devarajan
feat: silence noisy warnings on aarch64

37318 of 92666 branches covered (40.27%)

Branch coverage included in aggregate %.

33462 of 42389 relevant lines covered (78.94%)

20557.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.37
/src/dftracer/utils/python/utilities/comparator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <dftracer/utils/core/common/error.h>
3
#include <dftracer/utils/core/common/filesystem.h>
4
#include <dftracer/utils/core/common/platform_compat.h>
5
#include <dftracer/utils/core/coro/channel.h>
6
#include <dftracer/utils/core/coro/task.h>
7
#include <dftracer/utils/core/coro/when_all.h>
8
#include <dftracer/utils/core/pipeline/pipeline.h>
9
#include <dftracer/utils/core/pipeline/pipeline_config.h>
10
#include <dftracer/utils/core/runtime.h>
11
#include <dftracer/utils/core/tasks/coro_scope.h>
12
#include <dftracer/utils/core/tasks/task.h>
13
#include <dftracer/utils/python/arrow_helpers.h>
14
#include <dftracer/utils/python/py_runtime_mixin.h>
15
#include <dftracer/utils/python/py_type_helpers.h>
16
#include <dftracer/utils/python/runtime.h>
17
#include <dftracer/utils/python/utilities/comparator.h>
18
#include <dftracer/utils/utilities/common/query/query.h>
19
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregators.h>
20
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_config.h>
21
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_result.h>
22
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_utility.h>
23
#include <dftracer/utils/utilities/composites/dft/comparator/tree_table_formatter.h>
24
#include <dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.h>
25
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
26
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
27
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
28
#include <dftracer/utils/utilities/indexer/index_builder_utility.h>
29
#include <dftracer/utils/utilities/indexer/internal/indexer.h>
30

31
#include <atomic>
32
#include <chrono>
33
#include <cstdio>
34
#include <ctime>
35
#include <stdexcept>
36
#include <string>
37
#include <vector>
38

39
using dftracer::utils::Runtime;
40
using dftracer::utils::coro::CoroTask;
41
using namespace dftracer::utils;
42
using namespace dftracer::utils::utilities;
43
using namespace dftracer::utils::utilities::composites::dft::aggregators;
44
using namespace dftracer::utils::utilities::composites::dft::comparator;
45

46
#include <dftracer/utils/core/common/config.h>
47
#ifdef DFTRACER_UTILS_ENABLE_ARROW
48
using dftracer::utils::python::arrow_result_to_table;
49
#endif
50

51
// NOTE(review): macro defined outside this file; judging by its name it
// presumably generates the runtime-backed type-slot boilerplate for the
// Comparator type / ComparatorObject struct -- confirm against its definition.
DFTRACER_UTILS_RUNTIME_BACKED_SLOTS(Comparator, ComparatorObject)
78✔
52

53
// -----------------------------------------------------------------------
54
// Helpers
55
// -----------------------------------------------------------------------
56

57
// Plain value bundle holding the arguments accepted by compare(),
// compare_json() and compare_table() after they have been converted from
// Python objects by parse_comparator_args().
struct ComparatorArgs {
    // Positional arguments: baseline and variant trace paths.
    std::string baseline{};
    std::string variant{};

    // Optional query filter and grouping keys (empty when not supplied).
    std::string query{};
    std::string group_by{};

    // Output format name; the struct itself defaults to empty, the parser
    // fills in "table" when the caller does not pass one.
    std::string format{};

    // Time bucket width in milliseconds.
    double time_interval_ms{5000.0};

    // Threshold percentage; only applied by the pipeline when > 0.
    double threshold{0.0};

    // Requested thread count; 0 leaves the choice to the pipeline.
    std::size_t executor_threads{0};

    // Per-side index directories (empty keeps the configured default).
    std::string baseline_index_dir{};
    std::string variant_index_dir{};

    // Whether an index rebuild was requested.
    bool force_rebuild{false};

    // Path of a JSON config file; empty means "build the config from the
    // individual arguments".
    std::string config_path{};
};
71

72
static int parse_comparator_args(PyObject *args, PyObject *kwds,
26✔
73
                                 ComparatorArgs &out) {
74
    static const char *kwlist[] = {"baseline",
75
                                   "variant",
76
                                   "query",
77
                                   "group_by",
78
                                   "format",
79
                                   "time_interval_ms",
80
                                   "threshold",
81
                                   "executor_threads",
82
                                   "baseline_index_dir",
83
                                   "variant_index_dir",
84
                                   "force_rebuild",
85
                                   "config",
86
                                   NULL};
87

88
    const char *baseline = NULL;
26✔
89
    const char *variant = NULL;
26✔
90
    const char *query = "";
26✔
91
    const char *group_by = "";
26✔
92
    const char *format = "table";
26✔
93
    double time_interval_ms = 5000.0;
26✔
94
    double threshold = 0.0;
26✔
95
    Py_ssize_t executor_threads = 0;
26✔
96
    const char *baseline_index_dir = "";
26✔
97
    const char *variant_index_dir = "";
26✔
98
    int force_rebuild = 0;
26✔
99
    const char *config = "";
26✔
100

101
    if (!PyArg_ParseTupleAndKeywords(
26!
102
            args, kwds, "ss|sssddnssps", (char **)kwlist, &baseline, &variant,
13✔
103
            &query, &group_by, &format, &time_interval_ms, &threshold,
104
            &executor_threads, &baseline_index_dir, &variant_index_dir,
105
            &force_rebuild, &config))
106
        return -1;
×
107

108
    out.baseline = baseline;
26!
109
    out.variant = variant;
26!
110
    out.query = query;
26!
111
    out.group_by = group_by;
26!
112
    out.format = format;
26!
113
    out.time_interval_ms = time_interval_ms;
26✔
114
    out.threshold = threshold;
26✔
115
    out.executor_threads = static_cast<std::size_t>(executor_threads);
26✔
116
    out.baseline_index_dir = baseline_index_dir;
26!
117
    out.variant_index_dir = variant_index_dir;
26!
118
    out.force_rebuild = force_rebuild != 0;
26✔
119
    out.config_path = config;
26!
120

121
    return 0;
26✔
122
}
13✔
123

124
namespace {
125

126
void flatten_nodes(const ComparisonNode &node,
26✔
127
                   std::vector<const ComparisonNode *> &out) {
128
    out.push_back(&node);
26!
129
    for (const auto &child : node.children) {
26!
130
        flatten_nodes(child, out);
×
131
    }
132
}
26✔
133

134
// Aggregate the events of `input_files` into a single EventAggregatorOutput.
//
// A dedicated Pipeline is built with two tasks:
//   * "StreamingAggregate": one producer coroutine per input file maps the
//     file to chunks and sends them on a chunk channel; worker coroutines
//     receive chunks, aggregate each one and send the result on a result
//     channel; a single merger coroutine folds every result into `merger`.
//   * "Finalize": runs after the streaming task and stores
//     merger.finalize() as the value returned by this function.
//
// Parameters:
//   input_files      trace files to aggregate.
//   agg_config       aggregation settings forwarded to the chunk mapper.
//   query            optional filter attached to every mapper input.
//   index_dir        directory passed to determine_index_path().
//   checkpoint_size  forwarded to metadata collection and chunk mapping.
//   force_rebuild    forwarded to metadata collection.
//   executor_threads compute-thread count for the pipeline and number of
//                    worker coroutines; 0 still spawns one worker.
//
// Files whose metadata collection reports failure are skipped silently.
CoroTask<EventAggregatorOutput> run_aggregation(
    std::vector<std::string> input_files, AggregationConfig agg_config,
    std::optional<common::query::Query> query, std::string index_dir,
    std::size_t checkpoint_size, bool force_rebuild,
    std::size_t executor_threads) {
    constexpr std::size_t CHUNK_SIZE_MB = 4;
    constexpr std::size_t BATCH_SIZE_MB = 4;

    // With zero workers nothing would ever receive from the chunk channel,
    // so the producers could never hand off their chunks. Always run at
    // least one worker, whatever the caller passed.
    const std::size_t worker_count =
        executor_threads > 0 ? executor_threads : 1;

    auto pipeline_config = PipelineConfig()
                               .with_name("DFTracer Comparator Aggregation")
                               .with_compute_threads(executor_threads)
                               .with_watchdog(false);
    Pipeline pipeline(pipeline_config);

    EventAggregator merger;
    // Hands out globally unique, contiguous chunk indices across all files.
    std::atomic<int> global_chunk_idx{0};

    auto streaming_task = make_task(
        [&](CoroScope &ctx) -> CoroTask<void> {
            auto chunk_chan = coro::make_channel<ChunkAggregatorInput>(0);
            auto result_chan = coro::make_channel<ChunkAggregationOutput>(2);

            co_await ctx.scope([&](CoroScope &scope) -> CoroTask<void> {
                // Stage 1: one producer per file. Everything the producer
                // needs is captured by value so it does not depend on the
                // loop variable after the iteration ends.
                for (const auto &file_path : input_files) {
                    auto *global_chunk_idx_ptr = &global_chunk_idx;
                    scope.spawn([file_path, ch = chunk_chan->producer(),
                                 index_dir, checkpoint_size, force_rebuild,
                                 agg_config, query, global_chunk_idx_ptr](
                                    CoroScope & /*fctx*/) mutable
                                    -> CoroTask<void> {
                        [[maybe_unused]] auto producer_guard = ch.guard();

                        std::string index_path =
                            composites::dft::internal::determine_index_path(
                                file_path, index_dir);

                        auto meta_input =
                            composites::dft::MetadataCollectorUtilityInput::
                                from_file(file_path)
                                    .with_checkpoint_size(checkpoint_size)
                                    .with_force_rebuild(force_rebuild)
                                    .with_index(index_path);
                        auto metadata =
                            co_await composites::dft::MetadataCollectorUtility{}
                                .process(meta_input);

                        if (!metadata.success) {
                            co_return;
                        }

                        FileChunkMapperUtility file_mapper;
                        auto mapper_input =
                            FileChunkMapperInput::from_metadata(metadata)
                                .with_config(agg_config)
                                .with_checkpoint_size(checkpoint_size)
                                .with_target_chunk_size(CHUNK_SIZE_MB)
                                .with_batch_size(BATCH_SIZE_MB * 1024 * 1024);
                        mapper_input.query = query;
                        auto file_chunks =
                            co_await file_mapper.process(mapper_input);

                        // Reserve a contiguous index range for this file's
                        // chunks with a single atomic add.
                        int start_idx = global_chunk_idx_ptr->fetch_add(
                            static_cast<int>(file_chunks.size()));
                        for (int i = 0;
                             i < static_cast<int>(file_chunks.size()); ++i) {
                            file_chunks[i].chunk_index = start_idx + i;
                        }

                        for (auto &chunk : file_chunks) {
                            // A failed send ends this producer early.
                            if (!co_await ch.send(std::move(chunk))) {
                                co_return;
                            }
                        }
                        co_return;
                    });
                }

                // Stage 2: workers aggregate chunks into results.
                for (std::size_t w = 0; w < worker_count; ++w) {
                    (void)w;
                    scope.spawn([chunk_chan, rp = result_chan->producer(),
                                 result_chan](
                                    CoroScope &wctx) mutable -> CoroTask<void> {
                        [[maybe_unused]] auto producer_guard = rp.guard();
                        while (auto input = co_await wctx.receive(chunk_chan)) {
                            ChunkAggregatorUtility agg;
                            auto output = co_await agg.process(*input);
                            if (!co_await result_chan->send(
                                    std::move(output))) {
                                co_return;
                            }
                        }
                        co_return;
                    });
                }

                // Stage 3: a single merger, so merge_chunk() is only ever
                // called from one coroutine.
                auto *merger_ptr = &merger;
                scope.spawn([result_chan,
                             merger_ptr](CoroScope &mctx) -> CoroTask<void> {
                    while (auto output = co_await mctx.receive(result_chan)) {
                        merger_ptr->merge_chunk(std::move(*output));
                    }
                    co_return;
                });

                co_return;
            });

            co_return;
        },
        "StreamingAggregate");

    EventAggregatorOutput result;
    auto post_task = make_task(
        [&](CoroScope & /*ctx*/) -> CoroTask<bool> {
            result = merger.finalize();
            co_return result.success;
        },
        "Finalize");

    post_task->depends_on(streaming_task);
    pipeline.set_source(streaming_task);
    pipeline.set_destination(post_task);
    pipeline.execute();

    co_return result;
}
78!
260

261
}  // namespace
262

263
static bool run_comparison_pipeline(ComparatorObject *self,
26✔
264
                                    const ComparatorArgs &cargs,
265
                                    ComparisonOutput &output) {
266
    ComparatorArgs args_copy = cargs;
26!
267
    auto *output_ptr = &output;
26✔
268

269
    return run_blocking([&] {
39!
270
        ComparisonConfig config;
26!
271
        if (!args_copy.config_path.empty()) {
26✔
272
            std::string parse_error;
×
273
            auto parsed = ComparisonConfig::from_json_file(
×
274
                args_copy.config_path, parse_error);
×
275
            if (!parsed) {
×
276
                throw DFTUtilsException(ErrorCode::PARSE,
×
277
                                        "Config error: " + parse_error);
×
278
            }
279
            config = std::move(*parsed);
×
280
        } else {
×
281
            config = ComparisonConfig::from_cli(
26!
282
                args_copy.baseline, args_copy.variant, args_copy.query,
26✔
283
                args_copy.group_by);
26!
284
        }
285

286
        config.format = args_copy.format;
26!
287
        config.no_color = true;
26✔
288
        if (args_copy.executor_threads > 0)
26✔
289
            config.executor_threads = args_copy.executor_threads;
×
290
        if (!args_copy.baseline_index_dir.empty())
26!
291
            config.baseline_index_dir = args_copy.baseline_index_dir;
×
292
        if (!args_copy.variant_index_dir.empty())
26!
293
            config.variant_index_dir = args_copy.variant_index_dir;
×
294
        if (args_copy.force_rebuild)
26✔
295
            config.force_rebuild = args_copy.force_rebuild;
×
296
        if (args_copy.threshold > 0.0)
26✔
297
            config.defaults.threshold_pct = args_copy.threshold;
×
298
        if (args_copy.time_interval_ms > 0.0)
26✔
299
            config.defaults.time_interval_ms = args_copy.time_interval_ms;
26✔
300

301
        config.resolve();
26!
302

303
        if (config.executor_threads == 0) {
26!
304
            config.executor_threads = dftracer_utils_hardware_concurrency();
26!
305
        }
13✔
306
        if (config.checkpoint_size == 0) {
26✔
307
            config.checkpoint_size =
26✔
308
                indexer::internal::Indexer::DEFAULT_CHECKPOINT_SIZE;
309
        }
13✔
310

311
        using composites::dft::indexing::IndexResolverUtility;
312
        using composites::dft::indexing::ResolverInput;
313
        using indexer::IndexBatchBuilderUtility;
314
        using indexer::IndexBuildBatchConfig;
315

316
        Runtime *rt = resolve_runtime(self);
26!
317

318
        auto task = [config, output_ptr, rt]() -> CoroTask<void> {
169!
319
            auto resolve_and_build =
39✔
320
                [&config](
169!
321
                    CoroScope &scope, const std::string &path,
322
                    const std::string &index_dir,
323
                    std::vector<std::string> &out_files) -> CoroTask<void> {
13!
324
                IndexResolverUtility resolver;
13!
325
                ResolverInput resolve_input;
13✔
326
                resolve_input.index_dir = index_dir;
13!
327
                resolve_input.require_checkpoints = !config.force_rebuild;
13✔
328
                if (fs::is_regular_file(path)) {
13!
329
                    resolve_input.files = {path};
12!
330
                } else {
12✔
331
                    resolve_input.directory = path;
1!
332
                }
333

334
                auto result = co_await resolver.process(resolve_input);
39!
335
                out_files = std::move(result.all_files);
39✔
336

337
                if (out_files.empty() || result.needs_checkpoint.empty()) {
39✔
338
                    co_return;
52✔
339
                }
340

341
                auto batch_cfg = std::make_shared<IndexBuildBatchConfig>();
39!
342
                batch_cfg->file_paths.reserve(result.needs_checkpoint.size());
39✔
343
                for (const auto &item : result.needs_checkpoint) {
27✔
344
                    batch_cfg->file_paths.push_back(item.file_path);
14!
345
                }
14✔
346
                batch_cfg->index_dir = index_dir;
13✔
347
                batch_cfg->checkpoint_size = config.checkpoint_size;
39✔
348
                batch_cfg->parallelism = config.executor_threads;
39✔
349
                batch_cfg->force_rebuild = config.force_rebuild;
39✔
350
                batch_cfg->use_batch_write = true;
39✔
351
                batch_cfg->rebuild_root_summaries = true;
39✔
352

353
                co_await IndexBatchBuilderUtility::process(
91!
354
                    &scope, std::move(batch_cfg));
39✔
355
            };
169!
356

357
            std::vector<std::string> baseline_files;
39✔
358
            std::vector<std::string> variant_files;
39✔
359

360
            bool shared_index =
78✔
361
                composites::dft::internal::determine_index_path(
117!
362
                    config.baseline, config.baseline_index_dir) ==
78✔
363
                composites::dft::internal::determine_index_path(
78!
364
                    config.variant, config.variant_index_dir);
39✔
365

366
            co_await run_coro_scope(
52!
367
                rt->executor(), [&](CoroScope &scope) -> CoroTask<void> {
169!
368
                    if (shared_index) {
39!
369
                        co_await resolve_and_build(scope, config.baseline,
104!
370
                                                   config.baseline_index_dir,
39✔
371
                                                   baseline_files);
39✔
372
                        if (config.baseline == config.variant) {
13!
373
                            variant_files = baseline_files;
13!
374
                        } else {
13✔
375
                            co_await resolve_and_build(scope, config.variant,
×
376
                                                       config.variant_index_dir,
377
                                                       variant_files);
378
                        }
379
                    } else {
13✔
380
                        scope.spawn([&](CoroScope &s) -> CoroTask<void> {
×
381
                            co_await resolve_and_build(
×
382
                                s, config.baseline, config.baseline_index_dir,
383
                                baseline_files);
384
                        });
×
385
                        co_await resolve_and_build(scope, config.variant,
×
386
                                                   config.variant_index_dir,
387
                                                   variant_files);
388
                    }
389
                });
52!
390

391
            if (baseline_files.empty()) {
39!
392
                throw DFTUtilsException(
×
393
                    ErrorCode::NOT_FOUND,
394
                    "No trace files found in baseline: " + config.baseline);
×
395
            }
396
            if (variant_files.empty()) {
39!
397
                throw DFTUtilsException(
×
398
                    ErrorCode::NOT_FOUND,
399
                    "No trace files found in variant: " + config.variant);
×
400
            }
401

402
            output_ptr->baseline_path = config.baseline;
39✔
403
            output_ptr->variant_path = config.variant;
13✔
404

405
            auto start_time = std::chrono::high_resolution_clock::now();
39✔
406

407
            std::size_t b_files_actual = 0;
39✔
408
            std::size_t v_files_actual = 0;
39✔
409
            bool metadata_set = false;
39✔
410

411
            for (auto &node : config.nodes) {
52!
412
                std::vector<const ComparisonNode *> visitors;
39✔
413
                flatten_nodes(node, visitors);
39!
414

415
                std::vector<ComparisonVisitorPair> pairs;
39✔
416
                pairs.reserve(visitors.size());
39!
417

418
                for (const auto *visitor : visitors) {
78!
419
                    std::optional<common::query::Query> query;
39✔
420
                    if (!visitor->composed_query.empty()) {
39✔
421
                        auto result = common::query::Query::from_string(
26!
422
                            visitor->composed_query);
13✔
423
                        if (!result) {
13!
424
                            throw DFTUtilsException(
×
425
                                ErrorCode::QUERY,
426
                                "Invalid query for node '" + visitor->name +
×
427
                                    "': " + result.error().format());
×
428
                        }
429
                        query = std::move(*result);
13!
430
                    }
13✔
431

432
                    AggregationConfig agg_cfg;
39!
433
                    agg_cfg.time_interval_us = static_cast<std::uint64_t>(
39✔
434
                        config.defaults.time_interval_ms * 1000.0);
39✔
435
                    agg_cfg.extra_group_keys = {};
39✔
436
                    agg_cfg.compute_statistics = true;
13✔
437
                    agg_cfg.compute_percentiles = true;
13✔
438
                    agg_cfg.percentiles = visitor->resolved_percentiles;
13✔
439
                    agg_cfg.sketch_accuracy = 0.01;
39✔
440
                    agg_cfg.track_process_parents = false;
39✔
441

442
                    auto [base_result, var_result] = co_await coro::when_all(
91!
443
                        run_aggregation(
78!
444
                            baseline_files, agg_cfg, query,
39!
445
                            config.baseline_index_dir, config.checkpoint_size,
39!
446
                            config.force_rebuild, config.executor_threads),
39✔
447
                        run_aggregation(
78!
448
                            variant_files, agg_cfg, query,
39!
449
                            config.variant_index_dir, config.checkpoint_size,
39!
450
                            config.force_rebuild, config.executor_threads));
39✔
451

452
                    if (!metadata_set) {
13!
453
                        b_files_actual = base_result.total_files_processed;
13✔
454
                        v_files_actual = var_result.total_files_processed;
13✔
455
                        output_ptr->baseline_file_count = b_files_actual;
13✔
456
                        output_ptr->variant_file_count = v_files_actual;
13✔
457
                        output_ptr->baseline_meta = extract_metadata(
26!
458
                            base_result.aggregations, b_files_actual);
13✔
459
                        output_ptr->variant_meta = extract_metadata(
26!
460
                            var_result.aggregations, v_files_actual);
13✔
461
                        metadata_set = true;
13✔
462
                    }
13✔
463

464
                    ComparisonVisitorPair pair;
13✔
465
                    pair.baseline = std::move(base_result);
13✔
466
                    pair.variant = std::move(var_result);
13✔
467
                    pair.node = *visitor;
13!
468
                    pairs.push_back(std::move(pair));
13!
469
                }
13!
470

471
                ComparisonUtilityInput cmp_input;
39✔
472
                cmp_input.visitors = std::move(pairs);
39✔
473
                cmp_input.root_node = node;
39!
474
                cmp_input.baseline_file_count = b_files_actual;
39✔
475
                cmp_input.variant_file_count = v_files_actual;
39✔
476

477
                ComparisonUtility cmp;
39!
478
                auto cmp_output = co_await cmp.process(cmp_input);
52!
479
                output_ptr->nodes.push_back(std::move(cmp_output->result));
13!
480
            }
13!
481

482
            // Inject metadata rows into root SUMMARY.
483
            auto meta_rows = build_metadata_metrics(output_ptr->baseline_meta,
26!
484
                                                    output_ptr->variant_meta);
13✔
485
            for (auto &node : output_ptr->nodes) {
26✔
486
                node.summary.metrics.insert(node.summary.metrics.begin(),
39!
487
                                            meta_rows.begin(), meta_rows.end());
26✔
488
            }
13✔
489

490
            auto end_time = std::chrono::high_resolution_clock::now();
13✔
491
            std::chrono::duration<double, std::milli> duration =
13✔
492
                end_time - start_time;
13!
493
            output_ptr->execution_time_ms = duration.count();
13!
494
        };
338!
495

496
        rt->submit(task(), "comparator").get();
26!
497
    });
52✔
498
}
26✔
499

500
// -----------------------------------------------------------------------
501
// compare() -- returns ArrowTable
502
// -----------------------------------------------------------------------
503

504
// compare(): run the comparison pipeline and return the result as an Arrow
// table object.
//
// Returns a new reference on success, or NULL with a Python exception set
// when the arguments are invalid, the pipeline fails, the Arrow conversion
// fails, or the extension was built without Arrow support.
static PyObject *Comparator_compare(ComparatorObject *self, PyObject *args,
                                    PyObject *kwds) {
    ComparatorArgs cargs;
    if (parse_comparator_args(args, kwds, cargs) < 0) return nullptr;

#ifndef DFTRACER_UTILS_ENABLE_ARROW
    // Without Arrow this call can never succeed, so fail before running the
    // (potentially long) pipeline instead of after it.
    (void)self;
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return nullptr;
#else
    ComparisonOutput output;
    if (!run_comparison_pipeline(self, cargs, output)) {
        return nullptr;
    }

    auto arrow_result = output.to_arrow();
    if (!arrow_result.valid()) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to convert comparison output "
                        "to Arrow");
        return nullptr;
    }
    return arrow_result_to_table(std::move(arrow_result));
#endif
}
12✔
529

530
// -----------------------------------------------------------------------
531
// compare_json() -- returns JSON string
532
// -----------------------------------------------------------------------
533

534
// compare_json(): run the comparison pipeline and return the result rendered
// as a JSON string (Python str). Returns NULL when argument parsing or the
// pipeline fails.
static PyObject *Comparator_compare_json(ComparatorObject *self, PyObject *args,
                                         PyObject *kwds) {
    ComparatorArgs parsed;
    const bool args_ok = parse_comparator_args(args, kwds, parsed) >= 0;
    if (!args_ok) {
        return nullptr;
    }

    ComparisonOutput comparison;
    const bool pipeline_ok = run_comparison_pipeline(self, parsed, comparison);
    if (!pipeline_ok) {
        return nullptr;
    }

    TreeTableFormatter formatter;
    const std::string rendered = formatter.render_json(comparison);
    const auto length = static_cast<Py_ssize_t>(rendered.size());
    return PyUnicode_FromStringAndSize(rendered.data(), length);
}
8✔
548

549
// -----------------------------------------------------------------------
550
// compare_table() -- returns formatted table string
551
// -----------------------------------------------------------------------
552

553
// compare_table(): run the comparison pipeline and return the result as a
// formatted table string (colour and Unicode disabled).
//
// The formatter writes to a FILE*, so the table is rendered into an
// in-memory stream created with open_memstream() and then copied into a
// Python str.
//
// Returns a new reference on success, or NULL with a Python exception set
// when the arguments are invalid, the pipeline fails, the memory stream
// cannot be created or finalized, or rendering throws.
static PyObject *Comparator_compare_table(ComparatorObject *self,
                                          PyObject *args, PyObject *kwds) {
    ComparatorArgs cargs;
    if (parse_comparator_args(args, kwds, cargs) < 0) return nullptr;

    ComparisonOutput output;
    if (!run_comparison_pipeline(self, cargs, output)) {
        return nullptr;
    }

    FormatterOptions fmt_opts;
    fmt_opts.use_color = false;
    fmt_opts.use_unicode = false;

    // Render into an in-memory stream and read the buffer back as a string.
    char *buf = nullptr;
    std::size_t buf_size = 0;
    FILE *memstream = open_memstream(&buf, &buf_size);
    if (!memstream) {
        PyErr_SetString(PyExc_RuntimeError, "Failed to create memory stream");
        return nullptr;
    }

    // A C++ exception must not propagate into the CPython caller, and the
    // stream/buffer must be released on every path, so rendering errors are
    // turned into a Python exception here.
    bool ok = true;
    try {
        TreeTableFormatter formatter(fmt_opts);
        formatter.render(memstream, output);
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        ok = false;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to render comparison table");
        ok = false;
    }

    // fclose() flushes the stream and finalizes buf/buf_size; a failure
    // means the buffer may be incomplete.
    if (fclose(memstream) != 0 && ok) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to finalize memory stream");
        ok = false;
    }

    PyObject *result =
        ok ? PyUnicode_FromStringAndSize(buf,
                                         static_cast<Py_ssize_t>(buf_size))
           : nullptr;
    free(buf);
    return result;
}
6✔
585

586
// -----------------------------------------------------------------------
587
// __call__ delegates to compare()
588
// -----------------------------------------------------------------------
589

590
// __call__ implementation: calling the object behaves exactly like calling
// its compare() method with the same arguments.
static PyObject *Comparator_call(PyObject *self, PyObject *args,
                                 PyObject *kwds) {
    auto *comparator = reinterpret_cast<ComparatorObject *>(self);
    return Comparator_compare(comparator, args, kwds);
}
594

595
// -----------------------------------------------------------------------
596
// Method table
597
// -----------------------------------------------------------------------
598

599
// Docstring for compare(). The keyword names listed here must match the
// kwlist in parse_comparator_args(): the index directories are passed as
// baseline_index_dir / variant_index_dir (there is no `index_dir` keyword).
static const char *COMPARE_DOC =
    "compare(baseline, variant, query='', group_by='',\n"
    "        format='table', time_interval_ms=5000.0,\n"
    "        threshold=0.0, executor_threads=0,\n"
    "        baseline_index_dir='', variant_index_dir='',\n"
    "        force_rebuild=False, config='')\n"
    "--\n"
    "\n"
    "Run comparison pipeline, return materialized ArrowTable.\n"
    "\n"
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms "
    "(default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    baseline_index_dir (str): Directory for baseline "
    ".dftindex stores.\n"
    "    variant_index_dir (str): Directory for variant "
    ".dftindex stores.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n"
    "\n"
    "Returns:\n"
    "    ArrowTable: Comparison results.\n";
624

625
// Docstring for compare_json(). The keyword names listed here must match
// the kwlist in parse_comparator_args(): the index directories are passed
// as baseline_index_dir / variant_index_dir (there is no `index_dir`
// keyword).
static const char *COMPARE_JSON_DOC =
    "compare_json(baseline, variant, query='', group_by='',\n"
    "             format='table', time_interval_ms=5000.0,\n"
    "             threshold=0.0, executor_threads=0,\n"
    "             baseline_index_dir='', variant_index_dir='',\n"
    "             force_rebuild=False, config='')\n"
    "--\n"
    "\n"
    "Run comparison pipeline, return JSON string.\n"
    "\n"
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms "
    "(default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    baseline_index_dir (str): Directory for baseline "
    ".dftindex stores.\n"
    "    variant_index_dir (str): Directory for variant "
    ".dftindex stores.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n"
    "\n"
    "Returns:\n"
    "    str: JSON representation of comparison results.\n";
651

652
// Docstring for the "compare_table" entry of Comparator_methods.
// Layout: a call-signature header, a line holding only "--", a blank line,
// then the human-readable description. Written as a raw string literal so
// the text reads exactly as it is stored; every byte between the delimiters
// (including the final newline) is part of the docstring.
static const char *COMPARE_TABLE_DOC = R"doc(compare_table(baseline, variant, query='', group_by='',
              format='table', time_interval_ms=5000.0,
              threshold=0.0, executor_threads=0,
              index_dir='', force_rebuild=False, config='')
--

Run comparison pipeline, return formatted table string.

Args:
    baseline (str): Baseline trace file or directory.
    variant (str): Variant trace file or directory.
    query (str): Query filter (default: all events).
    group_by (str): Comma-separated group keys.
    format (str): Output format (default 'table').
    time_interval_ms (float): Time bucket in ms (default 5000).
    threshold (float): Hide changes below this pct.
    executor_threads (int): Parallel threads (0=auto).
    index_dir (str): Directory for .dftindex stores.
    force_rebuild (bool): Force index rebuild.
    config (str): JSON config file path.

Returns:
    str: Formatted ASCII table of comparison results.
)doc";
678

679
// Method table referenced by ComparatorType's tp_methods slot.
//
// Every entry is flagged METH_VARARGS | METH_KEYWORDS, i.e. CPython invokes
// the handler with (self, args, kwargs). PyMethodDef::ml_meth is typed as the
// two-argument PyCFunction, so each handler has to be cast. The cast goes
// through void (*)(void) — the form the CPython documentation recommends —
// because a direct cast between the incompatible function-pointer types
// triggers -Wcast-function-type.
//
// The table ends with a fully spelled-out all-null sentinel so that
// -Wmissing-field-initializers has nothing to report.
static PyMethodDef Comparator_methods[] = {
    {"compare",
     reinterpret_cast<PyCFunction>(
         reinterpret_cast<void (*)(void)>(Comparator_compare)),
     METH_VARARGS | METH_KEYWORDS, COMPARE_DOC},
    {"compare_json",
     reinterpret_cast<PyCFunction>(
         reinterpret_cast<void (*)(void)>(Comparator_compare_json)),
     METH_VARARGS | METH_KEYWORDS, COMPARE_JSON_DOC},
    {"compare_table",
     reinterpret_cast<PyCFunction>(
         reinterpret_cast<void (*)(void)>(Comparator_compare_table)),
     METH_VARARGS | METH_KEYWORDS, COMPARE_TABLE_DOC},
    {nullptr, nullptr, 0, nullptr}};  // sentinel
1✔
687

688
// Static type object for the Python class exposed as
// "dftracer_utils_ext.ComparatorUtility".
//
// Slots are listed positionally in PyTypeObject declaration order; every slot
// not named below is left 0 (unset). Populated slots:
//   tp_dealloc -> Comparator_dealloc
//   tp_call    -> Comparator_call (instances are callable)
//   tp_flags   -> default flags plus Py_TPFLAGS_BASETYPE (subclassable)
//   tp_methods -> Comparator_methods
//   tp_init    -> Comparator_init
//   tp_new     -> Comparator_new
//
// tp_doc starts with a "Name(args)\n--\n\n" header, which CPython strips from
// __doc__ and exposes as __text_signature__. That header must stay free of
// type annotations: inspect.signature() refuses annotated text signatures
// ("Annotations are not currently supported"), so the parameter's type is
// documented in the Args section instead.
PyTypeObject ComparatorType = {
    PyVarObject_HEAD_INIT(nullptr, 0)
    "dftracer_utils_ext.ComparatorUtility",            /* tp_name */
    sizeof(ComparatorObject),                          /* tp_basicsize */
    0,                                                 /* tp_itemsize */
    reinterpret_cast<destructor>(Comparator_dealloc),  /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    0,                                        /* tp_getattr */
    0,                                        /* tp_setattr */
    0,                                        /* tp_as_async */
    0,                                        /* tp_repr */
    0,                                        /* tp_as_number */
    0,                                        /* tp_as_sequence */
    0,                                        /* tp_as_mapping */
    0,                                        /* tp_hash */
    Comparator_call,                          /* tp_call */
    0,                                        /* tp_str */
    0,                                        /* tp_getattro */
    0,                                        /* tp_setattro */
    0,                                        /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    "ComparatorUtility(runtime=None)\n"
    "--\n\n"
    "Compare DFTracer trace metrics between baseline and "
    "variant.\n\n"
    "Args:\n"
    "    runtime (Runtime or None): Runtime for thread pool "
    "control.\n"
    "        If None, uses the default global Runtime.\n\n"
    "compare(baseline, variant, ...) -> ArrowTable\n"
    "    Run comparison and return a materialized Arrow "
    "table.\n\n"
    "compare_json(baseline, variant, ...) -> str\n"
    "    Run comparison and return JSON string.\n\n"
    "compare_table(baseline, variant, ...) -> str\n"
    "    Run comparison and return formatted table "
    "string.\n",                                  /* tp_doc */
    0,                                            /* tp_traverse */
    0,                                            /* tp_clear */
    0,                                            /* tp_richcompare */
    0,                                            /* tp_weaklistoffset */
    0,                                            /* tp_iter */
    0,                                            /* tp_iternext */
    Comparator_methods,                           /* tp_methods */
    0,                                            /* tp_members */
    0,                                            /* tp_getset */
    0,                                            /* tp_base */
    0,                                            /* tp_dict */
    0,                                            /* tp_descr_get */
    0,                                            /* tp_descr_set */
    0,                                            /* tp_dictoffset */
    reinterpret_cast<initproc>(Comparator_init),  /* tp_init */
    0,                                            /* tp_alloc */
    Comparator_new,                               /* tp_new */
};
743

744
/**
 * Hands ComparatorType to register_type() together with module `m` and the
 * name "ComparatorUtility".
 *
 * @param m Module object forwarded unchanged to register_type().
 * @return -1 if register_type() returns a negative value, 0 otherwise.
 */
int init_comparator(PyObject *m) {
    const bool failed =
        register_type(m, &ComparatorType, "ComparatorUtility") < 0;
    return failed ? -1 : 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc