• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28356348514

29 Jun 2026 07:40AM UTC coverage: 52.174% (-0.1%) from 52.278%
28356348514

Pull #83

github

web-flow
Merge 278203630 into 2efed6649
Pull Request #83: refactor and improve code QoL

37276 of 92891 branches covered (40.13%)

Branch coverage included in aggregate %.

671 of 1173 new or added lines in 58 files covered. (57.2%)

66 existing lines in 30 files now uncovered.

33619 of 42991 relevant lines covered (78.2%)

20387.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.43
/src/dftracer/utils/python/utilities/comparator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <dftracer/utils/core/common/filesystem.h>
3
#include <dftracer/utils/core/common/platform_compat.h>
4
#include <dftracer/utils/core/coro/channel.h>
5
#include <dftracer/utils/core/coro/task.h>
6
#include <dftracer/utils/core/coro/when_all.h>
7
#include <dftracer/utils/core/pipeline/pipeline.h>
8
#include <dftracer/utils/core/pipeline/pipeline_config.h>
9
#include <dftracer/utils/core/runtime.h>
10
#include <dftracer/utils/core/tasks/coro_scope.h>
11
#include <dftracer/utils/core/tasks/task.h>
12
#include <dftracer/utils/python/arrow_helpers.h>
13
#include <dftracer/utils/python/py_runtime_mixin.h>
14
#include <dftracer/utils/python/py_type_helpers.h>
15
#include <dftracer/utils/python/runtime.h>
16
#include <dftracer/utils/python/utilities/comparator.h>
17
#include <dftracer/utils/utilities/common/query/query.h>
18
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregators.h>
19
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_config.h>
20
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_result.h>
21
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_utility.h>
22
#include <dftracer/utils/utilities/composites/dft/comparator/tree_table_formatter.h>
23
#include <dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.h>
24
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
25
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
26
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
27
#include <dftracer/utils/utilities/indexer/index_builder_utility.h>
28
#include <dftracer/utils/utilities/indexer/internal/indexer.h>
29

30
#include <atomic>
31
#include <chrono>
32
#include <cstdio>
33
#include <ctime>
34
#include <stdexcept>
35
#include <string>
36
#include <vector>
37

38
using dftracer::utils::Runtime;
39
using dftracer::utils::coro::CoroTask;
40
using namespace dftracer::utils;
41
using namespace dftracer::utils::utilities;
42
using namespace dftracer::utils::utilities::composites::dft::aggregators;
43
using namespace dftracer::utils::utilities::composites::dft::comparator;
44

45
#include <dftracer/utils/core/common/config.h>
46
#ifdef DFTRACER_UTILS_ENABLE_ARROW
47
using dftracer::utils::python::arrow_result_to_table;
48
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
49
#endif
50

51
// Returns the Runtime to use for this Comparator instance by delegating to
// resolve_runtime(), which is declared outside this file.
static Runtime *get_runtime(ComparatorObject *self) {
26✔
52
    return resolve_runtime(self);
26✔
53
}
54

55
// Deallocation hook for ComparatorObject; all teardown is delegated to the
// shared runtime_backed_dealloc() helper.
static void Comparator_dealloc(ComparatorObject *self) {
26✔
56
    runtime_backed_dealloc(self);
26✔
57
}
26✔
58

59
// Allocation hook for ComparatorObject; forwards to the shared
// runtime_backed_new<T>() helper with ComparatorObject as the concrete type.
static PyObject *Comparator_new(PyTypeObject *type, PyObject *args,
26✔
60
                                PyObject *kwds) {
61
    return runtime_backed_new<ComparatorObject>(type, args, kwds);
26✔
62
}
63

64
// Initialization hook for ComparatorObject; argument handling is delegated
// to the shared runtime_backed_init() helper and its result is returned as-is.
static int Comparator_init(ComparatorObject *self, PyObject *args,
26✔
65
                           PyObject *kwds) {
66
    return runtime_backed_init(self, args, kwds);
26✔
67
}
68

69
// -----------------------------------------------------------------------
70
// Helpers
71
// -----------------------------------------------------------------------
72

73
// Plain value bundle holding the keyword arguments of the compare*() methods
// after conversion from Python objects to C++ types. Defaults mirror the
// defaults used when parsing the Python arguments.
struct ComparatorArgs {
    // Baseline and variant trace inputs.
    std::string baseline{};
    std::string variant{};

    // Optional filtering / grouping / output-format selectors.
    std::string query{};
    std::string group_by{};
    std::string format{};

    // Numeric tuning knobs.
    double time_interval_ms{5000.0};
    double threshold{0.0};
    std::size_t executor_threads{0};

    // Index locations and rebuild policy.
    std::string baseline_index_dir{};
    std::string variant_index_dir{};
    bool force_rebuild{false};

    // Path of a JSON config file; empty means "no config file".
    std::string config_path{};
};
87

88
static int parse_comparator_args(PyObject *args, PyObject *kwds,
26✔
89
                                 ComparatorArgs &out) {
90
    static const char *kwlist[] = {"baseline",
91
                                   "variant",
92
                                   "query",
93
                                   "group_by",
94
                                   "format",
95
                                   "time_interval_ms",
96
                                   "threshold",
97
                                   "executor_threads",
98
                                   "baseline_index_dir",
99
                                   "variant_index_dir",
100
                                   "force_rebuild",
101
                                   "config",
102
                                   NULL};
103

104
    const char *baseline = NULL;
26✔
105
    const char *variant = NULL;
26✔
106
    const char *query = "";
26✔
107
    const char *group_by = "";
26✔
108
    const char *format = "table";
26✔
109
    double time_interval_ms = 5000.0;
26✔
110
    double threshold = 0.0;
26✔
111
    Py_ssize_t executor_threads = 0;
26✔
112
    const char *baseline_index_dir = "";
26✔
113
    const char *variant_index_dir = "";
26✔
114
    int force_rebuild = 0;
26✔
115
    const char *config = "";
26✔
116

117
    if (!PyArg_ParseTupleAndKeywords(
26!
118
            args, kwds, "ss|sssddnssps", (char **)kwlist, &baseline, &variant,
13✔
119
            &query, &group_by, &format, &time_interval_ms, &threshold,
120
            &executor_threads, &baseline_index_dir, &variant_index_dir,
121
            &force_rebuild, &config))
122
        return -1;
×
123

124
    out.baseline = baseline;
26!
125
    out.variant = variant;
26!
126
    out.query = query;
26!
127
    out.group_by = group_by;
26!
128
    out.format = format;
26!
129
    out.time_interval_ms = time_interval_ms;
26✔
130
    out.threshold = threshold;
26✔
131
    out.executor_threads = static_cast<std::size_t>(executor_threads);
26✔
132
    out.baseline_index_dir = baseline_index_dir;
26!
133
    out.variant_index_dir = variant_index_dir;
26!
134
    out.force_rebuild = force_rebuild != 0;
26✔
135
    out.config_path = config;
26!
136

137
    return 0;
26✔
138
}
13✔
139

140
namespace {
141

142
void flatten_nodes(const ComparisonNode &node,
26✔
143
                   std::vector<const ComparisonNode *> &out) {
144
    out.push_back(&node);
26!
145
    for (const auto &child : node.children) {
26!
146
        flatten_nodes(child, out);
×
147
    }
148
}
26✔
149

150
// Aggregates the events of `input_files` with a streaming pipeline and
// returns the finalized EventAggregatorOutput.
//
// Three kinds of coroutines are spawned on a single scope:
//   1. one producer per input file: collects metadata, maps the file into
//      chunks, reserves a contiguous range of chunk indices from a shared
//      atomic counter and sends the chunks to `chunk_chan`;
//   2. worker coroutines: receive chunks, run ChunkAggregatorUtility on each
//      and forward the output to `result_chan`;
//   3. one merger: folds every chunk output into a single EventAggregator.
// A "Finalize" task that depends on the streaming task calls
// merger.finalize() once streaming is done.
//
// Parameters:
//   input_files       trace files to aggregate.
//   agg_config        aggregation settings handed to the chunk mapper.
//   query             optional filter attached to every mapper input.
//   index_dir         directory used to derive each file's index path.
//   checkpoint_size   forwarded to metadata collection and chunk mapping.
//   force_rebuild     forwarded to metadata collection.
//   executor_threads  pipeline compute threads; also the number of worker
//                     coroutines (at least one worker is always spawned).
//
// Files whose metadata collection reports failure are skipped silently.
CoroTask<EventAggregatorOutput> run_aggregation(
    std::vector<std::string> input_files, AggregationConfig agg_config,
    std::optional<common::query::Query> query, std::string index_dir,
    std::size_t checkpoint_size, bool force_rebuild,
    std::size_t executor_threads) {
    constexpr std::size_t CHUNK_SIZE_MB = 4;
    constexpr std::size_t BATCH_SIZE_MB = 4;

    // With zero workers nothing would ever drain chunk_chan, so the
    // aggregation could not make progress; always run at least one worker.
    const std::size_t worker_count =
        executor_threads == 0 ? 1 : executor_threads;

    auto pipeline_config = PipelineConfig()
                               .with_name("DFTracer Comparator Aggregation")
                               .with_compute_threads(executor_threads)
                               .with_watchdog(false);
    Pipeline pipeline(pipeline_config);

    EventAggregator merger;
    std::atomic<int> global_chunk_idx{0};

    auto streaming_task = make_task(
        [&](CoroScope &ctx) -> CoroTask<void> {
            auto chunk_chan = coro::make_channel<ChunkAggregatorInput>(0);
            auto result_chan = coro::make_channel<ChunkAggregationOutput>(2);

            co_await ctx.scope([&](CoroScope &scope) -> CoroTask<void> {
                // Stage 1: one producer per input file. Everything the
                // producer needs is captured by value.
                for (const auto &file_path : input_files) {
                    auto *global_chunk_idx_ptr = &global_chunk_idx;
                    scope.spawn([file_path, ch = chunk_chan->producer(),
                                 index_dir, checkpoint_size, force_rebuild,
                                 agg_config, query, global_chunk_idx_ptr](
                                    CoroScope & /*fctx*/) mutable
                                    -> CoroTask<void> {
                        [[maybe_unused]] auto producer_guard = ch.guard();

                        std::string index_path =
                            composites::dft::internal::determine_index_path(
                                file_path, index_dir);

                        auto meta_input =
                            composites::dft::MetadataCollectorUtilityInput::
                                from_file(file_path)
                                    .with_checkpoint_size(checkpoint_size)
                                    .with_force_rebuild(force_rebuild)
                                    .with_index(index_path);
                        auto metadata =
                            co_await composites::dft::MetadataCollectorUtility{}
                                .process(meta_input);

                        // Skip files whose metadata could not be collected.
                        if (!metadata.success) {
                            co_return;
                        }

                        FileChunkMapperUtility file_mapper;
                        auto mapper_input =
                            FileChunkMapperInput::from_metadata(metadata)
                                .with_config(agg_config)
                                .with_checkpoint_size(checkpoint_size)
                                .with_target_chunk_size(CHUNK_SIZE_MB)
                                .with_batch_size(BATCH_SIZE_MB * 1024 * 1024);
                        mapper_input.query = query;
                        auto file_chunks =
                            co_await file_mapper.process(mapper_input);

                        // Reserve a contiguous index range for this file's
                        // chunks so indices are unique across all files.
                        int start_idx = global_chunk_idx_ptr->fetch_add(
                            static_cast<int>(file_chunks.size()));
                        for (int i = 0;
                             i < static_cast<int>(file_chunks.size()); ++i) {
                            file_chunks[i].chunk_index = start_idx + i;
                        }

                        for (auto &chunk : file_chunks) {
                            if (!co_await ch.send(std::move(chunk))) {
                                co_return;
                            }
                        }
                        co_return;
                    });
                }

                // Stage 2: chunk aggregation workers.
                for (std::size_t w = 0; w < worker_count; ++w) {
                    (void)w;
                    scope.spawn([chunk_chan, rp = result_chan->producer(),
                                 result_chan](
                                    CoroScope &wctx) mutable -> CoroTask<void> {
                        [[maybe_unused]] auto producer_guard = rp.guard();
                        while (auto input = co_await wctx.receive(chunk_chan)) {
                            ChunkAggregatorUtility agg;
                            auto output = co_await agg.process(*input);
                            if (!co_await result_chan->send(
                                    std::move(output))) {
                                co_return;
                            }
                        }
                        co_return;
                    });
                }

                // Stage 3: single merger consuming every chunk result.
                auto *merger_ptr = &merger;
                scope.spawn([result_chan,
                             merger_ptr](CoroScope &mctx) -> CoroTask<void> {
                    while (auto output = co_await mctx.receive(result_chan)) {
                        merger_ptr->merge_chunk(std::move(*output));
                    }
                    co_return;
                });

                co_return;
            });

            co_return;
        },
        "StreamingAggregate");

    EventAggregatorOutput result;
    auto post_task = make_task(
        [&](CoroScope & /*ctx*/) -> CoroTask<bool> {
            result = merger.finalize();
            co_return result.success;
        },
        "Finalize");

    post_task->depends_on(streaming_task);
    pipeline.set_source(streaming_task);
    pipeline.set_destination(post_task);
    pipeline.execute();

    co_return result;
}
78!
276

277
}  // namespace
278

279
// Runs the full baseline-vs-variant comparison and fills `output`.
//
// Steps, all executed inside the callable passed to run_blocking():
//   1. build a ComparisonConfig from the JSON file named by
//      cargs.config_path, or from baseline/variant/query/group_by when no
//      config path is given;
//   2. apply the remaining keyword arguments on top of that config;
//   3. resolve the trace files of both sides and build missing indexes;
//   4. for every configured node, aggregate baseline and variant for each
//      flattened visitor and run ComparisonUtility on the pairs;
//   5. prepend metadata rows to every result node and record elapsed time.
//
// Returns the value returned by run_blocking(); callers in this file treat
// `false` as failure and return NULL to Python.
// NOTE(review): run_blocking is defined elsewhere; presumably it converts
// exceptions thrown by the callable into a Python error -- confirm.
static bool run_comparison_pipeline(ComparatorObject *self,
26✔
280
                                    const ComparatorArgs &cargs,
281
                                    ComparisonOutput &output) {
282
    ComparatorArgs args_copy = cargs;
26!
283
    auto *output_ptr = &output;
26✔
284

285
    return run_blocking([&] {
39!
286
        ComparisonConfig config;
26!
287
        // Step 1: a config file, when given, takes the place of the
        // baseline/variant/query/group_by arguments.
        if (!args_copy.config_path.empty()) {
26✔
288
            std::string parse_error;
×
289
            auto parsed = ComparisonConfig::from_json_file(
×
290
                args_copy.config_path, parse_error);
×
291
            if (!parsed) {
×
NEW
292
                throw std::runtime_error("Config error: " + parse_error);
×
293
            }
294
            config = std::move(*parsed);
×
295
        } else {
×
296
            config = ComparisonConfig::from_cli(
26!
297
                args_copy.baseline, args_copy.variant, args_copy.query,
26✔
298
                args_copy.group_by);
26!
299
        }
300

301
        // Step 2: format and no_color are always applied; the other
        // arguments only override the config when non-zero / non-empty.
        config.format = args_copy.format;
26!
302
        config.no_color = true;
26✔
303
        if (args_copy.executor_threads > 0)
26✔
304
            config.executor_threads = args_copy.executor_threads;
×
305
        if (!args_copy.baseline_index_dir.empty())
26!
306
            config.baseline_index_dir = args_copy.baseline_index_dir;
×
307
        if (!args_copy.variant_index_dir.empty())
26!
308
            config.variant_index_dir = args_copy.variant_index_dir;
×
309
        if (args_copy.force_rebuild)
26✔
310
            config.force_rebuild = args_copy.force_rebuild;
×
311
        if (args_copy.threshold > 0.0)
26✔
312
            config.defaults.threshold_pct = args_copy.threshold;
×
313
        if (args_copy.time_interval_ms > 0.0)
26✔
314
            config.defaults.time_interval_ms = args_copy.time_interval_ms;
26✔
315

316
        config.resolve();
26!
317

318
        // Fill in values still unset after resolve().
        if (config.executor_threads == 0) {
26!
319
            config.executor_threads = dftracer_utils_hardware_concurrency();
26!
320
        }
13✔
321
        if (config.checkpoint_size == 0) {
26✔
322
            config.checkpoint_size =
26✔
323
                indexer::internal::Indexer::DEFAULT_CHECKPOINT_SIZE;
324
        }
13✔
325

326
        using composites::dft::indexing::IndexResolverUtility;
327
        using composites::dft::indexing::ResolverInput;
328
        using indexer::IndexBatchBuilderUtility;
329
        using indexer::IndexBuildBatchConfig;
330

331
        Runtime *rt = get_runtime(self);
26!
332

333
        // The coroutine owns its own copy of `config`; `output_ptr` and `rt`
        // are plain pointers to objects owned by the caller.
        auto task = [config, output_ptr, rt]() -> CoroTask<void> {
169!
334
            // Resolves the trace files found at `path` into `out_files` and
            // batch-builds indexes for the files listed in needs_checkpoint.
            auto resolve_and_build =
39✔
335
                [&config](
169!
336
                    CoroScope &scope, const std::string &path,
337
                    const std::string &index_dir,
338
                    std::vector<std::string> &out_files) -> CoroTask<void> {
13!
339
                IndexResolverUtility resolver;
13!
340
                ResolverInput resolve_input;
13✔
341
                resolve_input.index_dir = index_dir;
13!
342
                resolve_input.require_checkpoints = !config.force_rebuild;
13✔
343
                // A regular file is resolved as a one-element file list;
                // anything else is handed over as a directory.
                if (fs::is_regular_file(path)) {
13!
344
                    resolve_input.files = {path};
12!
345
                } else {
12✔
346
                    resolve_input.directory = path;
1!
347
                }
348

349
                auto result = co_await resolver.process(resolve_input);
39!
350
                out_files = std::move(result.all_files);
39✔
351

352
                // Nothing to build when no files were found or none of them
                // needs a checkpoint.
                if (out_files.empty() || result.needs_checkpoint.empty()) {
39✔
353
                    co_return;
52✔
354
                }
355

356
                auto batch_cfg = std::make_shared<IndexBuildBatchConfig>();
39!
357
                batch_cfg->file_paths.reserve(result.needs_checkpoint.size());
39✔
358
                for (const auto &item : result.needs_checkpoint) {
27✔
359
                    batch_cfg->file_paths.push_back(item.file_path);
14!
360
                }
14✔
361
                batch_cfg->index_dir = index_dir;
13✔
362
                batch_cfg->checkpoint_size = config.checkpoint_size;
39✔
363
                batch_cfg->parallelism = config.executor_threads;
39✔
364
                batch_cfg->force_rebuild = config.force_rebuild;
39✔
365
                batch_cfg->use_batch_write = true;
39✔
366
                batch_cfg->rebuild_root_summaries = true;
39✔
367

368
                co_await IndexBatchBuilderUtility::process(
91!
369
                    &scope, std::move(batch_cfg));
39✔
370
            };
169!
371

372
            std::vector<std::string> baseline_files;
39✔
373
            std::vector<std::string> variant_files;
39✔
374

375
            // True when both sides map to the same index path.
            bool shared_index =
78✔
376
                composites::dft::internal::determine_index_path(
117!
377
                    config.baseline, config.baseline_index_dir) ==
78✔
378
                composites::dft::internal::determine_index_path(
78!
379
                    config.variant, config.variant_index_dir);
39✔
380

381
            // Step 3: with a shared index the two sides are resolved one
            // after the other (and only once when the paths are identical);
            // otherwise the baseline is spawned on the scope while the
            // variant is awaited here.
            // NOTE(review): presumably the sequential path avoids two
            // concurrent builds into the same index store -- confirm.
            co_await run_coro_scope(
52!
382
                rt->executor(), [&](CoroScope &scope) -> CoroTask<void> {
169!
383
                    if (shared_index) {
39!
384
                        co_await resolve_and_build(scope, config.baseline,
104!
385
                                                   config.baseline_index_dir,
39✔
386
                                                   baseline_files);
39✔
387
                        if (config.baseline == config.variant) {
13!
388
                            variant_files = baseline_files;
13!
389
                        } else {
13✔
390
                            co_await resolve_and_build(scope, config.variant,
×
391
                                                       config.variant_index_dir,
392
                                                       variant_files);
393
                        }
394
                    } else {
13✔
395
                        scope.spawn([&](CoroScope &s) -> CoroTask<void> {
×
396
                            co_await resolve_and_build(
×
397
                                s, config.baseline, config.baseline_index_dir,
398
                                baseline_files);
399
                        });
×
400
                        co_await resolve_and_build(scope, config.variant,
×
401
                                                   config.variant_index_dir,
402
                                                   variant_files);
403
                    }
404
                });
52!
405

406
            if (baseline_files.empty()) {
39!
407
                throw std::runtime_error("No trace files found in baseline: " +
×
408
                                         config.baseline);
409
            }
410
            if (variant_files.empty()) {
39!
411
                throw std::runtime_error("No trace files found in variant: " +
×
412
                                         config.variant);
413
            }
414

415
            output_ptr->baseline_path = config.baseline;
39✔
416
            output_ptr->variant_path = config.variant;
13✔
417

418
            // The reported execution time covers aggregation and comparison
            // only; file resolution and index building happened above.
            auto start_time = std::chrono::high_resolution_clock::now();
39✔
419

420
            std::size_t b_files_actual = 0;
39✔
421
            std::size_t v_files_actual = 0;
39✔
422
            bool metadata_set = false;
39✔
423

424
            // Step 4: one comparison per configured top-level node.
            for (auto &node : config.nodes) {
52!
425
                std::vector<const ComparisonNode *> visitors;
39✔
426
                flatten_nodes(node, visitors);
39!
427

428
                std::vector<ComparisonVisitorPair> pairs;
39✔
429
                pairs.reserve(visitors.size());
39!
430

431
                for (const auto *visitor : visitors) {
78!
432
                    // An empty composed_query leaves `query` unset.
                    std::optional<common::query::Query> query;
39✔
433
                    if (!visitor->composed_query.empty()) {
39✔
434
                        auto result = common::query::Query::from_string(
26!
435
                            visitor->composed_query);
13✔
436
                        if (!result) {
13!
437
                            throw std::runtime_error(
×
438
                                "Invalid query for node '" + visitor->name +
×
439
                                "': " + result.error().format());
×
440
                        }
441
                        query = std::move(*result);
13!
442
                    }
13✔
443

444
                    AggregationConfig agg_cfg;
39!
445
                    // Milliseconds to microseconds.
                    agg_cfg.time_interval_us = static_cast<std::uint64_t>(
39✔
446
                        config.defaults.time_interval_ms * 1000.0);
39✔
447
                    agg_cfg.extra_group_keys = {};
39✔
448
                    agg_cfg.compute_statistics = true;
13✔
449
                    agg_cfg.compute_percentiles = true;
13✔
450
                    agg_cfg.percentiles = visitor->resolved_percentiles;
13✔
451
                    agg_cfg.sketch_accuracy = 0.01;
39✔
452
                    agg_cfg.track_process_parents = false;
39✔
453

454
                    // Both sides use the same aggregation settings and query
                    // and are awaited together.
                    auto [base_result, var_result] = co_await coro::when_all(
91!
455
                        run_aggregation(
78!
456
                            baseline_files, agg_cfg, query,
39!
457
                            config.baseline_index_dir, config.checkpoint_size,
39!
458
                            config.force_rebuild, config.executor_threads),
39✔
459
                        run_aggregation(
78!
460
                            variant_files, agg_cfg, query,
39!
461
                            config.variant_index_dir, config.checkpoint_size,
39!
462
                            config.force_rebuild, config.executor_threads));
39✔
463

464
                    // File counts and metadata come from the first
                    // aggregation only and are reused for later nodes.
                    if (!metadata_set) {
13!
465
                        b_files_actual = base_result.total_files_processed;
13✔
466
                        v_files_actual = var_result.total_files_processed;
13✔
467
                        output_ptr->baseline_file_count = b_files_actual;
13✔
468
                        output_ptr->variant_file_count = v_files_actual;
13✔
469
                        output_ptr->baseline_meta = extract_metadata(
26!
470
                            base_result.aggregations, b_files_actual);
13✔
471
                        output_ptr->variant_meta = extract_metadata(
26!
472
                            var_result.aggregations, v_files_actual);
13✔
473
                        metadata_set = true;
13✔
474
                    }
13✔
475

476
                    ComparisonVisitorPair pair;
13✔
477
                    pair.baseline = std::move(base_result);
13✔
478
                    pair.variant = std::move(var_result);
13✔
479
                    pair.node = *visitor;
13!
480
                    pairs.push_back(std::move(pair));
13!
481
                }
13!
482

483
                ComparisonUtilityInput cmp_input;
39✔
484
                cmp_input.visitors = std::move(pairs);
39✔
485
                cmp_input.root_node = node;
39!
486
                cmp_input.baseline_file_count = b_files_actual;
39✔
487
                cmp_input.variant_file_count = v_files_actual;
39✔
488

489
                ComparisonUtility cmp;
39!
490
                auto cmp_output = co_await cmp.process(cmp_input);
52!
491
                output_ptr->nodes.push_back(std::move(cmp_output.result));
13!
492
            }
13!
493

494
            // Inject metadata rows into root SUMMARY.
495
            auto meta_rows = build_metadata_metrics(output_ptr->baseline_meta,
26!
496
                                                    output_ptr->variant_meta);
13✔
497
            // The rows are placed in front of each node's existing metrics.
            for (auto &node : output_ptr->nodes) {
26✔
498
                node.summary.metrics.insert(node.summary.metrics.begin(),
39!
499
                                            meta_rows.begin(), meta_rows.end());
26✔
500
            }
13✔
501

502
            auto end_time = std::chrono::high_resolution_clock::now();
13✔
503
            std::chrono::duration<double, std::milli> duration =
13✔
504
                end_time - start_time;
13!
505
            output_ptr->execution_time_ms = duration.count();
13!
506
        };
338!
507

508
        // Submit the coroutine to the runtime and wait for it via .get().
        rt->submit(task(), "comparator").get();
26!
509
    });
52✔
510
}
26✔
511

512
// -----------------------------------------------------------------------
513
// compare() -- returns ArrowTable
514
// -----------------------------------------------------------------------
515

516
// compare(): runs the comparison pipeline and returns the result as an
// Arrow table object.
//
// Returns a new reference on success, or nullptr with a Python exception
// set when argument parsing fails, the pipeline fails, the Arrow conversion
// is invalid, or the extension was built without Arrow support.
static PyObject *Comparator_compare(ComparatorObject *self, PyObject *args,
                                    PyObject *kwds) {
    ComparatorArgs cargs;
    if (parse_comparator_args(args, kwds, cargs) < 0) return nullptr;

#ifndef DFTRACER_UTILS_ENABLE_ARROW
    // Without Arrow there is no way to return the result, so fail before
    // running the (potentially expensive) comparison pipeline.
    (void)self;
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return nullptr;
#else
    ComparisonOutput output;
    if (!run_comparison_pipeline(self, cargs, output)) {
        return nullptr;
    }

    auto arrow_result = output.to_arrow();
    if (!arrow_result.valid()) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to convert comparison output "
                        "to Arrow");
        return nullptr;
    }
    return arrow_result_to_table(std::move(arrow_result));
#endif
}
12✔
541

542
// -----------------------------------------------------------------------
543
// compare_json() -- returns JSON string
544
// -----------------------------------------------------------------------
545

546
// compare_json(): runs the comparison pipeline and returns the result
// rendered by TreeTableFormatter::render_json() as a Python str.
// Returns NULL (with the Python error left by the failing step) when
// argument parsing or the pipeline fails.
static PyObject *Comparator_compare_json(ComparatorObject *self, PyObject *args,
                                         PyObject *kwds) {
    ComparatorArgs parsed_args;
    const bool args_ok = parse_comparator_args(args, kwds, parsed_args) >= 0;
    if (!args_ok) {
        return NULL;
    }

    ComparisonOutput comparison;
    const bool pipeline_ok =
        run_comparison_pipeline(self, parsed_args, comparison);
    if (!pipeline_ok) {
        return NULL;
    }

    TreeTableFormatter json_formatter;
    std::string rendered = json_formatter.render_json(comparison);
    const auto rendered_len = static_cast<Py_ssize_t>(rendered.size());
    return PyUnicode_FromStringAndSize(rendered.data(), rendered_len);
}
8✔
560

561
// -----------------------------------------------------------------------
562
// compare_table() -- returns formatted table string
563
// -----------------------------------------------------------------------
564

565
// compare_table(): runs the comparison pipeline and returns the result as a
// formatted table string (color and unicode output disabled).
//
// The formatter writes to a FILE*, so the table is rendered into an
// in-memory stream created with open_memstream() and the resulting buffer
// is converted to a Python str.
//
// Returns a new reference on success, or nullptr with a Python exception
// set when argument parsing, the pipeline, stream creation, rendering or
// closing the stream fails.
static PyObject *Comparator_compare_table(ComparatorObject *self,
                                          PyObject *args, PyObject *kwds) {
    ComparatorArgs cargs;
    if (parse_comparator_args(args, kwds, cargs) < 0) return nullptr;

    ComparisonOutput output;
    if (!run_comparison_pipeline(self, cargs, output)) {
        return nullptr;
    }

    char *buf = nullptr;
    std::size_t buf_size = 0;
    FILE *memstream = nullptr;

    // Releases the stream (if still open) and the buffer it allocated.
    auto release = [&buf, &memstream]() {
        if (memstream) {
            fclose(memstream);
            memstream = nullptr;
        }
        free(buf);
        buf = nullptr;
    };

    // C++ exceptions must not propagate into the Python interpreter, so
    // rendering failures are translated into a RuntimeError here.
    try {
        FormatterOptions fmt_opts;
        fmt_opts.use_color = false;
        fmt_opts.use_unicode = false;
        TreeTableFormatter formatter(fmt_opts);

        memstream = open_memstream(&buf, &buf_size);
        if (!memstream) {
            PyErr_SetString(PyExc_RuntimeError,
                            "Failed to create memory stream");
            return nullptr;
        }

        formatter.render(memstream, output);
    } catch (const std::exception &e) {
        release();
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return nullptr;
    } catch (...) {
        release();
        PyErr_SetString(PyExc_RuntimeError,
                        "Unknown error while rendering comparison table");
        return nullptr;
    }

    // fclose() flushes the stream and finalizes buf/buf_size; the buffer is
    // only meaningful when closing succeeded.
    const int close_rc = fclose(memstream);
    memstream = nullptr;
    if (close_rc != 0 || buf == nullptr) {
        release();
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to finalize memory stream");
        return nullptr;
    }

    PyObject *result =
        PyUnicode_FromStringAndSize(buf, static_cast<Py_ssize_t>(buf_size));
    free(buf);
    return result;
}
6✔
597

598
// -----------------------------------------------------------------------
599
// __call__ delegates to compare()
600
// -----------------------------------------------------------------------
601

602
// Call hook: invoking the object forwards the same positional and keyword
// arguments to Comparator_compare().
static PyObject *Comparator_call(PyObject *self, PyObject *args,
                                 PyObject *kwds) {
    auto *comparator = reinterpret_cast<ComparatorObject *>(self);
    return Comparator_compare(comparator, args, kwds);
}
606

607
// -----------------------------------------------------------------------
608
// Method table
609
// -----------------------------------------------------------------------
610

611
// Docstring for compare(). The keyword names listed here must match the
// keyword list accepted by the argument parser.
static const char *COMPARE_DOC =
    "compare(baseline, variant, query='', group_by='',\n"
    "        format='table', time_interval_ms=5000.0,\n"
    "        threshold=0.0, executor_threads=0,\n"
    "        baseline_index_dir='', variant_index_dir='',\n"
    "        force_rebuild=False, config='')\n"
    "--\n"
    "\n"
    "Run comparison pipeline, return materialized ArrowTable.\n"
    "\n"
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms "
    "(default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    baseline_index_dir (str): Directory for baseline "
    ".dftindex stores.\n"
    "    variant_index_dir (str): Directory for variant "
    ".dftindex stores.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n"
    "\n"
    "Returns:\n"
    "    ArrowTable: Comparison results.\n";
636

637
// Docstring for compare_json(). The keyword names listed here must match
// the keyword list accepted by the argument parser.
static const char *COMPARE_JSON_DOC =
    "compare_json(baseline, variant, query='', group_by='',\n"
    "             format='table', time_interval_ms=5000.0,\n"
    "             threshold=0.0, executor_threads=0,\n"
    "             baseline_index_dir='', variant_index_dir='',\n"
    "             force_rebuild=False, config='')\n"
    "--\n"
    "\n"
    "Run comparison pipeline, return JSON string.\n"
    "\n"
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms "
    "(default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    baseline_index_dir (str): Directory for baseline "
    ".dftindex stores.\n"
    "    variant_index_dir (str): Directory for variant "
    ".dftindex stores.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n"
    "\n"
    "Returns:\n"
    "    str: JSON representation of comparison results.\n";
663

664
// Docstring for ComparatorUtility.compare_table. The text before the "--"
// line is the call signature; everything after the blank line that follows
// is the human-readable description. Written as a raw string literal, so the
// content below is emitted verbatim (every line break is a '\n').
static const char *COMPARE_TABLE_DOC = R"doc(compare_table(baseline, variant, query='', group_by='',
              format='table', time_interval_ms=5000.0,
              threshold=0.0, executor_threads=0,
              index_dir='', force_rebuild=False, config='')
--

Run comparison pipeline, return formatted table string.

Args:
    baseline (str): Baseline trace file or directory.
    variant (str): Variant trace file or directory.
    query (str): Query filter (default: all events).
    group_by (str): Comma-separated group keys.
    format (str): Output format (default 'table').
    time_interval_ms (float): Time bucket in ms (default 5000).
    threshold (float): Hide changes below this pct.
    executor_threads (int): Parallel threads (0=auto).
    index_dir (str): Directory for .dftindex stores.
    force_rebuild (bool): Force index rebuild.
    config (str): JSON config file path.

Returns:
    str: Formatted ASCII table of comparison results.
)doc";
690

691
// Method table installed as ComparatorType.tp_methods. Every entry is
// registered with METH_VARARGS | METH_KEYWORDS, so the handlers are stored
// through a cast to the generic PyCFunction pointer type. The all-null entry
// terminates the table.
static PyMethodDef Comparator_methods[] = {
    {"compare", reinterpret_cast<PyCFunction>(Comparator_compare),
     METH_VARARGS | METH_KEYWORDS, COMPARE_DOC},
    {"compare_json", reinterpret_cast<PyCFunction>(Comparator_compare_json),
     METH_VARARGS | METH_KEYWORDS, COMPARE_JSON_DOC},
    {"compare_table", reinterpret_cast<PyCFunction>(Comparator_compare_table),
     METH_VARARGS | METH_KEYWORDS, COMPARE_TABLE_DOC},
    {nullptr, nullptr, 0, nullptr}};
1✔
699

700
// Type object for dftracer_utils_ext.ComparatorUtility.
//
// The initializer is positional: each entry must stay in the slot order
// named by its trailing comment. Pointer-typed slots that are unused are
// nullptr; the integer offset slots (tp_vectorcall_offset,
// tp_weaklistoffset, tp_dictoffset) and tp_itemsize stay 0.
PyTypeObject ComparatorType = {
    // No comma after the macro: the original initializer places tp_name
    // directly behind PyVarObject_HEAD_INIT(...), and that is kept here.
    PyVarObject_HEAD_INIT(nullptr, 0)
    "dftracer_utils_ext.ComparatorUtility",           /* tp_name */
    sizeof(ComparatorObject),                         /* tp_basicsize */
    0,                                                /* tp_itemsize */
    reinterpret_cast<destructor>(Comparator_dealloc), /* tp_dealloc */
    0,                                                /* tp_vectorcall_offset */
    nullptr,                                          /* tp_getattr */
    nullptr,                                          /* tp_setattr */
    nullptr,                                          /* tp_as_async */
    nullptr,                                          /* tp_repr */
    nullptr,                                          /* tp_as_number */
    nullptr,                                          /* tp_as_sequence */
    nullptr,                                          /* tp_as_mapping */
    nullptr,                                          /* tp_hash */
    Comparator_call,                                  /* tp_call */
    nullptr,                                          /* tp_str */
    nullptr,                                          /* tp_getattro */
    nullptr,                                          /* tp_setattro */
    nullptr,                                          /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,         /* tp_flags */
    /* tp_doc: constructor signature, "--" marker, then the class summary. */
    "ComparatorUtility(runtime: Runtime | None = None)\n--\n\n"
    "Compare DFTracer trace metrics between baseline and variant.\n\n"
    "Args:\n"
    "    runtime (Runtime or None): Runtime for thread pool control.\n"
    "        If None, uses the default global Runtime.\n\n"
    "compare(baseline, variant, ...) -> ArrowTable\n"
    "    Run comparison and return a materialized Arrow table.\n\n"
    "compare_json(baseline, variant, ...) -> str\n"
    "    Run comparison and return JSON string.\n\n"
    "compare_table(baseline, variant, ...) -> str\n"
    "    Run comparison and return formatted table string.\n",
    nullptr,                                     /* tp_traverse */
    nullptr,                                     /* tp_clear */
    nullptr,                                     /* tp_richcompare */
    0,                                           /* tp_weaklistoffset */
    nullptr,                                     /* tp_iter */
    nullptr,                                     /* tp_iternext */
    Comparator_methods,                          /* tp_methods */
    nullptr,                                     /* tp_members */
    nullptr,                                     /* tp_getset */
    nullptr,                                     /* tp_base */
    nullptr,                                     /* tp_dict */
    nullptr,                                     /* tp_descr_get */
    nullptr,                                     /* tp_descr_set */
    0,                                           /* tp_dictoffset */
    reinterpret_cast<initproc>(Comparator_init), /* tp_init */
    nullptr,                                     /* tp_alloc */
    Comparator_new,                              /* tp_new */
};
755

756
// Registers ComparatorType on module `m` under the name "ComparatorUtility".
// Returns -1 when register_type reports a negative status, 0 otherwise.
int init_comparator(PyObject *m) {
    const bool registration_failed =
        register_type(m, &ComparatorType, "ComparatorUtility") < 0;
    return registration_failed ? -1 : 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc