• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 23529483807

25 Mar 2026 07:17AM UTC coverage: 48.515% (-1.6%) from 50.098%
23529483807

Pull #57

github

web-flow
Merge 5b1e117ad into 38f9f3616
Pull Request #57: feat(comparator): add pairwise traces comparator

18829 of 49412 branches covered (38.11%)

Branch coverage included in aggregate %.

1584 of 1933 new or added lines in 14 files covered. (81.95%)

3552 existing lines in 135 files now uncovered.

18474 of 27477 relevant lines covered (67.23%)

241072.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.25
/src/dftracer/utils/python/utilities/comparator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <dftracer/utils/core/common/filesystem.h>
3
#include <dftracer/utils/core/coro/channel.h>
4
#include <dftracer/utils/core/coro/task.h>
5
#include <dftracer/utils/core/coro/when_all.h>
6
#include <dftracer/utils/core/pipeline/pipeline.h>
7
#include <dftracer/utils/core/pipeline/pipeline_config.h>
8
#include <dftracer/utils/core/runtime.h>
9
#include <dftracer/utils/core/tasks/coro_scope.h>
10
#include <dftracer/utils/core/tasks/task.h>
11
#include <dftracer/utils/python/arrow_helpers.h>
12
#include <dftracer/utils/python/runtime.h>
13
#include <dftracer/utils/python/utilities/comparator.h>
14
#include <dftracer/utils/utilities/common/query/query.h>
15
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregators.h>
16
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_config.h>
17
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_result.h>
18
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_utility.h>
19
#include <dftracer/utils/utilities/composites/dft/comparator/tree_table_formatter.h>
20
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
21
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
22
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
23
#include <dftracer/utils/utilities/indexer/index_builder_utility.h>
24
#include <dftracer/utils/utilities/indexer/internal/indexer.h>
25

26
#include <atomic>
#include <chrono>
#include <cstdio>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>
#include <unordered_set>
#include <vector>
33

34
using dftracer::utils::Runtime;
35
using dftracer::utils::coro::CoroTask;
36
using namespace dftracer::utils;
37
using namespace dftracer::utils::utilities;
38
using namespace dftracer::utils::utilities::composites::dft::aggregators;
39
using namespace dftracer::utils::utilities::composites::dft::comparator;
40

41
#ifdef DFTRACER_UTILS_ENABLE_ARROW
42
using dftracer::utils::python::arrow_result_to_table;
43
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
44
#endif
45

46
/// Resolve the Runtime this comparator should submit work to.
///
/// When a RuntimeObject was stored in `self->runtime_obj` by __init__, the
/// Runtime it wraps is returned; otherwise the process-wide default from
/// get_default_runtime() is used. No reference counts are touched.
static Runtime *get_runtime(ComparatorObject *self) {
    PyObject *const explicit_rt = self->runtime_obj;
    if (explicit_rt == NULL) {
        return get_default_runtime();
    }
    return reinterpret_cast<RuntimeObject *>(explicit_rt)->runtime.get();
}
51

52
/// tp_dealloc: release the (possibly NULL) runtime reference held by the
/// object, then free the object's storage through its type's tp_free slot.
static void Comparator_dealloc(ComparatorObject *self) {
    PyTypeObject *const tp = Py_TYPE(self);
    Py_XDECREF(self->runtime_obj);
    tp->tp_free(reinterpret_cast<PyObject *>(self));
}
56

57
/// tp_new: allocate a ComparatorObject whose runtime slot starts out empty.
///
/// `args`/`kwds` are ignored here; argument handling happens in tp_init.
/// Returns NULL (with tp_alloc's exception set) if allocation fails.
static PyObject *Comparator_new(PyTypeObject *type, PyObject *args,
                                PyObject *kwds) {
    (void)args;
    (void)kwds;
    PyObject *raw = type->tp_alloc(type, 0);
    if (raw == NULL) {
        return NULL;
    }
    reinterpret_cast<ComparatorObject *>(raw)->runtime_obj = NULL;
    return raw;
}
65

66
static int Comparator_init(ComparatorObject *self, PyObject *args,
13✔
67
                           PyObject *kwds) {
68
    static const char *kwlist[] = {"runtime", NULL};
69
    PyObject *runtime_arg = NULL;
13✔
70

71
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
13!
72
                                     &runtime_arg)) {
NEW
73
        return -1;
×
74
    }
75

76
    if (runtime_arg && runtime_arg != Py_None) {
13!
NEW
77
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
NEW
78
            Py_INCREF(runtime_arg);
×
NEW
79
            self->runtime_obj = runtime_arg;
×
NEW
80
        } else {
×
NEW
81
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
NEW
82
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
×
NEW
83
                self->runtime_obj = native;
×
NEW
84
            } else {
×
NEW
85
                Py_XDECREF(native);
×
NEW
86
                PyErr_SetString(PyExc_TypeError,
×
87
                                "runtime must be a Runtime instance or None");
NEW
88
                return -1;
×
89
            }
90
        }
NEW
91
    }
×
92

93
    return 0;
13✔
94
}
13✔
95

96
// -----------------------------------------------------------------------
97
// Helpers
98
// -----------------------------------------------------------------------
99

100
/// Plain-data bundle of the keyword arguments shared by compare(),
/// compare_json() and compare_table(), filled by parse_comparator_args().
struct ComparatorArgs {
    std::string baseline;   // baseline trace file or directory (required)
    std::string variant;    // variant trace file or directory (required)
    std::string query;      // query string handed to ComparisonConfig::from_cli
    std::string group_by;   // comma-separated group keys
    std::string format;     // output format name
    double time_interval_ms{5000.0};  // time bucket width in milliseconds
    double threshold{0.0};            // change threshold in percent
    std::size_t executor_threads{0};  // 0 = auto
    std::string index_dir;            // empty = not overridden
    bool force_rebuild{false};
    std::string config_path;          // JSON config file; empty = none
};
113

114
static int parse_comparator_args(PyObject *args, PyObject *kwds,
13✔
115
                                 ComparatorArgs &out) {
116
    static const char *kwlist[] = {
117
        "baseline",  "variant",          "query",     "group_by",
118
        "format",    "time_interval_ms", "threshold", "executor_threads",
119
        "index_dir", "force_rebuild",    "config",    NULL};
120

121
    const char *baseline = NULL;
13✔
122
    const char *variant = NULL;
13✔
123
    const char *query = "";
13✔
124
    const char *group_by = "";
13✔
125
    const char *format = "table";
13✔
126
    double time_interval_ms = 5000.0;
13✔
127
    double threshold = 0.0;
13✔
128
    Py_ssize_t executor_threads = 0;
13✔
129
    const char *index_dir = "";
13✔
130
    int force_rebuild = 0;
13✔
131
    const char *config = "";
13✔
132

133
    if (!PyArg_ParseTupleAndKeywords(
13!
134
            args, kwds, "ss|sssddnsp", (char **)kwlist, &baseline, &variant,
13✔
135
            &query, &group_by, &format, &time_interval_ms, &threshold,
136
            &executor_threads, &index_dir, &force_rebuild, &config))
NEW
137
        return -1;
×
138

139
    out.baseline = baseline;
13✔
140
    out.variant = variant;
13✔
141
    out.query = query;
13✔
142
    out.group_by = group_by;
13✔
143
    out.format = format;
13✔
144
    out.time_interval_ms = time_interval_ms;
13✔
145
    out.threshold = threshold;
13✔
146
    out.executor_threads = static_cast<std::size_t>(executor_threads);
13✔
147
    out.index_dir = index_dir;
13✔
148
    out.force_rebuild = force_rebuild != 0;
13✔
149
    out.config_path = config;
13✔
150

151
    return 0;
13✔
152
}
13✔
153

154
namespace {
155

156
void flatten_nodes(const ComparisonNode &node,
13✔
157
                   std::vector<const ComparisonNode *> &out) {
158
    out.push_back(&node);
13✔
159
    for (const auto &child : node.children) {
13!
NEW
160
        flatten_nodes(child, out);
×
161
    }
162
}
13✔
163

164
CoroTask<EventAggregatorUtilityOutput> run_aggregation(
130!
165
    std::vector<std::string> input_files, AggregationConfig agg_config,
166
    std::optional<common::query::Query> query, std::string index_dir,
167
    std::size_t checkpoint_size, bool force_rebuild,
168
    std::size_t executor_threads) {
26!
169
    constexpr std::size_t CHUNK_SIZE_MB = 4;
26✔
170
    constexpr std::size_t BATCH_SIZE_MB = 4;
26✔
171

172
    auto pipeline_config = PipelineConfig()
52!
173
                               .with_name("DFTracer Comparator Aggregation")
26!
174
                               .with_compute_threads(executor_threads)
26!
175
                               .with_watchdog(false);
26!
176
    Pipeline pipeline(pipeline_config);
26!
177

178
    EventAggregatorUtility merger;
26!
179
    std::atomic<int> global_chunk_idx{0};
26✔
180

181
    auto streaming_task = make_task(
26!
182
        [&](CoroScope &ctx) -> CoroTask<void> {
208!
183
            auto chunk_chan = coro::make_channel<ChunkAggregatorInput>(0);
78!
184
            auto result_chan = coro::make_channel<ChunkAggregationOutput>(8);
78!
185

186
            co_await ctx.scope([&](CoroScope &scope) -> CoroTask<void> {
260!
187
                for (const auto &file_path : input_files) {
54✔
188
                    auto *global_chunk_idx_ptr = &global_chunk_idx;
28✔
189
                    scope.spawn([file_path, ch = chunk_chan->producer(),
448!
190
                                 index_dir, checkpoint_size, force_rebuild,
28!
191
                                 agg_config, query, global_chunk_idx_ptr](
28!
192
                                    CoroScope & /*fctx*/) mutable
193
                                    -> CoroTask<void> {
28!
194
                        [[maybe_unused]] auto producer_guard = ch.guard();
84!
195

196
                        std::string idx_path =
84✔
197
                            composites::dft::internal::determine_index_path(
84!
198
                                file_path, index_dir);
84✔
199

200
                        auto meta_input =
84✔
201
                            composites::dft::MetadataCollectorUtilityInput::
168!
202
                                from_file(file_path)
84!
203
                                    .with_checkpoint_size(checkpoint_size)
84✔
204
                                    .with_force_rebuild(force_rebuild)
28!
205
                                    .with_index(idx_path);
28✔
206
                        auto metadata =
84✔
207
                            co_await composites::dft::MetadataCollectorUtility{}
224!
208
                                .process(meta_input);
84!
209

210
                        if (!metadata.success) {
84!
NEW
211
                            co_return;
×
212
                        }
213

214
                        FileChunkMapperUtility file_mapper;
84!
215
                        auto mapper_input =
84✔
216
                            FileChunkMapperInput::from_metadata(metadata)
168!
217
                                .with_config(agg_config)
84✔
218
                                .with_checkpoint_size(checkpoint_size)
28!
219
                                .with_target_chunk_size(CHUNK_SIZE_MB)
28!
220
                                .with_batch_size(BATCH_SIZE_MB * 1024 * 1024);
28!
221
                        mapper_input.query = query;
84!
222
                        auto file_chunks =
84✔
223
                            co_await file_mapper.process(mapper_input);
112!
224

225
                        int start_idx = global_chunk_idx_ptr->fetch_add(
168✔
226
                            static_cast<int>(file_chunks.size()));
84✔
227
                        for (int i = 0;
112✔
228
                             i < static_cast<int>(file_chunks.size()); ++i) {
112✔
229
                            file_chunks[i].chunk_index = start_idx + i;
28✔
230
                        }
28✔
231

232
                        for (auto &chunk : file_chunks) {
112!
233
                            if (!co_await ch.send(std::move(chunk))) {
112!
NEW
234
                                co_return;
×
235
                            }
236
                        }
28!
237
                        co_return;
28✔
238
                    });
700✔
239
                }
28✔
240

241
                for (std::size_t w = 0; w < executor_threads; ++w) {
104✔
242
                    (void)w;
243
                    scope.spawn([chunk_chan, rp = result_chan->producer(),
687!
244
                                 result_chan](
78✔
245
                                    CoroScope &wctx) mutable -> CoroTask<void> {
74✔
246
                        [[maybe_unused]] auto producer_guard = rp.guard();
68!
247
                        while (auto input = co_await wctx.receive(chunk_chan)) {
498!
248
                            ChunkAggregatorUtility agg;
84!
249
                            auto output = co_await agg.process(*input);
112!
250
                            if (!co_await result_chan->send(
112!
251
                                    std::move(output))) {
NEW
252
                                co_return;
×
253
                            }
254
                        }
218✔
255
                        co_return;
78✔
256
                    });
780✔
257
                }
78✔
258

259
                auto *merger_ptr = &merger;
26✔
260
                scope.spawn([result_chan,
294!
261
                             merger_ptr](CoroScope &mctx) -> CoroTask<void> {
52!
262
                    while (auto output = co_await mctx.receive(result_chan)) {
164!
263
                        merger_ptr->merge_chunk(std::move(*output));
28!
264
                    }
54✔
265
                    co_return;
26✔
266
                });
112✔
267

268
                co_return;
52✔
269
            });
26✔
270

271
            co_return;
26✔
272
        },
78!
273
        "StreamingAggregate");
26!
274

275
    EventAggregatorUtilityOutput result;
26✔
276
    auto post_task = make_task(
26!
277
        [&](CoroScope & /*ctx*/) -> CoroTask<bool> {
156!
278
            result = merger.finalize();
26!
279
            co_return result.success;
52!
NEW
280
        },
×
281
        "Finalize");
26!
282

283
    post_task->depends_on(streaming_task);
26!
284
    pipeline.set_source(streaming_task);
26!
285
    pipeline.set_destination(post_task);
26!
286
    pipeline.execute();
26!
287

288
    co_return result;
52!
289
}
26✔
290

291
}  // namespace
292

293
static int run_comparison_pipeline(ComparatorObject *self,
13✔
294
                                   const ComparatorArgs &cargs,
295
                                   ComparisonOutput &output,
296
                                   std::string &error_msg) {
297
    ComparatorArgs args_copy = cargs;
13✔
298
    auto *output_ptr = &output;
13✔
299

300
    Py_BEGIN_ALLOW_THREADS try {
13!
301
        ComparisonConfig config;
13!
302
        if (!args_copy.config_path.empty()) {
13!
NEW
303
            std::string parse_error;
×
NEW
304
            auto parsed = ComparisonConfig::from_json_file(
×
NEW
305
                args_copy.config_path, parse_error);
×
NEW
306
            if (!parsed) {
×
NEW
307
                error_msg = "Config error: " + parse_error;
×
NEW
308
                goto done;
×
309
            }
NEW
310
            config = std::move(*parsed);
×
NEW
311
        } else {
×
312
            config = ComparisonConfig::from_cli(
13!
313
                args_copy.baseline, args_copy.variant, args_copy.query,
13✔
314
                args_copy.group_by);
13✔
315
        }
316

317
        config.format = args_copy.format;
13!
318
        config.no_color = true;
13✔
319
        if (args_copy.executor_threads > 0)
13!
NEW
320
            config.executor_threads = args_copy.executor_threads;
×
321
        if (!args_copy.index_dir.empty())
13!
NEW
322
            config.index_dir = args_copy.index_dir;
×
323
        if (args_copy.force_rebuild)
13!
NEW
324
            config.force_rebuild = args_copy.force_rebuild;
×
325
        if (args_copy.threshold > 0.0)
13!
NEW
326
            config.defaults.threshold_pct = args_copy.threshold;
×
327
        if (args_copy.time_interval_ms > 0.0)
13!
328
            config.defaults.time_interval_ms = args_copy.time_interval_ms;
13✔
329

330
        config.resolve();
13!
331

332
        if (config.executor_threads == 0) {
13!
333
            config.executor_threads = std::thread::hardware_concurrency();
13✔
334
        }
13✔
335
        if (config.checkpoint_size == 0) {
13!
336
            config.checkpoint_size =
13✔
337
                indexer::internal::Indexer::DEFAULT_CHECKPOINT_SIZE;
338
        }
13✔
339

340
        // Create temp index dir if needed
341
        std::string temp_index_dir;
13✔
342
        if (config.index_dir.empty()) {
13!
343
            auto temp_path = fs::temp_directory_path();
13!
344
            temp_path /=
13!
345
                "dftracer_cmp_py_" + std::to_string(std::time(nullptr)) + "_" +
26!
346
                std::to_string(static_cast<int>(
13!
347
                    std::hash<std::thread::id>{}(std::this_thread::get_id())));
13✔
348
            temp_index_dir = temp_path.string();
13!
349
            fs::create_directories(temp_index_dir);
13!
350
            config.index_dir = temp_index_dir;
13!
351
        }
13✔
352

353
        // Enumerate files
354
        auto enumerate_files =
13✔
355
            [](std::string path) -> CoroTask<std::vector<std::string>> {
134!
356
            std::vector<std::string> files;
30✔
357
            if (fs::is_regular_file(path)) {
30!
358
                files.push_back(path);
24!
359
                co_return files;
50!
360
            }
361
            filesystem::PatternDirectoryScannerUtility scanner;
6!
362
            filesystem::PatternDirectoryScannerUtilityInput scan_input{
12!
363
                path, {".pfw", ".pfw.gz"}, false};
6!
364
            auto entries = co_await scanner.process(scan_input);
8!
365
            files.reserve(entries.size());
2!
366
            for (const auto &e : entries) {
6✔
367
                files.push_back(e.path.string());
4!
368
            }
4✔
369
            co_return files;
2!
370
        };
38!
371

372
        Runtime *rt = get_runtime(self);
13!
373

374
        auto *error_msg_ptr = &error_msg;
13✔
375
        auto task = [config, output_ptr, enumerate_files,
234!
376
                     error_msg_ptr]() -> CoroTask<void> {
26!
377
            auto baseline_files = co_await enumerate_files(config.baseline);
52!
378
            auto variant_files = co_await enumerate_files(config.variant);
52!
379

380
            if (baseline_files.empty()) {
39!
NEW
381
                *error_msg_ptr =
×
NEW
382
                    "No trace files found in baseline: " + config.baseline;
×
NEW
383
                co_return;
×
384
            }
385
            if (variant_files.empty()) {
39!
NEW
386
                *error_msg_ptr =
×
NEW
387
                    "No trace files found in variant: " + config.variant;
×
NEW
388
                co_return;
×
389
            }
390

391
            // Build indexes upfront
392
            {
393
                std::unordered_set<std::string> seen;
39✔
394
                std::vector<std::string> all_files;
39✔
395
                for (const auto &f : baseline_files) {
53✔
396
                    if (seen.insert(f).second) all_files.push_back(f);
14!
397
                }
14✔
398
                for (const auto &f : variant_files) {
53✔
399
                    if (seen.insert(f).second) all_files.push_back(f);
14!
400
                }
14✔
401
                std::vector<indexer::IndexBuildConfig> idx_configs;
39✔
402
                idx_configs.reserve(all_files.size());
39!
403
                for (const auto &file_path : all_files) {
53✔
404
                    idx_configs.push_back(
14!
405
                        indexer::IndexBuildConfig::for_file(file_path)
14!
406
                            .with_checkpoint_size(config.checkpoint_size)
14!
407
                            .with_force_rebuild(config.force_rebuild)
14!
408
                            .with_index_dir(config.index_dir));
14!
409
                }
14✔
410
                std::vector<CoroTask<indexer::IndexBuildResult>> idx_tasks;
39✔
411
                idx_tasks.reserve(idx_configs.size());
39!
412
                for (const auto &cfg : idx_configs) {
53✔
413
                    idx_tasks.push_back(
14!
414
                        indexer::IndexBuilderUtility{}.process(cfg));
14!
415
                }
14✔
416
                co_await coro::when_all(std::move(idx_tasks));
52!
417
            }
13✔
418

419
            output_ptr->baseline_path = config.baseline;
39✔
420
            output_ptr->variant_path = config.variant;
13✔
421
            output_ptr->baseline_file_count = baseline_files.size();
39✔
422
            output_ptr->variant_file_count = variant_files.size();
39✔
423

424
            auto start_time = std::chrono::high_resolution_clock::now();
39✔
425

426
            for (auto &node : config.nodes) {
52!
427
                std::vector<const ComparisonNode *> visitors;
39✔
428
                flatten_nodes(node, visitors);
39!
429

430
                std::vector<ComparisonVisitorPair> pairs;
39✔
431
                pairs.reserve(visitors.size());
39!
432

433
                for (const auto *visitor : visitors) {
78!
434
                    std::optional<common::query::Query> query;
39✔
435
                    if (!visitor->composed_query.empty()) {
39✔
436
                        auto result = common::query::Query::from_string(
26!
437
                            visitor->composed_query);
13✔
438
                        if (!result) {
13!
NEW
439
                            *error_msg_ptr = "Invalid query for node '" +
×
NEW
440
                                             visitor->name +
×
NEW
441
                                             "': " + result.error().format();
×
NEW
442
                            co_return;
×
443
                        }
444
                        query = std::move(*result);
13!
445
                    }
13!
446

447
                    AggregationConfig agg_cfg;
39!
448
                    agg_cfg.time_interval_us = static_cast<std::uint64_t>(
39✔
449
                        config.defaults.time_interval_ms * 1000.0);
39✔
450
                    agg_cfg.extra_group_keys = {};
39✔
451
                    agg_cfg.compute_statistics = true;
13✔
452
                    agg_cfg.compute_percentiles = true;
13✔
453
                    agg_cfg.percentiles = visitor->resolved_percentiles;
13✔
454
                    agg_cfg.sketch_accuracy = 0.01;
39✔
455
                    agg_cfg.track_process_parents = false;
39✔
456

457
                    auto [base_result, var_result] = co_await coro::when_all(
117!
458
                        run_aggregation(
78!
459
                            baseline_files, agg_cfg, query, config.index_dir,
39!
460
                            config.checkpoint_size, config.force_rebuild,
39✔
461
                            config.executor_threads),
39✔
462
                        run_aggregation(
78!
463
                            variant_files, agg_cfg, query, config.index_dir,
39!
464
                            config.checkpoint_size, config.force_rebuild,
39✔
465
                            config.executor_threads));
39✔
466

467
                    if (pairs.empty()) {
13!
468
                        output_ptr->baseline_meta = extract_metadata(
26!
469
                            base_result.aggregations, baseline_files.size());
13✔
470
                        output_ptr->variant_meta = extract_metadata(
26!
471
                            var_result.aggregations, variant_files.size());
13✔
472
                    }
13✔
473

474
                    ComparisonVisitorPair pair;
13✔
475
                    pair.baseline = std::move(base_result);
13✔
476
                    pair.variant = std::move(var_result);
13✔
477
                    pair.node = *visitor;
13!
478
                    pairs.push_back(std::move(pair));
13!
479
                }
13!
480

481
                ComparisonUtilityInput cmp_input;
39✔
482
                cmp_input.visitors = std::move(pairs);
39✔
483
                cmp_input.root_node = node;
39!
484
                cmp_input.baseline_file_count = baseline_files.size();
39✔
485
                cmp_input.variant_file_count = variant_files.size();
39✔
486

487
                ComparisonUtility cmp;
39!
488
                auto cmp_output = co_await cmp.process(cmp_input);
52!
489
                output_ptr->nodes.push_back(std::move(cmp_output.result));
13!
490
            }
13!
491

492
            auto end_time = std::chrono::high_resolution_clock::now();
13✔
493
            std::chrono::duration<double, std::milli> duration =
13✔
494
                end_time - start_time;
13!
495
            output_ptr->execution_time_ms = duration.count();
13!
496
        };
377✔
497

498
        rt->submit(task(), "comparator").get();
13!
499

500
        // Clean up temp index dir
501
        if (!temp_index_dir.empty() && fs::exists(temp_index_dir)) {
26!
502
            fs::remove_all(temp_index_dir);
13!
503
        }
13✔
504
    } catch (const std::exception &e) {
13!
NEW
505
        error_msg = e.what();
×
506
    }
13!
507
done:
508
    Py_END_ALLOW_THREADS
13!
509

510
        return error_msg.empty()
13✔
511
        ? 0
512
        : -1;
513
}
13✔
514

515
// -----------------------------------------------------------------------
516
// compare() -- returns ArrowTable
517
// -----------------------------------------------------------------------
518

519
/// compare(...) -> ArrowTable. Arguments are described in COMPARE_DOC.
///
/// Returns a new reference, or NULL with a Python exception set:
/// TypeError/ValueError for bad arguments, RuntimeError when the pipeline or
/// the Arrow conversion fails, or when the module was built without Arrow.
static PyObject *Comparator_compare(ComparatorObject *self, PyObject *args,
                                    PyObject *kwds) {
    ComparatorArgs cargs;
    if (parse_comparator_args(args, kwds, cargs) < 0) return NULL;

#ifndef DFTRACER_UTILS_ENABLE_ARROW
    // Fail before running a potentially long pipeline whose result could
    // not be returned anyway.
    (void)self;
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return NULL;
#else
    ComparisonOutput output;
    std::string error_msg;
    if (run_comparison_pipeline(self, cargs, output, error_msg) < 0) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

    // C++ exceptions must not cross the CPython call boundary.
    try {
        auto arrow_result = output.to_arrow();
        if (!arrow_result.valid()) {
            PyErr_SetString(PyExc_RuntimeError,
                            "Failed to convert comparison output "
                            "to Arrow");
            return NULL;
        }
        return arrow_result_to_table(std::move(arrow_result));
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to convert comparison output "
                        "to Arrow");
        return NULL;
    }
#endif
}
546

547
// -----------------------------------------------------------------------
548
// compare_json() -- returns JSON string
549
// -----------------------------------------------------------------------
550

551
/// compare_json(...) -> str. Arguments are described in COMPARE_JSON_DOC.
///
/// Runs the comparison pipeline and renders the result with
/// TreeTableFormatter::render_json. Returns a new str reference, or NULL
/// with a Python exception set (RuntimeError for pipeline/render failures).
static PyObject *Comparator_compare_json(ComparatorObject *self, PyObject *args,
                                         PyObject *kwds) {
    ComparatorArgs cargs;
    if (parse_comparator_args(args, kwds, cargs) < 0) return NULL;

    ComparisonOutput output;
    std::string error_msg;
    if (run_comparison_pipeline(self, cargs, output, error_msg) < 0) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

    // C++ exceptions must not cross the CPython call boundary.
    std::string json;
    try {
        TreeTableFormatter formatter;
        json = formatter.render_json(output);
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to render comparison output as JSON");
        return NULL;
    }
    return PyUnicode_FromStringAndSize(json.data(), (Py_ssize_t)json.size());
}
567

568
// -----------------------------------------------------------------------
569
// compare_table() -- returns formatted table string
570
// -----------------------------------------------------------------------
571

572
/// compare_table(...) -> str. Arguments are described in COMPARE_TABLE_DOC.
///
/// Runs the comparison pipeline and renders it as a plain-ASCII table (no
/// colour, no Unicode box drawing). Returns a new str reference, or NULL
/// with a Python exception set (RuntimeError for pipeline/render failures).
static PyObject *Comparator_compare_table(ComparatorObject *self,
                                          PyObject *args, PyObject *kwds) {
    ComparatorArgs cargs;
    if (parse_comparator_args(args, kwds, cargs) < 0) return NULL;

    ComparisonOutput output;
    std::string error_msg;
    if (run_comparison_pipeline(self, cargs, output, error_msg) < 0) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

    FormatterOptions fmt_opts;
    fmt_opts.use_color = false;
    fmt_opts.use_unicode = false;
    TreeTableFormatter formatter(fmt_opts);

    // The formatter writes to a FILE*, so render into an in-memory stream
    // (POSIX open_memstream) and read the buffer back.
    char *buf = NULL;
    std::size_t buf_size = 0;
    FILE *memstream = open_memstream(&buf, &buf_size);
    if (!memstream) {
        PyErr_SetString(PyExc_RuntimeError, "Failed to create memory stream");
        return NULL;
    }

    // C++ exceptions must not cross the CPython call boundary; release the
    // stream and its buffer on every failure path.
    try {
        formatter.render(memstream, output);
    } catch (const std::exception &e) {
        fclose(memstream);
        free(buf);
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        fclose(memstream);
        free(buf);
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to render comparison table");
        return NULL;
    }

    // fclose() flushes the stream and finalizes buf / buf_size.
    if (fclose(memstream) != 0) {
        free(buf);
        PyErr_SetString(PyExc_RuntimeError, "Failed to finalize memory stream");
        return NULL;
    }

    PyObject *result =
        PyUnicode_FromStringAndSize(buf ? buf : "", (Py_ssize_t)buf_size);
    free(buf);
    return result;
}
606

607
// -----------------------------------------------------------------------
608
// __call__ delegates to compare()
609
// -----------------------------------------------------------------------
610

611
/// tp_call: invoking a ComparatorUtility instance is shorthand for
/// calling its compare() method with the same arguments.
static PyObject *Comparator_call(PyObject *self, PyObject *args,
                                 PyObject *kwds) {
    ComparatorObject *const comparator =
        reinterpret_cast<ComparatorObject *>(self);
    return Comparator_compare(comparator, args, kwds);
}
615

616
// -----------------------------------------------------------------------
617
// Method table
618
// -----------------------------------------------------------------------
619

620
static const char *COMPARE_DOC =
621
    "compare(baseline, variant, query='', group_by='',\n"
622
    "        format='table', time_interval_ms=5000.0,\n"
623
    "        threshold=0.0, executor_threads=0,\n"
624
    "        index_dir='', force_rebuild=False, config='')\n"
625
    "--\n"
626
    "\n"
627
    "Run comparison pipeline, return materialized ArrowTable.\n"
628
    "\n"
629
    "Args:\n"
630
    "    baseline (str): Baseline trace file or directory.\n"
631
    "    variant (str): Variant trace file or directory.\n"
632
    "    query (str): Query filter (default: all events).\n"
633
    "    group_by (str): Comma-separated group keys.\n"
634
    "    format (str): Output format (default 'table').\n"
635
    "    time_interval_ms (float): Time bucket in ms "
636
    "(default 5000).\n"
637
    "    threshold (float): Hide changes below this pct.\n"
638
    "    executor_threads (int): Parallel threads (0=auto).\n"
639
    "    index_dir (str): Index sidecar directory.\n"
640
    "    force_rebuild (bool): Force index rebuild.\n"
641
    "    config (str): JSON config file path.\n"
642
    "\n"
643
    "Returns:\n"
644
    "    ArrowTable: Comparison results.\n";
645

646
static const char *COMPARE_JSON_DOC =
647
    "compare_json(baseline, variant, query='', group_by='',\n"
648
    "             format='table', time_interval_ms=5000.0,\n"
649
    "             threshold=0.0, executor_threads=0,\n"
650
    "             index_dir='', force_rebuild=False, "
651
    "config='')\n"
652
    "--\n"
653
    "\n"
654
    "Run comparison pipeline, return JSON string.\n"
655
    "\n"
656
    "Args:\n"
657
    "    baseline (str): Baseline trace file or directory.\n"
658
    "    variant (str): Variant trace file or directory.\n"
659
    "    query (str): Query filter (default: all events).\n"
660
    "    group_by (str): Comma-separated group keys.\n"
661
    "    format (str): Output format (default 'table').\n"
662
    "    time_interval_ms (float): Time bucket in ms "
663
    "(default 5000).\n"
664
    "    threshold (float): Hide changes below this pct.\n"
665
    "    executor_threads (int): Parallel threads (0=auto).\n"
666
    "    index_dir (str): Index sidecar directory.\n"
667
    "    force_rebuild (bool): Force index rebuild.\n"
668
    "    config (str): JSON config file path.\n"
669
    "\n"
670
    "Returns:\n"
671
    "    str: JSON representation of comparison results.\n";
672

673
// Docstring for ComparatorUtility.compare_table.
// The leading "name(...)\n--\n\n" section follows CPython's embedded text
// signature convention; the remainder is the help text returned by help().
static const char *COMPARE_TABLE_DOC =
    "compare_table(baseline, variant, query='', group_by='',\n"
    "              format='table', time_interval_ms=5000.0,\n"
    "              threshold=0.0, executor_threads=0,\n"
    "              index_dir='', force_rebuild=False, config='')\n"
    "--\n\n"
    "Run comparison pipeline, return formatted table string.\n\n"
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms (default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    index_dir (str): Index sidecar directory.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n\n"
    "Returns:\n"
    "    str: Formatted ASCII table of comparison results.\n";
699

700
// Python-visible methods of ComparatorUtility.
// Every entry is registered with METH_VARARGS | METH_KEYWORDS, so the C
// handlers receive (self, args, kwargs); the cast to PyCFunction is the
// conventional way to store such a handler in PyMethodDef::ml_meth.
static PyMethodDef Comparator_methods[] = {
    {"compare", (PyCFunction)Comparator_compare,
     METH_VARARGS | METH_KEYWORDS, COMPARE_DOC},
    {"compare_json", (PyCFunction)Comparator_compare_json,
     METH_VARARGS | METH_KEYWORDS, COMPARE_JSON_DOC},
    {"compare_table", (PyCFunction)Comparator_compare_table,
     METH_VARARGS | METH_KEYWORDS, COMPARE_TABLE_DOC},
    {NULL, NULL, 0, NULL} /* sentinel: terminates the table */
};
1✔
708

709
// Static type object backing the Python class
// dftracer_utils_ext.ComparatorUtility.
//
// The slots are filled positionally in PyTypeObject declaration order (each
// entry is labelled with its slot name), so entries must not be reordered or
// removed. Slots set to 0 are left for PyType_Ready() to fill in or leave
// empty.
//
// NOTE(review): tp_traverse/tp_clear are 0 and Py_TPFLAGS_HAVE_GC is not set,
// so instances are not tracked by the cyclic GC. If ComparatorObject holds
// references to other Python objects (e.g. a Runtime), confirm that no
// reference cycles can form through it.
PyTypeObject ComparatorType = {
    PyVarObject_HEAD_INIT(
        NULL, 0) "dftracer_utils_ext.ComparatorUtility", /* tp_name */
    sizeof(ComparatorObject),                            /* tp_basicsize */
    0,                                                   /* tp_itemsize */
    (destructor)Comparator_dealloc,                      /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    0,                                        /* tp_getattr */
    0,                                        /* tp_setattr */
    0,                                        /* tp_as_async */
    0,                                        /* tp_repr */
    0,                                        /* tp_as_number */
    0,                                        /* tp_as_sequence */
    0,                                        /* tp_as_mapping */
    0,                                        /* tp_hash */
    Comparator_call,                          /* tp_call */
    0,                                        /* tp_str */
    0,                                        /* tp_getattro */
    0,                                        /* tp_setattro */
    0,                                        /* tp_as_buffer */
    // BASETYPE allows Python code to subclass ComparatorUtility.
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    // The "ComparatorUtility(...)\n--\n\n" prefix is the constructor's
    // embedded text signature; the rest is the class help text.
    "ComparatorUtility(runtime: Runtime | None = None)\n"
    "--\n\n"
    "Compare DFTracer trace metrics between baseline and "
    "variant.\n\n"
    "Args:\n"
    "    runtime (Runtime or None): Runtime for thread pool "
    "control.\n"
    "        If None, uses the default global Runtime.\n\n"
    "compare(baseline, variant, ...) -> ArrowTable\n"
    "    Run comparison and return a materialized Arrow "
    "table.\n\n"
    "compare_json(baseline, variant, ...) -> str\n"
    "    Run comparison and return JSON string.\n\n"
    "compare_table(baseline, variant, ...) -> str\n"
    "    Run comparison and return formatted table "
    "string.\n",               /* tp_doc */
    0,                         /* tp_traverse */
    0,                         /* tp_clear */
    0,                         /* tp_richcompare */
    0,                         /* tp_weaklistoffset */
    0,                         /* tp_iter */
    0,                         /* tp_iternext */
    Comparator_methods,        /* tp_methods */
    0,                         /* tp_members */
    0,                         /* tp_getset */
    0,                         /* tp_base */
    0,                         /* tp_dict */
    0,                         /* tp_descr_get */
    0,                         /* tp_descr_set */
    0,                         /* tp_dictoffset */
    (initproc)Comparator_init, /* tp_init */
    0,                         /* tp_alloc */
    Comparator_new,            /* tp_new */
};
764

765
/**
 * Register the ComparatorUtility type on a module.
 *
 * Finalizes ComparatorType with PyType_Ready() and adds it to @p m under the
 * attribute name "ComparatorUtility".
 *
 * @param m Module to add the type to. The reference is borrowed: this helper
 *          never releases it, on any path, so the caller keeps sole
 *          responsibility for cleaning up the module on failure.
 * @return 0 on success; -1 on failure, with the Python exception raised by
 *         PyType_Ready() or PyModule_AddObject() left set.
 */
int init_comparator(PyObject *m) {
    if (PyType_Ready(&ComparatorType) < 0) return -1;

    // PyModule_AddObject() steals a reference only when it succeeds, so take
    // one up front and hand it back ourselves if the call fails.
    Py_INCREF(&ComparatorType);
    if (PyModule_AddObject(m, "ComparatorUtility",
                           (PyObject *)&ComparatorType) < 0) {
        Py_DECREF(&ComparatorType);
        // Do not Py_DECREF(m) here: this function does not own the module.
        // Doing so was also inconsistent with the PyType_Ready() failure path
        // above, and would double-release m if the caller cleans it up too.
        return -1;
    }

    return 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc