• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23561935370

25 Mar 2026 08:15PM UTC coverage: 51.009% (+0.8%) from 50.205%
23561935370

Pull #57

github

web-flow
Merge f8aaf6b1d into f837e5ce7
Pull Request #57: feat(comparator): add pairwise traces comparator

22032 of 55983 branches covered (39.35%)

Branch coverage included in aggregate %.

1642 of 1888 new or added lines in 26 files covered. (86.97%)

6 existing lines in 3 files now uncovered.

19366 of 25175 relevant lines covered (76.93%)

439564.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.62
/src/dftracer/utils/python/utilities/comparator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <dftracer/utils/core/common/filesystem.h>
3
#include <dftracer/utils/core/common/platform_compat.h>
4
#include <dftracer/utils/core/coro/channel.h>
5
#include <dftracer/utils/core/coro/task.h>
6
#include <dftracer/utils/core/coro/when_all.h>
7
#include <dftracer/utils/core/pipeline/pipeline.h>
8
#include <dftracer/utils/core/pipeline/pipeline_config.h>
9
#include <dftracer/utils/core/runtime.h>
10
#include <dftracer/utils/core/tasks/coro_scope.h>
11
#include <dftracer/utils/core/tasks/task.h>
12
#include <dftracer/utils/python/arrow_helpers.h>
13
#include <dftracer/utils/python/runtime.h>
14
#include <dftracer/utils/python/utilities/comparator.h>
15
#include <dftracer/utils/utilities/common/query/query.h>
16
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregators.h>
17
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_config.h>
18
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_result.h>
19
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_utility.h>
20
#include <dftracer/utils/utilities/composites/dft/comparator/tree_table_formatter.h>
21
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
22
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
23
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
24
#include <dftracer/utils/utilities/indexer/index_builder_utility.h>
25
#include <dftracer/utils/utilities/indexer/internal/indexer.h>
26

27
#include <atomic>
28
#include <chrono>
29
#include <cstdio>
30
#include <ctime>
31
#include <string>
32
#include <thread>
33
#include <unordered_set>
34
#include <vector>
35

36
using dftracer::utils::Runtime;
37
using dftracer::utils::coro::CoroTask;
38
using namespace dftracer::utils;
39
using namespace dftracer::utils::utilities;
40
using namespace dftracer::utils::utilities::composites::dft::aggregators;
41
using namespace dftracer::utils::utilities::composites::dft::comparator;
42

43
#ifdef DFTRACER_UTILS_ENABLE_ARROW
44
using dftracer::utils::python::arrow_result_to_table;
45
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
46
#endif
47

48
// Picks the Runtime that work from this comparator is submitted to: the one
// wrapped by the RuntimeObject bound in __init__, or the process-wide default
// when no runtime was bound.
static Runtime *get_runtime(ComparatorObject *self) {
    PyObject *bound = self->runtime_obj;
    if (bound == NULL) {
        return get_default_runtime();
    }
    return ((RuntimeObject *)bound)->runtime.get();
}
53

54
// tp_dealloc: release the optional reference to the bound Runtime object and
// return the instance memory through the type's tp_free slot.
static void Comparator_dealloc(ComparatorObject *self) {
    PyObject *as_object = (PyObject *)self;
    Py_CLEAR(self->runtime_obj);
    Py_TYPE(as_object)->tp_free(as_object);
}
58

59
// tp_new: allocate an instance via the type's tp_alloc and mark the runtime
// slot as empty; Comparator_init may bind a runtime afterwards.
// Returns NULL (with the allocator's exception set) if allocation fails.
static PyObject *Comparator_new(PyTypeObject *type, PyObject *args,
                                PyObject *kwds) {
    (void)args;
    (void)kwds;
    PyObject *raw = type->tp_alloc(type, 0);
    if (raw == NULL) {
        return NULL;
    }
    ((ComparatorObject *)raw)->runtime_obj = NULL;
    return raw;
}
67

68
static int Comparator_init(ComparatorObject *self, PyObject *args,
26✔
69
                           PyObject *kwds) {
70
    static const char *kwlist[] = {"runtime", NULL};
71
    PyObject *runtime_arg = NULL;
26✔
72

73
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
26!
74
                                     &runtime_arg)) {
NEW
75
        return -1;
×
76
    }
77

78
    if (runtime_arg && runtime_arg != Py_None) {
26!
NEW
79
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
NEW
80
            Py_INCREF(runtime_arg);
×
NEW
81
            self->runtime_obj = runtime_arg;
×
82
        } else {
NEW
83
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
NEW
84
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
×
NEW
85
                self->runtime_obj = native;
×
86
            } else {
NEW
87
                Py_XDECREF(native);
×
NEW
88
                PyErr_SetString(PyExc_TypeError,
×
89
                                "runtime must be a Runtime instance or None");
NEW
90
                return -1;
×
91
            }
92
        }
93
    }
94

95
    return 0;
26✔
96
}
13✔
97

98
// -----------------------------------------------------------------------
99
// Helpers
100
// -----------------------------------------------------------------------
101

102
// Plain value bundle holding the keyword arguments shared by compare(),
// compare_json() and compare_table() once converted to C++ types.
// String fields start empty; the numeric/boolean fields carry the defaults
// below until parse_comparator_args overwrites them.
struct ComparatorArgs {
    // Trace inputs: a file or a directory for each side.
    std::string baseline;
    std::string variant;

    // Filtering and presentation.
    std::string query;
    std::string group_by;
    std::string format;

    // Tuning knobs.
    double time_interval_ms{5000.0};
    double threshold{0.0};
    std::size_t executor_threads{0};

    // Index sidecar handling.
    std::string index_dir;
    bool force_rebuild{false};

    // Optional JSON configuration file path.
    std::string config_path;
};
115

116
static int parse_comparator_args(PyObject *args, PyObject *kwds,
26✔
117
                                 ComparatorArgs &out) {
118
    static const char *kwlist[] = {
119
        "baseline",  "variant",          "query",     "group_by",
120
        "format",    "time_interval_ms", "threshold", "executor_threads",
121
        "index_dir", "force_rebuild",    "config",    NULL};
122

123
    const char *baseline = NULL;
26✔
124
    const char *variant = NULL;
26✔
125
    const char *query = "";
26✔
126
    const char *group_by = "";
26✔
127
    const char *format = "table";
26✔
128
    double time_interval_ms = 5000.0;
26✔
129
    double threshold = 0.0;
26✔
130
    Py_ssize_t executor_threads = 0;
26✔
131
    const char *index_dir = "";
26✔
132
    int force_rebuild = 0;
26✔
133
    const char *config = "";
26✔
134

135
    if (!PyArg_ParseTupleAndKeywords(
26!
136
            args, kwds, "ss|sssddnsps", (char **)kwlist, &baseline, &variant,
13✔
137
            &query, &group_by, &format, &time_interval_ms, &threshold,
138
            &executor_threads, &index_dir, &force_rebuild, &config))
NEW
139
        return -1;
×
140

141
    out.baseline = baseline;
26!
142
    out.variant = variant;
26!
143
    out.query = query;
26!
144
    out.group_by = group_by;
26!
145
    out.format = format;
26!
146
    out.time_interval_ms = time_interval_ms;
26✔
147
    out.threshold = threshold;
26✔
148
    out.executor_threads = static_cast<std::size_t>(executor_threads);
26✔
149
    out.index_dir = index_dir;
26!
150
    out.force_rebuild = force_rebuild != 0;
26✔
151
    out.config_path = config;
26!
152

153
    return 0;
26✔
154
}
13✔
155

156
namespace {
157

158
void flatten_nodes(const ComparisonNode &node,
26✔
159
                   std::vector<const ComparisonNode *> &out) {
160
    out.push_back(&node);
26!
161
    for (const auto &child : node.children) {
26!
NEW
162
        flatten_nodes(child, out);
×
163
    }
164
}
26✔
165

166
CoroTask<EventAggregatorUtilityOutput> run_aggregation(
156!
167
    std::vector<std::string> input_files, AggregationConfig agg_config,
168
    std::optional<common::query::Query> query, std::string index_dir,
169
    std::size_t checkpoint_size, bool force_rebuild,
170
    std::size_t executor_threads) {
26!
171
    constexpr std::size_t CHUNK_SIZE_MB = 4;
26✔
172
    constexpr std::size_t BATCH_SIZE_MB = 4;
26✔
173

174
    auto pipeline_config = PipelineConfig()
52!
175
                               .with_name("DFTracer Comparator Aggregation")
26!
176
                               .with_compute_threads(executor_threads)
26!
177
                               .with_watchdog(false);
26!
178
    Pipeline pipeline(pipeline_config);
26!
179

180
    EventAggregatorUtility merger;
26!
181
    std::atomic<int> global_chunk_idx{0};
26✔
182

183
    auto streaming_task = make_task(
26!
184
        [&](CoroScope &ctx) -> CoroTask<void> {
234!
185
            auto chunk_chan = coro::make_channel<ChunkAggregatorInput>(0);
78!
186
            auto result_chan = coro::make_channel<ChunkAggregationOutput>(8);
78!
187

188
            co_await ctx.scope([&](CoroScope &scope) -> CoroTask<void> {
286!
189
                for (const auto &file_path : input_files) {
54✔
190
                    auto *global_chunk_idx_ptr = &global_chunk_idx;
28✔
191
                    scope.spawn([file_path, ch = chunk_chan->producer(),
476!
192
                                 index_dir, checkpoint_size, force_rebuild,
28!
193
                                 agg_config, query, global_chunk_idx_ptr](
28!
194
                                    CoroScope & /*fctx*/) mutable
195
                                    -> CoroTask<void> {
28!
196
                        [[maybe_unused]] auto producer_guard = ch.guard();
84!
197

198
                        std::string idx_path =
84✔
199
                            composites::dft::internal::determine_index_path(
84!
200
                                file_path, index_dir);
84✔
201

202
                        auto meta_input =
84✔
203
                            composites::dft::MetadataCollectorUtilityInput::
168!
204
                                from_file(file_path)
84!
205
                                    .with_checkpoint_size(checkpoint_size)
84✔
206
                                    .with_force_rebuild(force_rebuild)
28!
207
                                    .with_index(idx_path);
28✔
208
                        auto metadata =
84✔
209
                            co_await composites::dft::MetadataCollectorUtility{}
224!
210
                                .process(meta_input);
84!
211

212
                        if (!metadata.success) {
84!
213
                            co_return;
214
                        }
215

216
                        FileChunkMapperUtility file_mapper;
84!
217
                        auto mapper_input =
84✔
218
                            FileChunkMapperInput::from_metadata(metadata)
168!
219
                                .with_config(agg_config)
84✔
220
                                .with_checkpoint_size(checkpoint_size)
28!
221
                                .with_target_chunk_size(CHUNK_SIZE_MB)
28!
222
                                .with_batch_size(BATCH_SIZE_MB * 1024 * 1024);
28!
223
                        mapper_input.query = query;
84!
224
                        auto file_chunks =
84✔
225
                            co_await file_mapper.process(mapper_input);
112!
226

227
                        int start_idx = global_chunk_idx_ptr->fetch_add(
168✔
228
                            static_cast<int>(file_chunks.size()));
84✔
229
                        for (int i = 0;
112✔
230
                             i < static_cast<int>(file_chunks.size()); ++i) {
112✔
231
                            file_chunks[i].chunk_index = start_idx + i;
28✔
232
                        }
28✔
233

234
                        for (auto &chunk : file_chunks) {
112!
235
                            if (!co_await ch.send(std::move(chunk))) {
112!
236
                                co_return;
237
                            }
238
                        }
28!
239
                        co_return;
28✔
240
                    });
756!
241
                }
28✔
242

243
                for (std::size_t w = 0; w < executor_threads; ++w) {
104✔
244
                    (void)w;
245
                    scope.spawn([chunk_chan, rp = result_chan->producer(),
789!
246
                                 result_chan](
78✔
247
                                    CoroScope &wctx) mutable -> CoroTask<void> {
75✔
248
                        [[maybe_unused]] auto producer_guard = rp.guard();
68!
249
                        while (auto input = co_await wctx.receive(chunk_chan)) {
499!
250
                            ChunkAggregatorUtility agg;
84!
251
                            auto output = co_await agg.process(*input);
112!
252
                            if (!co_await result_chan->send(
112!
253
                                    std::move(output))) {
254
                                co_return;
255
                            }
256
                        }
218✔
257
                        co_return;
78✔
258
                    });
985!
259
                }
78✔
260

261
                auto *merger_ptr = &merger;
26✔
262
                scope.spawn([result_chan,
320!
263
                             merger_ptr](CoroScope &mctx) -> CoroTask<void> {
52!
264
                    while (auto output = co_await mctx.receive(result_chan)) {
164!
265
                        merger_ptr->merge_chunk(std::move(*output));
28!
266
                    }
54✔
267
                    co_return;
26✔
268
                });
164!
269

270
                co_return;
52✔
271
            });
78!
272

273
            co_return;
26✔
274
        },
130!
275
        "StreamingAggregate");
26!
276

277
    EventAggregatorUtilityOutput result;
26✔
278
    auto post_task = make_task(
26!
279
        [&](CoroScope & /*ctx*/) -> CoroTask<bool> {
182!
280
            result = merger.finalize();
26!
281
            co_return result.success;
52!
282
        },
52!
283
        "Finalize");
26!
284

285
    post_task->depends_on(streaming_task);
26!
286
    pipeline.set_source(streaming_task);
26!
287
    pipeline.set_destination(post_task);
26!
288
    pipeline.execute();
26!
289

290
    co_return result;
52!
291
}
78!
292

293
}  // namespace
294

295
static int run_comparison_pipeline(ComparatorObject *self,
26✔
296
                                   const ComparatorArgs &cargs,
297
                                   ComparisonOutput &output,
298
                                   std::string &error_msg) {
299
    ComparatorArgs args_copy = cargs;
26!
300
    auto *output_ptr = &output;
26✔
301

302
    Py_BEGIN_ALLOW_THREADS try {
26!
303
        ComparisonConfig config;
26!
304
        if (!args_copy.config_path.empty()) {
26✔
NEW
305
            std::string parse_error;
×
306
            auto parsed = ComparisonConfig::from_json_file(
×
NEW
307
                args_copy.config_path, parse_error);
×
NEW
308
            if (!parsed) {
×
NEW
309
                error_msg = "Config error: " + parse_error;
×
NEW
310
                goto done;
×
311
            }
NEW
312
            config = std::move(*parsed);
×
NEW
313
        } else {
×
314
            config = ComparisonConfig::from_cli(
39!
315
                args_copy.baseline, args_copy.variant, args_copy.query,
13✔
316
                args_copy.group_by);
26✔
317
        }
318

319
        config.format = args_copy.format;
26!
320
        config.no_color = true;
26✔
321
        if (args_copy.executor_threads > 0)
26✔
NEW
322
            config.executor_threads = args_copy.executor_threads;
×
323
        if (!args_copy.index_dir.empty())
26!
NEW
324
            config.index_dir = args_copy.index_dir;
×
325
        if (args_copy.force_rebuild)
26✔
NEW
326
            config.force_rebuild = args_copy.force_rebuild;
×
327
        if (args_copy.threshold > 0.0)
26✔
NEW
328
            config.defaults.threshold_pct = args_copy.threshold;
×
329
        if (args_copy.time_interval_ms > 0.0)
26✔
330
            config.defaults.time_interval_ms = args_copy.time_interval_ms;
26✔
331

332
        config.resolve();
26!
333

334
        if (config.executor_threads == 0) {
26!
335
            config.executor_threads = dftracer_utils_hardware_concurrency();
26!
336
        }
13✔
337
        if (config.checkpoint_size == 0) {
26✔
338
            config.checkpoint_size =
26✔
339
                indexer::internal::Indexer::DEFAULT_CHECKPOINT_SIZE;
340
        }
13✔
341

342
        // Create temp index dir if needed
343
        std::string temp_index_dir;
26✔
344
        if (config.index_dir.empty()) {
26!
345
            auto temp_path = fs::temp_directory_path();
26!
346
            temp_path /=
13!
347
                "dftracer_cmp_py_" + std::to_string(std::time(nullptr)) + "_" +
52!
348
                std::to_string(static_cast<int>(
39!
349
                    std::hash<std::thread::id>{}(std::this_thread::get_id())));
39!
350
            temp_index_dir = temp_path.string();
26!
351
            fs::create_directories(temp_index_dir);
26!
352
            config.index_dir = temp_index_dir;
26!
353
        }
26✔
354

355
        // Enumerate files
356
        auto enumerate_files =
13✔
357
            [](std::string path) -> CoroTask<std::vector<std::string>> {
160!
358
            std::vector<std::string> files;
30✔
359
            if (fs::is_regular_file(path)) {
30!
360
                files.push_back(path);
24!
361
                co_return files;
50!
362
            }
363
            filesystem::PatternDirectoryScannerUtility scanner;
6!
364
            filesystem::PatternDirectoryScannerUtilityInput scan_input{
12!
365
                path, {".pfw", ".pfw.gz"}, false};
6!
366
            auto entries = co_await scanner.process(scan_input);
8!
367
            files.reserve(entries.size());
2!
368
            for (const auto &e : entries) {
6✔
369
                files.push_back(e.path.string());
4!
370
            }
4✔
371
            co_return files;
2!
372
        };
90!
373

374
        Runtime *rt = get_runtime(self);
26!
375

376
        auto *error_msg_ptr = &error_msg;
26✔
377
        auto task = [config, output_ptr, enumerate_files,
247!
378
                     error_msg_ptr]() -> CoroTask<void> {
26!
379
            auto baseline_files = co_await enumerate_files(config.baseline);
52!
380
            auto variant_files = co_await enumerate_files(config.variant);
52!
381

382
            if (baseline_files.empty()) {
39!
383
                *error_msg_ptr =
384
                    "No trace files found in baseline: " + config.baseline;
×
385
                co_return;
386
            }
387
            if (variant_files.empty()) {
39!
388
                *error_msg_ptr =
389
                    "No trace files found in variant: " + config.variant;
×
390
                co_return;
391
            }
392

393
            // Build indexes upfront
394
            {
395
                std::unordered_set<std::string> seen;
39✔
396
                std::vector<std::string> all_files;
39✔
397
                for (const auto &f : baseline_files) {
53✔
398
                    if (seen.insert(f).second) all_files.push_back(f);
14!
399
                }
14✔
400
                for (const auto &f : variant_files) {
53✔
401
                    if (seen.insert(f).second) all_files.push_back(f);
14!
402
                }
14✔
403
                std::vector<indexer::IndexBuildConfig> idx_configs;
39✔
404
                idx_configs.reserve(all_files.size());
39!
405
                for (const auto &file_path : all_files) {
53✔
406
                    idx_configs.push_back(
14!
407
                        indexer::IndexBuildConfig::for_file(file_path)
14!
408
                            .with_checkpoint_size(config.checkpoint_size)
14!
409
                            .with_force_rebuild(config.force_rebuild)
14!
410
                            .with_index_dir(config.index_dir));
14!
411
                }
14✔
412
                std::vector<CoroTask<indexer::IndexBuildResult>> idx_tasks;
39✔
413
                idx_tasks.reserve(idx_configs.size());
39!
414
                for (const auto &cfg : idx_configs) {
53✔
415
                    idx_tasks.push_back(
14!
416
                        indexer::IndexBuilderUtility{}.process(cfg));
14!
417
                }
14✔
418
                co_await coro::when_all(std::move(idx_tasks));
52!
419
            }
13✔
420

421
            output_ptr->baseline_path = config.baseline;
39✔
422
            output_ptr->variant_path = config.variant;
13✔
423
            output_ptr->baseline_file_count = baseline_files.size();
39✔
424
            output_ptr->variant_file_count = variant_files.size();
39✔
425

426
            auto start_time = std::chrono::high_resolution_clock::now();
39✔
427

428
            for (auto &node : config.nodes) {
52!
429
                std::vector<const ComparisonNode *> visitors;
39✔
430
                flatten_nodes(node, visitors);
39!
431

432
                std::vector<ComparisonVisitorPair> pairs;
39✔
433
                pairs.reserve(visitors.size());
39!
434

435
                for (const auto *visitor : visitors) {
78!
436
                    std::optional<common::query::Query> query;
39✔
437
                    if (!visitor->composed_query.empty()) {
39✔
438
                        auto result = common::query::Query::from_string(
26!
439
                            visitor->composed_query);
13✔
440
                        if (!result) {
13!
441
                            *error_msg_ptr = "Invalid query for node '" +
×
442
                                             visitor->name +
×
443
                                             "': " + result.error().format();
×
444
                            co_return;
445
                        }
446
                        query = std::move(*result);
13!
447
                    }
13!
448

449
                    AggregationConfig agg_cfg;
39!
450
                    agg_cfg.time_interval_us = static_cast<std::uint64_t>(
39✔
451
                        config.defaults.time_interval_ms * 1000.0);
39✔
452
                    agg_cfg.extra_group_keys = {};
39✔
453
                    agg_cfg.compute_statistics = true;
13✔
454
                    agg_cfg.compute_percentiles = true;
13✔
455
                    agg_cfg.percentiles = visitor->resolved_percentiles;
13✔
456
                    agg_cfg.sketch_accuracy = 0.01;
39✔
457
                    agg_cfg.track_process_parents = false;
39✔
458

459
                    auto [base_result, var_result] = co_await coro::when_all(
117!
460
                        run_aggregation(
78!
461
                            baseline_files, agg_cfg, query, config.index_dir,
39!
462
                            config.checkpoint_size, config.force_rebuild,
39✔
463
                            config.executor_threads),
39✔
464
                        run_aggregation(
78!
465
                            variant_files, agg_cfg, query, config.index_dir,
39!
466
                            config.checkpoint_size, config.force_rebuild,
39✔
467
                            config.executor_threads));
39✔
468

469
                    if (pairs.empty()) {
13!
470
                        output_ptr->baseline_meta = extract_metadata(
26!
471
                            base_result.aggregations, baseline_files.size());
13✔
472
                        output_ptr->variant_meta = extract_metadata(
26!
473
                            var_result.aggregations, variant_files.size());
13✔
474
                    }
13✔
475

476
                    ComparisonVisitorPair pair;
13✔
477
                    pair.baseline = std::move(base_result);
13✔
478
                    pair.variant = std::move(var_result);
13✔
479
                    pair.node = *visitor;
13!
480
                    pairs.push_back(std::move(pair));
13!
481
                }
13!
482

483
                ComparisonUtilityInput cmp_input;
39✔
484
                cmp_input.visitors = std::move(pairs);
39✔
485
                cmp_input.root_node = node;
39!
486
                cmp_input.baseline_file_count = baseline_files.size();
39✔
487
                cmp_input.variant_file_count = variant_files.size();
39✔
488

489
                ComparisonUtility cmp;
39!
490
                auto cmp_output = co_await cmp.process(cmp_input);
52!
491
                output_ptr->nodes.push_back(std::move(cmp_output.result));
13!
492
            }
13!
493

494
            // Inject metadata rows into root SUMMARY.
495
            auto meta_rows = build_metadata_metrics(output_ptr->baseline_meta,
26!
496
                                                    output_ptr->variant_meta);
13✔
497
            for (auto &node : output_ptr->nodes) {
26✔
498
                node.summary.metrics.insert(node.summary.metrics.begin(),
39!
499
                                            meta_rows.begin(), meta_rows.end());
26✔
500
            }
13✔
501

502
            auto end_time = std::chrono::high_resolution_clock::now();
13✔
503
            std::chrono::duration<double, std::milli> duration =
13✔
504
                end_time - start_time;
13!
505
            output_ptr->execution_time_ms = duration.count();
13!
506
        };
416!
507

508
        rt->submit(task(), "comparator").get();
26!
509

510
        // Clean up temp index dir
511
        if (!temp_index_dir.empty() && fs::exists(temp_index_dir)) {
39!
512
            fs::remove_all(temp_index_dir);
26!
513
        }
13✔
514
    } catch (const std::exception &e) {
26!
NEW
515
        error_msg = e.what();
×
516
    }
13!
517
done:
13✔
518
    Py_END_ALLOW_THREADS
26!
519

520
        return error_msg.empty()
26✔
521
        ? 0
13!
522
        : -1;
13✔
523
}
26✔
524

525
// -----------------------------------------------------------------------
526
// compare() -- returns ArrowTable
527
// -----------------------------------------------------------------------
528

529
// compare(...) -> ArrowTable
//
// Runs the comparison and returns the materialized Arrow table as a new
// reference. Returns NULL with an exception set on failure. Argument errors
// come from parse_comparator_args; pipeline, conversion and missing-Arrow
// errors raise RuntimeError. Any C++ exception is also converted to
// RuntimeError so it never unwinds through the CPython C API.
static PyObject *Comparator_compare(ComparatorObject *self, PyObject *args,
                                    PyObject *kwds) {
    try {
        ComparatorArgs cargs;
        if (parse_comparator_args(args, kwds, cargs) < 0) return NULL;

#ifndef DFTRACER_UTILS_ENABLE_ARROW
        // Fail fast: without Arrow the result cannot be returned, so do not
        // run the (potentially expensive) comparison first.
        (void)self;
        PyErr_SetString(PyExc_RuntimeError,
                        "dftracer-utils was built without Arrow support");
        return NULL;
#else
        ComparisonOutput output;
        std::string error_msg;
        if (run_comparison_pipeline(self, cargs, output, error_msg) < 0) {
            PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
            return NULL;
        }

        auto arrow_result = output.to_arrow();
        if (!arrow_result.valid()) {
            PyErr_SetString(PyExc_RuntimeError,
                            "Failed to convert comparison output "
                            "to Arrow");
            return NULL;
        }
        return arrow_result_to_table(std::move(arrow_result));
#endif
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "compare() failed with an unknown C++ exception");
        return NULL;
    }
}
556

557
// -----------------------------------------------------------------------
558
// compare_json() -- returns JSON string
559
// -----------------------------------------------------------------------
560

561
// compare_json(...) -> str
//
// Runs the comparison and returns TreeTableFormatter's JSON rendering as a
// new str reference. Returns NULL with an exception set on failure. Pipeline
// errors and any C++ exception (e.g. from rendering) raise RuntimeError
// instead of unwinding through the CPython C API.
static PyObject *Comparator_compare_json(ComparatorObject *self, PyObject *args,
                                         PyObject *kwds) {
    try {
        ComparatorArgs cargs;
        if (parse_comparator_args(args, kwds, cargs) < 0) return NULL;

        ComparisonOutput output;
        std::string error_msg;
        if (run_comparison_pipeline(self, cargs, output, error_msg) < 0) {
            PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
            return NULL;
        }

        TreeTableFormatter formatter;
        std::string json = formatter.render_json(output);
        return PyUnicode_FromStringAndSize(json.data(),
                                           (Py_ssize_t)json.size());
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "compare_json() failed with an unknown C++ exception");
        return NULL;
    }
}
577

578
// -----------------------------------------------------------------------
579
// compare_table() -- returns formatted table string
580
// -----------------------------------------------------------------------
581

582
// compare_table(...) -> str
//
// Runs the comparison and returns the plain ASCII tree table (no color, no
// Unicode box drawing) as a new str reference. Returns NULL with an
// exception set on failure. The table is rendered into an in-memory FILE*
// (POSIX open_memstream). The stream and its buffer are released on every
// path, including when rendering throws. C++ exceptions become RuntimeError.
static PyObject *Comparator_compare_table(ComparatorObject *self,
                                          PyObject *args, PyObject *kwds) {
    char *buf = NULL;
    std::size_t buf_size = 0;
    FILE *memstream = NULL;

    try {
        ComparatorArgs cargs;
        if (parse_comparator_args(args, kwds, cargs) < 0) return NULL;

        ComparisonOutput output;
        std::string error_msg;
        if (run_comparison_pipeline(self, cargs, output, error_msg) < 0) {
            PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
            return NULL;
        }

        FormatterOptions fmt_opts;
        fmt_opts.use_color = false;
        fmt_opts.use_unicode = false;
        TreeTableFormatter formatter(fmt_opts);

        // Render into a memory stream; buf/buf_size are only guaranteed to be
        // up to date after the stream is closed.
        memstream = open_memstream(&buf, &buf_size);
        if (!memstream) {
            PyErr_SetString(PyExc_RuntimeError,
                            "Failed to create memory stream");
            return NULL;
        }

        formatter.render(memstream, output);
        int close_rc = fclose(memstream);
        memstream = NULL;
        if (close_rc != 0) {
            free(buf);
            buf = NULL;
            PyErr_SetString(PyExc_RuntimeError,
                            "Failed to finalize memory stream");
            return NULL;
        }

        PyObject *result =
            PyUnicode_FromStringAndSize(buf, (Py_ssize_t)buf_size);
        free(buf);
        buf = NULL;
        return result;
    } catch (const std::exception &e) {
        if (memstream) fclose(memstream);
        free(buf);
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        if (memstream) fclose(memstream);
        free(buf);
        PyErr_SetString(PyExc_RuntimeError,
                        "compare_table() failed with an unknown C++ exception");
        return NULL;
    }
}
616

617
// -----------------------------------------------------------------------
618
// __call__ delegates to compare()
619
// -----------------------------------------------------------------------
620

621
// tp_call: calling a ComparatorUtility instance directly is shorthand for
// its .compare() method, with the same arguments and return value.
static PyObject *Comparator_call(PyObject *self, PyObject *args,
                                 PyObject *kwds) {
    ComparatorObject *comparator = (ComparatorObject *)self;
    return Comparator_compare(comparator, args, kwds);
}
625

626
// -----------------------------------------------------------------------
627
// Method table
628
// -----------------------------------------------------------------------
629

630
static const char *COMPARE_DOC =
631
    "compare(baseline, variant, query='', group_by='',\n"
632
    "        format='table', time_interval_ms=5000.0,\n"
633
    "        threshold=0.0, executor_threads=0,\n"
634
    "        index_dir='', force_rebuild=False, config='')\n"
635
    "--\n"
636
    "\n"
637
    "Run comparison pipeline, return materialized ArrowTable.\n"
638
    "\n"
639
    "Args:\n"
640
    "    baseline (str): Baseline trace file or directory.\n"
641
    "    variant (str): Variant trace file or directory.\n"
642
    "    query (str): Query filter (default: all events).\n"
643
    "    group_by (str): Comma-separated group keys.\n"
644
    "    format (str): Output format (default 'table').\n"
645
    "    time_interval_ms (float): Time bucket in ms "
646
    "(default 5000).\n"
647
    "    threshold (float): Hide changes below this pct.\n"
648
    "    executor_threads (int): Parallel threads (0=auto).\n"
649
    "    index_dir (str): Index sidecar directory.\n"
650
    "    force_rebuild (bool): Force index rebuild.\n"
651
    "    config (str): JSON config file path.\n"
652
    "\n"
653
    "Returns:\n"
654
    "    ArrowTable: Comparison results.\n";
655

656
static const char *COMPARE_JSON_DOC =
657
    "compare_json(baseline, variant, query='', group_by='',\n"
658
    "             format='table', time_interval_ms=5000.0,\n"
659
    "             threshold=0.0, executor_threads=0,\n"
660
    "             index_dir='', force_rebuild=False, "
661
    "config='')\n"
662
    "--\n"
663
    "\n"
664
    "Run comparison pipeline, return JSON string.\n"
665
    "\n"
666
    "Args:\n"
667
    "    baseline (str): Baseline trace file or directory.\n"
668
    "    variant (str): Variant trace file or directory.\n"
669
    "    query (str): Query filter (default: all events).\n"
670
    "    group_by (str): Comma-separated group keys.\n"
671
    "    format (str): Output format (default 'table').\n"
672
    "    time_interval_ms (float): Time bucket in ms "
673
    "(default 5000).\n"
674
    "    threshold (float): Hide changes below this pct.\n"
675
    "    executor_threads (int): Parallel threads (0=auto).\n"
676
    "    index_dir (str): Index sidecar directory.\n"
677
    "    force_rebuild (bool): Force index rebuild.\n"
678
    "    config (str): JSON config file path.\n"
679
    "\n"
680
    "Returns:\n"
681
    "    str: JSON representation of comparison results.\n";
682

683
// Docstring for ComparatorUtility.compare_table.
//
// Layout: a CPython text-signature header (signature, a "--" line, then
// a blank line), a one-line summary, the argument list, and the return
// description. The continuation lines of the signature are indented to
// line up with the opening parenthesis.
static const char *COMPARE_TABLE_DOC =
    // --- signature header ---
    "compare_table(baseline, variant, query='', group_by='',\n"
    "              format='table', time_interval_ms=5000.0,\n"
    "              threshold=0.0, executor_threads=0,\n"
    "              index_dir='', force_rebuild=False, config='')\n"
    "--\n\n"
    // --- summary ---
    "Run comparison pipeline, return formatted table string.\n\n"
    // --- arguments ---
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms (default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    index_dir (str): Index sidecar directory.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n\n"
    // --- return value ---
    "Returns:\n"
    "    str: Formatted ASCII table of comparison results.\n";
709

710
// Method table for ComparatorUtility.
//
// All three entries are registered with METH_VARARGS | METH_KEYWORDS, so
// CPython calls them as (self, args, kwargs). The ml_meth slot is typed
// PyCFunction (two parameters), so a cast is unavoidable. Casting through
// the generic function-pointer type void (*)(void) is the form the CPython
// documentation recommends: it keeps -Wcast-function-type quiet while
// round-tripping to the same pointer value as a direct cast.
static PyMethodDef Comparator_methods[] = {
    {"compare", (PyCFunction)(void (*)(void))Comparator_compare,
     METH_VARARGS | METH_KEYWORDS, COMPARE_DOC},
    {"compare_json", (PyCFunction)(void (*)(void))Comparator_compare_json,
     METH_VARARGS | METH_KEYWORDS, COMPARE_JSON_DOC},
    {"compare_table", (PyCFunction)(void (*)(void))Comparator_compare_table,
     METH_VARARGS | METH_KEYWORDS, COMPARE_TABLE_DOC},
    // Sentinel: CPython stops scanning the table at the first NULL ml_name.
    // Spelling out every field avoids -Wmissing-field-initializers.
    {NULL, NULL, 0, NULL}};
1✔
718

719
// Python type object for dftracer_utils_ext.ComparatorUtility.
//
// This is a positional static initializer, so the slots must stay in the
// order CPython declares them. Unused slots are zero.
//
// tp_doc starts with a CPython text signature ("ComparatorUtility(...)",
// then a "--" line and a blank line). That header becomes
// __text_signature__ and is what inspect.signature() parses. The inspect
// parser rejects annotations inside a text signature ("Annotations are not
// currently supported"), so the parameter is listed without a type here.
// Its type is documented in the Args section below.
PyTypeObject ComparatorType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext.ComparatorUtility",   /* tp_name */
    sizeof(ComparatorObject),                 /* tp_basicsize */
    0,                                        /* tp_itemsize */
    (destructor)Comparator_dealloc,           /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    0,                                        /* tp_getattr */
    0,                                        /* tp_setattr */
    0,                                        /* tp_as_async */
    0,                                        /* tp_repr */
    0,                                        /* tp_as_number */
    0,                                        /* tp_as_sequence */
    0,                                        /* tp_as_mapping */
    0,                                        /* tp_hash */
    Comparator_call,                          /* tp_call */
    0,                                        /* tp_str */
    0,                                        /* tp_getattro */
    0,                                        /* tp_setattro */
    0,                                        /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    "ComparatorUtility(runtime=None)\n"
    "--\n\n"
    "Compare DFTracer trace metrics between baseline and "
    "variant.\n\n"
    "Args:\n"
    "    runtime (Runtime or None): Runtime for thread pool "
    "control.\n"
    "        If None, uses the default global Runtime.\n\n"
    "compare(baseline, variant, ...) -> ArrowTable\n"
    "    Run comparison and return a materialized Arrow "
    "table.\n\n"
    "compare_json(baseline, variant, ...) -> str\n"
    "    Run comparison and return JSON string.\n\n"
    "compare_table(baseline, variant, ...) -> str\n"
    "    Run comparison and return formatted table "
    "string.\n",                              /* tp_doc */
    0,                                        /* tp_traverse */
    0,                                        /* tp_clear */
    0,                                        /* tp_richcompare */
    0,                                        /* tp_weaklistoffset */
    0,                                        /* tp_iter */
    0,                                        /* tp_iternext */
    Comparator_methods,                       /* tp_methods */
    0,                                        /* tp_members */
    0,                                        /* tp_getset */
    0,                                        /* tp_base */
    0,                                        /* tp_dict */
    0,                                        /* tp_descr_get */
    0,                                        /* tp_descr_set */
    0,                                        /* tp_dictoffset */
    (initproc)Comparator_init,                /* tp_init */
    0,                                        /* tp_alloc */
    Comparator_new,                           /* tp_new */
};
774

775
// Registers the ComparatorUtility type on module `m`.
//
// Returns 0 on success. Returns -1 on failure, with the Python exception
// set by PyType_Ready / PyModule_AddObject left in place.
//
// `m` is a borrowed reference: this function never releases it, on any
// path. The original AddObject failure path called Py_DECREF(m), but the
// PyType_Ready failure path did not. That inconsistency meant a caller
// that cleans up its module after a -1 return would double-release it.
// Module cleanup on failure is the caller's responsibility.
int init_comparator(PyObject *m) {
    if (PyType_Ready(&ComparatorType) < 0) return -1;

    // PyModule_AddObject steals a reference only when it succeeds, so take
    // one up front and hand it back ourselves if the call fails.
    Py_INCREF(&ComparatorType);
    if (PyModule_AddObject(m, "ComparatorUtility",
                           (PyObject *)&ComparatorType) < 0) {
        Py_DECREF(&ComparatorType);
        return -1;
    }

    return 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc