• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23531027933

25 Mar 2026 08:05AM UTC coverage: 48.592% (-1.5%) from 50.098%
23531027933

Pull #57

github

web-flow
Merge d1070e289 into 38f9f3616
Pull Request #57: feat(comparator): add pairwise traces comparator

18900 of 49456 branches covered (38.22%)

Branch coverage included in aggregate %.

1604 of 1954 new or added lines in 25 files covered. (82.09%)

3407 existing lines in 135 files now uncovered.

18487 of 27485 relevant lines covered (67.26%)

240991.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.17
/src/dftracer/utils/python/utilities/comparator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <dftracer/utils/core/common/filesystem.h>
3
#include <dftracer/utils/core/common/platform_compat.h>
4
#include <dftracer/utils/core/coro/channel.h>
5
#include <dftracer/utils/core/coro/task.h>
6
#include <dftracer/utils/core/coro/when_all.h>
7
#include <dftracer/utils/core/pipeline/pipeline.h>
8
#include <dftracer/utils/core/pipeline/pipeline_config.h>
9
#include <dftracer/utils/core/runtime.h>
10
#include <dftracer/utils/core/tasks/coro_scope.h>
11
#include <dftracer/utils/core/tasks/task.h>
12
#include <dftracer/utils/python/arrow_helpers.h>
13
#include <dftracer/utils/python/runtime.h>
14
#include <dftracer/utils/python/utilities/comparator.h>
15
#include <dftracer/utils/utilities/common/query/query.h>
16
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregators.h>
17
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_config.h>
18
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_result.h>
19
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_utility.h>
20
#include <dftracer/utils/utilities/composites/dft/comparator/tree_table_formatter.h>
21
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
22
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
23
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
24
#include <dftracer/utils/utilities/indexer/index_builder_utility.h>
25
#include <dftracer/utils/utilities/indexer/internal/indexer.h>
26

27
#include <atomic>
28
#include <chrono>
29
#include <cstdio>
30
#include <ctime>
31
#include <string>
32
#include <thread>
33
#include <unordered_set>
34
#include <vector>
35

36
using dftracer::utils::Runtime;
37
using dftracer::utils::coro::CoroTask;
38
using namespace dftracer::utils;
39
using namespace dftracer::utils::utilities;
40
using namespace dftracer::utils::utilities::composites::dft::aggregators;
41
using namespace dftracer::utils::utilities::composites::dft::comparator;
42

43
#ifdef DFTRACER_UTILS_ENABLE_ARROW
44
using dftracer::utils::python::arrow_result_to_table;
45
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
46
#endif
47

48
// Pick the runtime a Comparator should execute on.
//
// Returns the Runtime held by the RuntimeObject stored in
// `self->runtime_obj` when one is set; otherwise falls back to whatever
// get_default_runtime() returns.
static Runtime *get_runtime(ComparatorObject *self) {
    if (self->runtime_obj == NULL) {
        return get_default_runtime();
    }
    auto *bound = reinterpret_cast<RuntimeObject *>(self->runtime_obj);
    return bound->runtime.get();
}
53

54
// tp_dealloc slot: drop the (optional) runtime reference, then hand the
// object's memory back through the type's tp_free.
static void Comparator_dealloc(ComparatorObject *self) {
    PyObject *held_runtime = self->runtime_obj;
    Py_XDECREF(held_runtime);

    PyTypeObject *type = Py_TYPE(self);
    type->tp_free(reinterpret_cast<PyObject *>(self));
}
58

59
// tp_new slot: allocate a ComparatorObject via the type's tp_alloc and
// start with no runtime bound. `args` and `kwds` are not inspected here.
// Returns NULL when allocation fails.
static PyObject *Comparator_new(PyTypeObject *type, PyObject *args,
                                PyObject *kwds) {
    PyObject *allocated = type->tp_alloc(type, 0);
    if (allocated != NULL) {
        reinterpret_cast<ComparatorObject *>(allocated)->runtime_obj = NULL;
    }
    return allocated;
}
67

68
static int Comparator_init(ComparatorObject *self, PyObject *args,
13✔
69
                           PyObject *kwds) {
70
    static const char *kwlist[] = {"runtime", NULL};
71
    PyObject *runtime_arg = NULL;
13✔
72

73
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
13!
74
                                     &runtime_arg)) {
NEW
75
        return -1;
×
76
    }
77

78
    if (runtime_arg && runtime_arg != Py_None) {
13!
NEW
79
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
NEW
80
            Py_INCREF(runtime_arg);
×
NEW
81
            self->runtime_obj = runtime_arg;
×
NEW
82
        } else {
×
NEW
83
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
NEW
84
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
×
NEW
85
                self->runtime_obj = native;
×
NEW
86
            } else {
×
NEW
87
                Py_XDECREF(native);
×
NEW
88
                PyErr_SetString(PyExc_TypeError,
×
89
                                "runtime must be a Runtime instance or None");
NEW
90
                return -1;
×
91
            }
92
        }
NEW
93
    }
×
94

95
    return 0;
13✔
96
}
13✔
97

98
// -----------------------------------------------------------------------
99
// Helpers
100
// -----------------------------------------------------------------------
101

102
// Plain value bundle holding the keyword arguments shared by compare(),
// compare_json() and compare_table(), converted to C++ types.
struct ComparatorArgs {
    std::string baseline;              // baseline trace file or directory
    std::string variant;               // variant trace file or directory
    std::string query;                 // query filter; empty = no filter
    std::string group_by;              // comma-separated group keys
    std::string format;                // requested output format
    double time_interval_ms{5000.0};   // time bucket width, milliseconds
    double threshold{0.0};             // change threshold, percent
    std::size_t executor_threads{0};   // worker threads; 0 = auto
    std::string index_dir;             // index directory; empty = temp dir
    bool force_rebuild{false};         // force index rebuild
    std::string config_path;           // JSON config file; empty = none
};
115

116
static int parse_comparator_args(PyObject *args, PyObject *kwds,
13✔
117
                                 ComparatorArgs &out) {
118
    static const char *kwlist[] = {
119
        "baseline",  "variant",          "query",     "group_by",
120
        "format",    "time_interval_ms", "threshold", "executor_threads",
121
        "index_dir", "force_rebuild",    "config",    NULL};
122

123
    const char *baseline = NULL;
13✔
124
    const char *variant = NULL;
13✔
125
    const char *query = "";
13✔
126
    const char *group_by = "";
13✔
127
    const char *format = "table";
13✔
128
    double time_interval_ms = 5000.0;
13✔
129
    double threshold = 0.0;
13✔
130
    Py_ssize_t executor_threads = 0;
13✔
131
    const char *index_dir = "";
13✔
132
    int force_rebuild = 0;
13✔
133
    const char *config = "";
13✔
134

135
    if (!PyArg_ParseTupleAndKeywords(
13!
136
            args, kwds, "ss|sssddnsps", (char **)kwlist, &baseline, &variant,
13✔
137
            &query, &group_by, &format, &time_interval_ms, &threshold,
138
            &executor_threads, &index_dir, &force_rebuild, &config))
NEW
139
        return -1;
×
140

141
    out.baseline = baseline;
13✔
142
    out.variant = variant;
13✔
143
    out.query = query;
13✔
144
    out.group_by = group_by;
13✔
145
    out.format = format;
13✔
146
    out.time_interval_ms = time_interval_ms;
13✔
147
    out.threshold = threshold;
13✔
148
    out.executor_threads = static_cast<std::size_t>(executor_threads);
13✔
149
    out.index_dir = index_dir;
13✔
150
    out.force_rebuild = force_rebuild != 0;
13✔
151
    out.config_path = config;
13✔
152

153
    return 0;
13✔
154
}
13✔
155

156
namespace {
157

158
void flatten_nodes(const ComparisonNode &node,
13✔
159
                   std::vector<const ComparisonNode *> &out) {
160
    out.push_back(&node);
13✔
161
    for (const auto &child : node.children) {
13!
NEW
162
        flatten_nodes(child, out);
×
163
    }
164
}
13✔
165

166
// Aggregate the events of `input_files` into one EventAggregatorUtilityOutput.
//
// Builds a two-task Pipeline:
//   1. "StreamingAggregate": one producer coroutine per input file maps the
//      file into chunks and sends them on `chunk_chan`; worker coroutines
//      receive chunks, run ChunkAggregatorUtility on each and send the
//      output on `result_chan`; a single merger coroutine feeds every
//      result into `merger`.
//   2. "Finalize": calls merger.finalize() once streaming has completed.
//
// Parameters:
//   input_files      trace files to aggregate
//   agg_config       aggregation settings forwarded to the chunk mapper
//   query            optional filter assigned to the mapper input
//   index_dir        directory used to derive each file's index path
//   checkpoint_size  forwarded to the metadata collector and chunk mapper
//   force_rebuild    forwarded to the metadata collector
//   executor_threads pipeline compute threads and number of worker
//                    coroutines (at least one worker is always spawned)
//
// Files whose metadata collection reports failure are skipped without
// raising an error.
CoroTask<EventAggregatorUtilityOutput> run_aggregation(
    std::vector<std::string> input_files, AggregationConfig agg_config,
    std::optional<common::query::Query> query, std::string index_dir,
    std::size_t checkpoint_size, bool force_rebuild,
    std::size_t executor_threads) {
    constexpr std::size_t CHUNK_SIZE_MB = 4;
    constexpr std::size_t BATCH_SIZE_MB = 4;

    // With zero workers nothing would ever receive from chunk_chan, so no
    // chunk would be aggregated (and producers could wait on send
    // indefinitely, depending on channel semantics). Always run one.
    const std::size_t worker_count =
        executor_threads > 0 ? executor_threads : 1;

    auto pipeline_config = PipelineConfig()
                               .with_name("DFTracer Comparator Aggregation")
                               .with_compute_threads(executor_threads)
                               .with_watchdog(false);
    Pipeline pipeline(pipeline_config);

    EventAggregatorUtility merger;
    // Hands out contiguous, globally unique chunk indices across files.
    std::atomic<int> global_chunk_idx{0};

    auto streaming_task = make_task(
        [&](CoroScope &ctx) -> CoroTask<void> {
            // Channel arguments are kept as in the original: 0 for chunks,
            // 8 for results (presumably capacities -- confirm against
            // coro::make_channel).
            auto chunk_chan = coro::make_channel<ChunkAggregatorInput>(0);
            auto result_chan = coro::make_channel<ChunkAggregationOutput>(8);

            co_await ctx.scope([&](CoroScope &scope) -> CoroTask<void> {
                // --- Producers: one per input file ---------------------
                for (const auto &file_path : input_files) {
                    auto *global_chunk_idx_ptr = &global_chunk_idx;
                    scope.spawn([file_path, ch = chunk_chan->producer(),
                                 index_dir, checkpoint_size, force_rebuild,
                                 agg_config, query, global_chunk_idx_ptr](
                                    CoroScope & /*fctx*/) mutable
                                    -> CoroTask<void> {
                        [[maybe_unused]] auto producer_guard = ch.guard();

                        std::string idx_path =
                            composites::dft::internal::determine_index_path(
                                file_path, index_dir);

                        auto meta_input =
                            composites::dft::MetadataCollectorUtilityInput::
                                from_file(file_path)
                                    .with_checkpoint_size(checkpoint_size)
                                    .with_force_rebuild(force_rebuild)
                                    .with_index(idx_path);
                        auto metadata =
                            co_await composites::dft::MetadataCollectorUtility{}
                                .process(meta_input);

                        // Skip files whose metadata could not be collected.
                        if (!metadata.success) {
                            co_return;
                        }

                        FileChunkMapperUtility file_mapper;
                        auto mapper_input =
                            FileChunkMapperInput::from_metadata(metadata)
                                .with_config(agg_config)
                                .with_checkpoint_size(checkpoint_size)
                                .with_target_chunk_size(CHUNK_SIZE_MB)
                                .with_batch_size(BATCH_SIZE_MB * 1024 * 1024);
                        mapper_input.query = query;
                        auto file_chunks =
                            co_await file_mapper.process(mapper_input);

                        // Reserve a contiguous index range for this file's
                        // chunks and stamp each chunk with its index.
                        int start_idx = global_chunk_idx_ptr->fetch_add(
                            static_cast<int>(file_chunks.size()));
                        for (int i = 0;
                             i < static_cast<int>(file_chunks.size()); ++i) {
                            file_chunks[i].chunk_index = start_idx + i;
                        }

                        // Stop early once send reports failure.
                        for (auto &chunk : file_chunks) {
                            if (!co_await ch.send(std::move(chunk))) {
                                co_return;
                            }
                        }
                        co_return;
                    });
                }

                // --- Workers: aggregate chunks -------------------------
                for (std::size_t w = 0; w < worker_count; ++w) {
                    (void)w;
                    scope.spawn([chunk_chan, rp = result_chan->producer(),
                                 result_chan](
                                    CoroScope &wctx) mutable -> CoroTask<void> {
                        [[maybe_unused]] auto producer_guard = rp.guard();
                        while (auto input = co_await wctx.receive(chunk_chan)) {
                            ChunkAggregatorUtility agg;
                            auto output = co_await agg.process(*input);
                            if (!co_await result_chan->send(
                                    std::move(output))) {
                                co_return;
                            }
                        }
                        co_return;
                    });
                }

                // --- Merger: fold every chunk result into `merger` ------
                auto *merger_ptr = &merger;
                scope.spawn([result_chan,
                             merger_ptr](CoroScope &mctx) -> CoroTask<void> {
                    while (auto output = co_await mctx.receive(result_chan)) {
                        merger_ptr->merge_chunk(std::move(*output));
                    }
                    co_return;
                });

                co_return;
            });

            co_return;
        },
        "StreamingAggregate");

    EventAggregatorUtilityOutput result;
    auto post_task = make_task(
        [&](CoroScope & /*ctx*/) -> CoroTask<bool> {
            result = merger.finalize();
            co_return result.success;
        },
        "Finalize");

    post_task->depends_on(streaming_task);
    pipeline.set_source(streaming_task);
    pipeline.set_destination(post_task);
    pipeline.execute();

    co_return result;
}
292

293
}  // namespace
294

295
static int run_comparison_pipeline(ComparatorObject *self,
13✔
296
                                   const ComparatorArgs &cargs,
297
                                   ComparisonOutput &output,
298
                                   std::string &error_msg) {
299
    ComparatorArgs args_copy = cargs;
13✔
300
    auto *output_ptr = &output;
13✔
301

302
    Py_BEGIN_ALLOW_THREADS try {
13!
303
        ComparisonConfig config;
13!
304
        if (!args_copy.config_path.empty()) {
13!
NEW
305
            std::string parse_error;
×
NEW
306
            auto parsed = ComparisonConfig::from_json_file(
×
NEW
307
                args_copy.config_path, parse_error);
×
NEW
308
            if (!parsed) {
×
NEW
309
                error_msg = "Config error: " + parse_error;
×
NEW
310
                goto done;
×
311
            }
NEW
312
            config = std::move(*parsed);
×
NEW
313
        } else {
×
314
            config = ComparisonConfig::from_cli(
13!
315
                args_copy.baseline, args_copy.variant, args_copy.query,
13✔
316
                args_copy.group_by);
13✔
317
        }
318

319
        config.format = args_copy.format;
13!
320
        config.no_color = true;
13✔
321
        if (args_copy.executor_threads > 0)
13!
NEW
322
            config.executor_threads = args_copy.executor_threads;
×
323
        if (!args_copy.index_dir.empty())
13!
NEW
324
            config.index_dir = args_copy.index_dir;
×
325
        if (args_copy.force_rebuild)
13!
NEW
326
            config.force_rebuild = args_copy.force_rebuild;
×
327
        if (args_copy.threshold > 0.0)
13!
NEW
328
            config.defaults.threshold_pct = args_copy.threshold;
×
329
        if (args_copy.time_interval_ms > 0.0)
13!
330
            config.defaults.time_interval_ms = args_copy.time_interval_ms;
13✔
331

332
        config.resolve();
13!
333

334
        if (config.executor_threads == 0) {
13!
335
            config.executor_threads = dftracer_utils_hardware_concurrency();
13!
336
        }
13✔
337
        if (config.checkpoint_size == 0) {
13!
338
            config.checkpoint_size =
13✔
339
                indexer::internal::Indexer::DEFAULT_CHECKPOINT_SIZE;
340
        }
13✔
341

342
        // Create temp index dir if needed
343
        std::string temp_index_dir;
13✔
344
        if (config.index_dir.empty()) {
13!
345
            auto temp_path = fs::temp_directory_path();
13!
346
            temp_path /=
13!
347
                "dftracer_cmp_py_" + std::to_string(std::time(nullptr)) + "_" +
26!
348
                std::to_string(static_cast<int>(
13!
349
                    std::hash<std::thread::id>{}(std::this_thread::get_id())));
13✔
350
            temp_index_dir = temp_path.string();
13!
351
            fs::create_directories(temp_index_dir);
13!
352
            config.index_dir = temp_index_dir;
13!
353
        }
13✔
354

355
        // Enumerate files
356
        auto enumerate_files =
13✔
357
            [](std::string path) -> CoroTask<std::vector<std::string>> {
134!
358
            std::vector<std::string> files;
30✔
359
            if (fs::is_regular_file(path)) {
30!
360
                files.push_back(path);
24!
361
                co_return files;
50!
362
            }
363
            filesystem::PatternDirectoryScannerUtility scanner;
6!
364
            filesystem::PatternDirectoryScannerUtilityInput scan_input{
12!
365
                path, {".pfw", ".pfw.gz"}, false};
6!
366
            auto entries = co_await scanner.process(scan_input);
8!
367
            files.reserve(entries.size());
2!
368
            for (const auto &e : entries) {
6✔
369
                files.push_back(e.path.string());
4!
370
            }
4✔
371
            co_return files;
2!
372
        };
38!
373

374
        Runtime *rt = get_runtime(self);
13!
375

376
        auto *error_msg_ptr = &error_msg;
13✔
377
        auto task = [config, output_ptr, enumerate_files,
234!
378
                     error_msg_ptr]() -> CoroTask<void> {
26!
379
            auto baseline_files = co_await enumerate_files(config.baseline);
52!
380
            auto variant_files = co_await enumerate_files(config.variant);
52!
381

382
            if (baseline_files.empty()) {
39!
NEW
383
                *error_msg_ptr =
×
NEW
384
                    "No trace files found in baseline: " + config.baseline;
×
NEW
385
                co_return;
×
386
            }
387
            if (variant_files.empty()) {
39!
NEW
388
                *error_msg_ptr =
×
NEW
389
                    "No trace files found in variant: " + config.variant;
×
NEW
390
                co_return;
×
391
            }
392

393
            // Build indexes upfront
394
            {
395
                std::unordered_set<std::string> seen;
39✔
396
                std::vector<std::string> all_files;
39✔
397
                for (const auto &f : baseline_files) {
53✔
398
                    if (seen.insert(f).second) all_files.push_back(f);
14!
399
                }
14✔
400
                for (const auto &f : variant_files) {
53✔
401
                    if (seen.insert(f).second) all_files.push_back(f);
14!
402
                }
14✔
403
                std::vector<indexer::IndexBuildConfig> idx_configs;
39✔
404
                idx_configs.reserve(all_files.size());
39!
405
                for (const auto &file_path : all_files) {
53✔
406
                    idx_configs.push_back(
14!
407
                        indexer::IndexBuildConfig::for_file(file_path)
14!
408
                            .with_checkpoint_size(config.checkpoint_size)
14!
409
                            .with_force_rebuild(config.force_rebuild)
14!
410
                            .with_index_dir(config.index_dir));
14!
411
                }
14✔
412
                std::vector<CoroTask<indexer::IndexBuildResult>> idx_tasks;
39✔
413
                idx_tasks.reserve(idx_configs.size());
39!
414
                for (const auto &cfg : idx_configs) {
53✔
415
                    idx_tasks.push_back(
14!
416
                        indexer::IndexBuilderUtility{}.process(cfg));
14!
417
                }
14✔
418
                co_await coro::when_all(std::move(idx_tasks));
52!
419
            }
13✔
420

421
            output_ptr->baseline_path = config.baseline;
39✔
422
            output_ptr->variant_path = config.variant;
13✔
423
            output_ptr->baseline_file_count = baseline_files.size();
39✔
424
            output_ptr->variant_file_count = variant_files.size();
39✔
425

426
            auto start_time = std::chrono::high_resolution_clock::now();
39✔
427

428
            for (auto &node : config.nodes) {
52!
429
                std::vector<const ComparisonNode *> visitors;
39✔
430
                flatten_nodes(node, visitors);
39!
431

432
                std::vector<ComparisonVisitorPair> pairs;
39✔
433
                pairs.reserve(visitors.size());
39!
434

435
                for (const auto *visitor : visitors) {
78!
436
                    std::optional<common::query::Query> query;
39✔
437
                    if (!visitor->composed_query.empty()) {
39✔
438
                        auto result = common::query::Query::from_string(
26!
439
                            visitor->composed_query);
13✔
440
                        if (!result) {
13!
NEW
441
                            *error_msg_ptr = "Invalid query for node '" +
×
NEW
442
                                             visitor->name +
×
NEW
443
                                             "': " + result.error().format();
×
NEW
444
                            co_return;
×
445
                        }
446
                        query = std::move(*result);
13!
447
                    }
13!
448

449
                    AggregationConfig agg_cfg;
39!
450
                    agg_cfg.time_interval_us = static_cast<std::uint64_t>(
39✔
451
                        config.defaults.time_interval_ms * 1000.0);
39✔
452
                    agg_cfg.extra_group_keys = {};
39✔
453
                    agg_cfg.compute_statistics = true;
13✔
454
                    agg_cfg.compute_percentiles = true;
13✔
455
                    agg_cfg.percentiles = visitor->resolved_percentiles;
13✔
456
                    agg_cfg.sketch_accuracy = 0.01;
39✔
457
                    agg_cfg.track_process_parents = false;
39✔
458

459
                    auto [base_result, var_result] = co_await coro::when_all(
117!
460
                        run_aggregation(
78!
461
                            baseline_files, agg_cfg, query, config.index_dir,
39!
462
                            config.checkpoint_size, config.force_rebuild,
39✔
463
                            config.executor_threads),
39✔
464
                        run_aggregation(
78!
465
                            variant_files, agg_cfg, query, config.index_dir,
39!
466
                            config.checkpoint_size, config.force_rebuild,
39✔
467
                            config.executor_threads));
39✔
468

469
                    if (pairs.empty()) {
13!
470
                        output_ptr->baseline_meta = extract_metadata(
26!
471
                            base_result.aggregations, baseline_files.size());
13✔
472
                        output_ptr->variant_meta = extract_metadata(
26!
473
                            var_result.aggregations, variant_files.size());
13✔
474
                    }
13✔
475

476
                    ComparisonVisitorPair pair;
13✔
477
                    pair.baseline = std::move(base_result);
13✔
478
                    pair.variant = std::move(var_result);
13✔
479
                    pair.node = *visitor;
13!
480
                    pairs.push_back(std::move(pair));
13!
481
                }
13!
482

483
                ComparisonUtilityInput cmp_input;
39✔
484
                cmp_input.visitors = std::move(pairs);
39✔
485
                cmp_input.root_node = node;
39!
486
                cmp_input.baseline_file_count = baseline_files.size();
39✔
487
                cmp_input.variant_file_count = variant_files.size();
39✔
488

489
                ComparisonUtility cmp;
39!
490
                auto cmp_output = co_await cmp.process(cmp_input);
52!
491
                output_ptr->nodes.push_back(std::move(cmp_output.result));
13!
492
            }
13!
493

494
            auto end_time = std::chrono::high_resolution_clock::now();
13✔
495
            std::chrono::duration<double, std::milli> duration =
13✔
496
                end_time - start_time;
13!
497
            output_ptr->execution_time_ms = duration.count();
13!
498
        };
377✔
499

500
        rt->submit(task(), "comparator").get();
13!
501

502
        // Clean up temp index dir
503
        if (!temp_index_dir.empty() && fs::exists(temp_index_dir)) {
26!
504
            fs::remove_all(temp_index_dir);
13!
505
        }
13✔
506
    } catch (const std::exception &e) {
13!
NEW
507
        error_msg = e.what();
×
508
    }
13!
509
done:
510
    Py_END_ALLOW_THREADS
13!
511

512
        return error_msg.empty()
13✔
513
        ? 0
514
        : -1;
515
}
13✔
516

517
// -----------------------------------------------------------------------
518
// compare() -- returns ArrowTable
519
// -----------------------------------------------------------------------
520

521
// compare(...): run the comparison pipeline and return the result as an
// ArrowTable.
//
// Returns a new reference on success. Returns NULL with a Python exception
// set when argument parsing fails, when the pipeline reports an error
// (RuntimeError carrying its message), when the Arrow conversion fails, or
// when the extension was built without Arrow support (RuntimeError).
static PyObject *Comparator_compare(ComparatorObject *self, PyObject *args,
                                    PyObject *kwds) {
    ComparatorArgs cargs;
    if (parse_comparator_args(args, kwds, cargs) < 0) return NULL;

    ComparisonOutput output;
    std::string error_msg;
    if (run_comparison_pipeline(self, cargs, output, error_msg) < 0) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    // A C++ exception must never unwind into the interpreter's C frames,
    // so translate anything thrown by the conversion into RuntimeError.
    try {
        auto arrow_result = output.to_arrow();
        if (!arrow_result.valid()) {
            PyErr_SetString(PyExc_RuntimeError,
                            "Failed to convert comparison output "
                            "to Arrow");
            return NULL;
        }
        return arrow_result_to_table(std::move(arrow_result));
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Unknown error while converting comparison output "
                        "to Arrow");
        return NULL;
    }
#else
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return NULL;
#endif
}
548

549
// -----------------------------------------------------------------------
550
// compare_json() -- returns JSON string
551
// -----------------------------------------------------------------------
552

553
// compare_json(...): run the comparison pipeline and return the result
// rendered as a JSON string (Python str).
//
// Returns a new reference on success. Returns NULL with a Python exception
// set when argument parsing fails, when the pipeline reports an error
// (RuntimeError carrying its message), or when rendering fails.
static PyObject *Comparator_compare_json(ComparatorObject *self, PyObject *args,
                                         PyObject *kwds) {
    ComparatorArgs cargs;
    if (parse_comparator_args(args, kwds, cargs) < 0) return NULL;

    ComparisonOutput output;
    std::string error_msg;
    if (run_comparison_pipeline(self, cargs, output, error_msg) < 0) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

    // A C++ exception must never unwind into the interpreter's C frames,
    // so translate anything thrown while rendering into RuntimeError.
    try {
        TreeTableFormatter formatter;
        const std::string json = formatter.render_json(output);
        return PyUnicode_FromStringAndSize(json.data(),
                                           (Py_ssize_t)json.size());
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Unknown error while rendering comparison JSON");
        return NULL;
    }
}
569

570
// -----------------------------------------------------------------------
571
// compare_table() -- returns formatted table string
572
// -----------------------------------------------------------------------
573

574
// compare_table(...): run the comparison pipeline and return the result
// rendered as a plain table string (colour and unicode disabled).
//
// Returns a new reference on success. Returns NULL with a Python exception
// set when argument parsing fails, when the pipeline reports an error
// (RuntimeError carrying its message), or when the table cannot be
// rendered into the in-memory stream.
static PyObject *Comparator_compare_table(ComparatorObject *self,
                                          PyObject *args, PyObject *kwds) {
    ComparatorArgs cargs;
    if (parse_comparator_args(args, kwds, cargs) < 0) return NULL;

    ComparisonOutput output;
    std::string error_msg;
    if (run_comparison_pipeline(self, cargs, output, error_msg) < 0) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

    // The formatter writes to a FILE*, so render into an in-memory stream
    // and turn the resulting buffer into a Python string.
    char *buf = NULL;
    std::size_t buf_size = 0;
    FILE *memstream = open_memstream(&buf, &buf_size);
    if (!memstream) {
        PyErr_SetString(PyExc_RuntimeError, "Failed to create memory stream");
        return NULL;
    }

    // A C++ exception must never unwind into the interpreter's C frames
    // (and would also leak the stream and its buffer), so catch everything
    // here and fall through to the shared cleanup below.
    bool rendered = false;
    try {
        FormatterOptions fmt_opts;
        fmt_opts.use_color = false;
        fmt_opts.use_unicode = false;
        TreeTableFormatter formatter(fmt_opts);
        formatter.render(memstream, output);
        rendered = true;
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Unknown error while rendering comparison table");
    }

    // fclose flushes the stream and finalises buf / buf_size; it must run
    // on every path so the stream is never leaked.
    const bool closed = (fclose(memstream) == 0);

    PyObject *result = NULL;
    if (rendered) {
        if (closed && buf != NULL) {
            result = PyUnicode_FromStringAndSize(buf, (Py_ssize_t)buf_size);
        } else {
            PyErr_SetString(PyExc_RuntimeError,
                            "Failed to flush memory stream");
        }
    }

    free(buf);
    return result;
}
608

609
// -----------------------------------------------------------------------
610
// __call__ delegates to compare()
611
// -----------------------------------------------------------------------
612

613
// tp_call slot: calling a Comparator instance is the same as calling its
// compare() method with the same positional and keyword arguments.
static PyObject *Comparator_call(PyObject *self, PyObject *args,
                                 PyObject *kwds) {
    auto *comparator = reinterpret_cast<ComparatorObject *>(self);
    return Comparator_compare(comparator, args, kwds);
}
617

618
// -----------------------------------------------------------------------
619
// Method table
620
// -----------------------------------------------------------------------
621

622
// Docstring for Comparator.compare(). Declared as a const array rather
// than a mutable pointer so it is immutable and usable in the constant
// initialiser of the method table.
static const char COMPARE_DOC[] =
    "compare(baseline, variant, query='', group_by='',\n"
    "        format='table', time_interval_ms=5000.0,\n"
    "        threshold=0.0, executor_threads=0,\n"
    "        index_dir='', force_rebuild=False, config='')\n"
    "--\n"
    "\n"
    "Run comparison pipeline, return materialized ArrowTable.\n"
    "\n"
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms "
    "(default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    index_dir (str): Index sidecar directory.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n"
    "\n"
    "Returns:\n"
    "    ArrowTable: Comparison results.\n";
647

648
// Docstring for Comparator.compare_json(). Declared as a const array
// rather than a mutable pointer so it is immutable and usable in the
// constant initialiser of the method table.
static const char COMPARE_JSON_DOC[] =
    "compare_json(baseline, variant, query='', group_by='',\n"
    "             format='table', time_interval_ms=5000.0,\n"
    "             threshold=0.0, executor_threads=0,\n"
    "             index_dir='', force_rebuild=False, "
    "config='')\n"
    "--\n"
    "\n"
    "Run comparison pipeline, return JSON string.\n"
    "\n"
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms "
    "(default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    index_dir (str): Index sidecar directory.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n"
    "\n"
    "Returns:\n"
    "    str: JSON representation of comparison results.\n";
674

675
/*
 * Docstring attached to the "compare_table" entry of Comparator_methods.
 *
 * The leading "name(args)\n--\n\n" section is laid out as a text signature;
 * the remainder is the human-readable description. The text is runtime
 * data, so the concatenated literal must stay byte-for-byte stable.
 */
static const char *COMPARE_TABLE_DOC =
    "compare_table(baseline, variant, query='', group_by='',\n"
    "              format='table', time_interval_ms=5000.0,\n"
    "              threshold=0.0, executor_threads=0,\n"
    "              index_dir='', force_rebuild=False, "
    "config='')\n"
    "--\n"
    "\n"
    "Run comparison pipeline, return formatted table string.\n"
    "\n"
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms "
    "(default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    index_dir (str): Index sidecar directory.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n"
    "\n"
    "Returns:\n"
    "    str: Formatted ASCII table of comparison results.\n";
701

702
/*
 * Method table installed as tp_methods of ComparatorType.
 *
 * Every entry is registered with METH_VARARGS | METH_KEYWORDS, i.e. the
 * handlers are invoked with positional and keyword arguments. Each entry
 * pairs a Python-visible name with its handler and docstring.
 */
static PyMethodDef Comparator_methods[] = {
    {"compare", (PyCFunction)Comparator_compare, METH_VARARGS | METH_KEYWORDS,
     COMPARE_DOC},
    {"compare_json", (PyCFunction)Comparator_compare_json,
     METH_VARARGS | METH_KEYWORDS, COMPARE_JSON_DOC},
    {"compare_table", (PyCFunction)Comparator_compare_table,
     METH_VARARGS | METH_KEYWORDS, COMPARE_TABLE_DOC},
    /* Sentinel: all fields spelled out so the terminator is explicit and
     * does not trip missing-field-initializer warnings. */
    {NULL, NULL, 0, NULL}};
1✔
710

711
/*
 * Static type object for the Python class exposed as
 * "dftracer_utils_ext.ComparatorUtility".
 *
 * Slots are filled positionally, so the order of the entries below must
 * follow the PyTypeObject field order exactly; the trailing comment on each
 * line names the slot being set. Slots not listed after tp_new are left
 * zero-initialized.
 *
 * Non-zero slots: tp_dealloc, tp_call (instances are callable), tp_flags
 * (default flags plus Py_TPFLAGS_BASETYPE, so the type can be subclassed),
 * tp_doc, tp_methods, tp_init and tp_new.
 */
PyTypeObject ComparatorType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext.ComparatorUtility",   /* tp_name */
    sizeof(ComparatorObject),                 /* tp_basicsize */
    0,                                        /* tp_itemsize */
    (destructor)Comparator_dealloc,           /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    0,                                        /* tp_getattr */
    0,                                        /* tp_setattr */
    0,                                        /* tp_as_async */
    0,                                        /* tp_repr */
    0,                                        /* tp_as_number */
    0,                                        /* tp_as_sequence */
    0,                                        /* tp_as_mapping */
    0,                                        /* tp_hash */
    Comparator_call,                          /* tp_call */
    0,                                        /* tp_str */
    0,                                        /* tp_getattro */
    0,                                        /* tp_setattro */
    0,                                        /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    /* tp_doc: constructor signature section followed by a class summary. */
    "ComparatorUtility(runtime: Runtime | None = None)\n"
    "--\n\n"
    "Compare DFTracer trace metrics between baseline and "
    "variant.\n\n"
    "Args:\n"
    "    runtime (Runtime or None): Runtime for thread pool "
    "control.\n"
    "        If None, uses the default global Runtime.\n\n"
    "compare(baseline, variant, ...) -> ArrowTable\n"
    "    Run comparison and return a materialized Arrow "
    "table.\n\n"
    "compare_json(baseline, variant, ...) -> str\n"
    "    Run comparison and return JSON string.\n\n"
    "compare_table(baseline, variant, ...) -> str\n"
    "    Run comparison and return formatted table "
    "string.\n",               /* tp_doc */
    0,                         /* tp_traverse */
    0,                         /* tp_clear */
    0,                         /* tp_richcompare */
    0,                         /* tp_weaklistoffset */
    0,                         /* tp_iter */
    0,                         /* tp_iternext */
    Comparator_methods,        /* tp_methods */
    0,                         /* tp_members */
    0,                         /* tp_getset */
    0,                         /* tp_base */
    0,                         /* tp_dict */
    0,                         /* tp_descr_get */
    0,                         /* tp_descr_set */
    0,                         /* tp_dictoffset */
    (initproc)Comparator_init, /* tp_init */
    0,                         /* tp_alloc */
    Comparator_new,            /* tp_new */
};
766

767
/*
 * Register ComparatorType on module `m` under the name "ComparatorUtility".
 *
 * Finalizes the static type with PyType_Ready and then adds it to the
 * module.
 *
 * @param m  Module object to add the type to. The reference is borrowed:
 *           this function never releases it, on success or on failure, so
 *           the caller keeps a single, consistent cleanup responsibility.
 * @return 0 on success, -1 on failure.
 */
int init_comparator(PyObject *m) {
    if (PyType_Ready(&ComparatorType) < 0) {
        return -1;
    }

    /* PyModule_AddObject takes over this reference only when it succeeds. */
    Py_INCREF(&ComparatorType);
    if (PyModule_AddObject(m, "ComparatorUtility",
                           (PyObject *)&ComparatorType) < 0) {
        /* On failure the reference taken above is still ours to drop. `m`
         * is deliberately left alone: it was not acquired here, and the
         * PyType_Ready failure path above does not release it either. */
        Py_DECREF(&ComparatorType);
        return -1;
    }

    return 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc