• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28521653886

01 Jul 2026 01:36PM UTC coverage: 50.92% (-1.4%) from 52.278%
28521653886

Pull #83

github

web-flow
Merge 9bdedb1e9 into 2efed6649
Pull Request #83: refactor and improve code QoL

31893 of 80049 branches covered (39.84%)

Branch coverage included in aggregate %.

789 of 1613 new or added lines in 87 files covered. (48.92%)

5007 existing lines in 181 files now uncovered.

32812 of 47024 relevant lines covered (69.78%)

9905.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.87
/src/dftracer/utils/python/utilities/comparator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <dftracer/utils/core/common/error.h>
3
#include <dftracer/utils/core/common/filesystem.h>
4
#include <dftracer/utils/core/common/platform_compat.h>
5
#include <dftracer/utils/core/coro/channel.h>
6
#include <dftracer/utils/core/coro/task.h>
7
#include <dftracer/utils/core/coro/when_all.h>
8
#include <dftracer/utils/core/pipeline/pipeline.h>
9
#include <dftracer/utils/core/pipeline/pipeline_config.h>
10
#include <dftracer/utils/core/runtime.h>
11
#include <dftracer/utils/core/tasks/coro_scope.h>
12
#include <dftracer/utils/core/tasks/task.h>
13
#include <dftracer/utils/python/arrow_helpers.h>
14
#include <dftracer/utils/python/py_runtime_mixin.h>
15
#include <dftracer/utils/python/py_type_helpers.h>
16
#include <dftracer/utils/python/runtime.h>
17
#include <dftracer/utils/python/utilities/comparator.h>
18
#include <dftracer/utils/utilities/common/query/query.h>
19
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregators.h>
20
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_config.h>
21
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_result.h>
22
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_utility.h>
23
#include <dftracer/utils/utilities/composites/dft/comparator/tree_table_formatter.h>
24
#include <dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.h>
25
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
26
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
27
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
28
#include <dftracer/utils/utilities/indexer/index_builder_utility.h>
29
#include <dftracer/utils/utilities/indexer/internal/indexer.h>
30

31
#include <atomic>
32
#include <chrono>
33
#include <cstdio>
34
#include <ctime>
35
#include <stdexcept>
36
#include <string>
37
#include <vector>
38

39
using dftracer::utils::Runtime;
40
using dftracer::utils::coro::CoroTask;
41
using namespace dftracer::utils;
42
using namespace dftracer::utils::utilities;
43
using namespace dftracer::utils::utilities::composites::dft::aggregators;
44
using namespace dftracer::utils::utilities::composites::dft::comparator;
45

46
#include <dftracer/utils/core/common/config.h>
47
#ifdef DFTRACER_UTILS_ENABLE_ARROW
48
using dftracer::utils::python::arrow_result_to_table;
49
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
50
#endif
51

52
// Returns the Runtime that backs this Comparator instance, as resolved by
// the shared resolve_runtime() helper.
static Runtime *get_runtime(ComparatorObject *self) {
    Runtime *const runtime = resolve_runtime(self);
    return runtime;
}
55

56
// tp_dealloc slot for ComparatorUtility.
// All teardown is delegated to the shared runtime_backed_dealloc() helper.
static void Comparator_dealloc(ComparatorObject *self) {
    runtime_backed_dealloc(self);
}
59

60
// tp_new slot: allocation is delegated to the shared
// runtime_backed_new<ComparatorObject>() helper.
static PyObject *Comparator_new(PyTypeObject *type, PyObject *args,
                                PyObject *kwds) {
    PyObject *instance = runtime_backed_new<ComparatorObject>(type, args, kwds);
    return instance;
}
64

65
static int Comparator_init(ComparatorObject *self, PyObject *args,
13✔
66
                           PyObject *kwds) {
67
    return runtime_backed_init(self, args, kwds);
13✔
68
}
69

70
// -----------------------------------------------------------------------
71
// Helpers
72
// -----------------------------------------------------------------------
73

74
// Plain holder for the keyword arguments shared by compare(), compare_json()
// and compare_table(); filled in by parse_comparator_args().
struct ComparatorArgs {
    // Baseline / variant trace file or directory (both required).
    std::string baseline;
    std::string variant;
    // Query filter, group keys and output format strings.
    std::string query;
    std::string group_by;
    std::string format;
    // Numeric knobs; the defaults mirror parse_comparator_args().
    double time_interval_ms{5000.0};
    double threshold{0.0};
    std::size_t executor_threads{0};
    // Index locations (empty = not specified) and rebuild flag.
    std::string baseline_index_dir;
    std::string variant_index_dir;
    bool force_rebuild{false};
    // Optional JSON comparison config file path (empty = none).
    std::string config_path;
};
88

89
static int parse_comparator_args(PyObject *args, PyObject *kwds,
13✔
90
                                 ComparatorArgs &out) {
91
    static const char *kwlist[] = {"baseline",
92
                                   "variant",
93
                                   "query",
94
                                   "group_by",
95
                                   "format",
96
                                   "time_interval_ms",
97
                                   "threshold",
98
                                   "executor_threads",
99
                                   "baseline_index_dir",
100
                                   "variant_index_dir",
101
                                   "force_rebuild",
102
                                   "config",
103
                                   NULL};
104

105
    const char *baseline = NULL;
13✔
106
    const char *variant = NULL;
13✔
107
    const char *query = "";
13✔
108
    const char *group_by = "";
13✔
109
    const char *format = "table";
13✔
110
    double time_interval_ms = 5000.0;
13✔
111
    double threshold = 0.0;
13✔
112
    Py_ssize_t executor_threads = 0;
13✔
113
    const char *baseline_index_dir = "";
13✔
114
    const char *variant_index_dir = "";
13✔
115
    int force_rebuild = 0;
13✔
116
    const char *config = "";
13✔
117

118
    if (!PyArg_ParseTupleAndKeywords(
13!
119
            args, kwds, "ss|sssddnssps", (char **)kwlist, &baseline, &variant,
13✔
120
            &query, &group_by, &format, &time_interval_ms, &threshold,
121
            &executor_threads, &baseline_index_dir, &variant_index_dir,
122
            &force_rebuild, &config))
123
        return -1;
×
124

125
    out.baseline = baseline;
13✔
126
    out.variant = variant;
13✔
127
    out.query = query;
13✔
128
    out.group_by = group_by;
13✔
129
    out.format = format;
13✔
130
    out.time_interval_ms = time_interval_ms;
13✔
131
    out.threshold = threshold;
13✔
132
    out.executor_threads = static_cast<std::size_t>(executor_threads);
13✔
133
    out.baseline_index_dir = baseline_index_dir;
13✔
134
    out.variant_index_dir = variant_index_dir;
13✔
135
    out.force_rebuild = force_rebuild != 0;
13✔
136
    out.config_path = config;
13✔
137

138
    return 0;
13✔
139
}
13✔
140

141
namespace {
142

143
void flatten_nodes(const ComparisonNode &node,
13✔
144
                   std::vector<const ComparisonNode *> &out) {
145
    out.push_back(&node);
13✔
146
    for (const auto &child : node.children) {
13!
147
        flatten_nodes(child, out);
×
148
    }
149
}
13✔
150

151
CoroTask<EventAggregatorOutput> run_aggregation(
130!
152
    std::vector<std::string> input_files, AggregationConfig agg_config,
153
    std::optional<common::query::Query> query, std::string index_dir,
154
    std::size_t checkpoint_size, bool force_rebuild,
155
    std::size_t executor_threads) {
26!
156
    constexpr std::size_t CHUNK_SIZE_MB = 4;
26✔
157
    constexpr std::size_t BATCH_SIZE_MB = 4;
26✔
158

159
    auto pipeline_config = PipelineConfig()
52!
160
                               .with_name("DFTracer Comparator Aggregation")
26!
161
                               .with_compute_threads(executor_threads)
26!
162
                               .with_watchdog(false);
26!
163
    Pipeline pipeline(pipeline_config);
26!
164

165
    EventAggregator merger;
26!
166
    std::atomic<int> global_chunk_idx{0};
26✔
167

168
    auto streaming_task = make_task(
52!
169
        [&](CoroScope &ctx) -> CoroTask<void> {
208!
170
            auto chunk_chan = coro::make_channel<ChunkAggregatorInput>(0);
78!
171
            auto result_chan = coro::make_channel<ChunkAggregationOutput>(2);
78!
172

173
            co_await ctx.scope([&](CoroScope &scope) -> CoroTask<void> {
260!
174
                for (const auto &file_path : input_files) {
54✔
175
                    auto *global_chunk_idx_ptr = &global_chunk_idx;
28✔
176
                    scope.spawn([file_path, ch = chunk_chan->producer(),
446!
177
                                 index_dir, checkpoint_size, force_rebuild,
28!
178
                                 agg_config, query, global_chunk_idx_ptr](
28!
179
                                    CoroScope & /*fctx*/) mutable
180
                                    -> CoroTask<void> {
28!
181
                        [[maybe_unused]] auto producer_guard = ch.guard();
82!
182

183
                        std::string index_path =
82✔
184
                            composites::dft::internal::determine_index_path(
82!
185
                                file_path, index_dir);
82✔
186

187
                        auto meta_input =
82✔
188
                            composites::dft::MetadataCollectorUtilityInput::
164!
189
                                from_file(file_path)
82!
190
                                    .with_checkpoint_size(checkpoint_size)
82✔
191
                                    .with_force_rebuild(force_rebuild)
28!
192
                                    .with_index(index_path);
28✔
193
                        auto metadata =
82✔
194
                            co_await composites::dft::MetadataCollectorUtility{}
219!
195
                                .process(meta_input);
82!
196

197
                        if (!metadata.success) {
84!
UNCOV
198
                            co_return;
×
199
                        }
200

201
                        FileChunkMapperUtility file_mapper;
84!
202
                        auto mapper_input =
84✔
203
                            FileChunkMapperInput::from_metadata(metadata)
168!
204
                                .with_config(agg_config)
84✔
205
                                .with_checkpoint_size(checkpoint_size)
28!
206
                                .with_target_chunk_size(CHUNK_SIZE_MB)
28!
207
                                .with_batch_size(BATCH_SIZE_MB * 1024 * 1024);
28!
208
                        mapper_input.query = query;
84!
209
                        auto file_chunks =
84✔
210
                            co_await file_mapper.process(mapper_input);
112!
211

212
                        int start_idx = global_chunk_idx_ptr->fetch_add(
168✔
213
                            static_cast<int>(file_chunks.size()));
84✔
214
                        for (int i = 0;
112✔
215
                             i < static_cast<int>(file_chunks.size()); ++i) {
112✔
216
                            file_chunks[i].chunk_index = start_idx + i;
28✔
217
                        }
28✔
218

219
                        for (auto &chunk : file_chunks) {
112!
220
                            if (!co_await ch.send(std::move(chunk))) {
112!
UNCOV
221
                                co_return;
×
222
                            }
223
                        }
28!
224
                        co_return;
28✔
225
                    });
690✔
226
                }
28✔
227

228
                for (std::size_t w = 0; w < executor_threads; ++w) {
104✔
229
                    (void)w;
230
                    scope.spawn([chunk_chan, rp = result_chan->producer(),
676!
231
                                 result_chan](
78✔
232
                                    CoroScope &wctx) mutable -> CoroTask<void> {
81✔
233
                        [[maybe_unused]] auto producer_guard = rp.guard();
75!
234
                        while (auto input = co_await wctx.receive(chunk_chan)) {
504!
235
                            ChunkAggregatorUtility agg;
84!
236
                            auto output = co_await agg.process(*input);
112!
237
                            if (!co_await result_chan->send(
112!
238
                                    std::move(output))) {
UNCOV
239
                                co_return;
×
240
                            }
241
                        }
217✔
242
                        co_return;
78✔
243
                    });
771✔
244
                }
78✔
245

246
                auto *merger_ptr = &merger;
26✔
247
                scope.spawn([result_chan,
306!
248
                             merger_ptr](CoroScope &mctx) -> CoroTask<void> {
52!
249
                    while (auto output = co_await mctx.receive(result_chan)) {
173!
250
                        merger_ptr->merge_chunk(std::move(*output));
28!
251
                    }
54✔
252
                    co_return;
26✔
253
                });
124✔
254

255
                co_return;
52✔
256
            });
26✔
257

258
            co_return;
26✔
259
        },
78!
260
        "StreamingAggregate");
26!
261

262
    EventAggregatorOutput result;
26✔
263
    auto post_task = make_task(
52!
264
        [&](CoroScope & /*ctx*/) -> CoroTask<bool> {
156!
265
            result = merger.finalize();
26!
266
            co_return result.success;
52!
UNCOV
267
        },
×
268
        "Finalize");
26!
269

270
    post_task->depends_on(streaming_task);
26!
271
    pipeline.set_source(streaming_task);
26!
272
    pipeline.set_destination(post_task);
26!
273
    pipeline.execute();
26!
274

275
    co_return result;
52!
276
}
26✔
277

278
}  // namespace
279

280
// Runs the whole comparison described by `cargs` and fills `output`.
//
// Steps:
// 1. Build a ComparisonConfig, from the JSON file at cargs.config_path if
//    given, otherwise via ComparisonConfig::from_cli(). Explicit Python
//    arguments then override it only when non-default.
// 2. Resolve the baseline/variant trace files and build any missing indexes.
// 3. For every config node and each of its descendants, aggregate baseline
//    and variant with run_aggregation() and compare them.
//
// Returns what run_blocking() returns. Errors inside are thrown as
// DFTUtilsException (config parse error, no trace files, invalid node query).
// NOTE(review): presumably run_blocking converts these into a Python error and
// returns false -- callers return NULL on false without setting one.
static bool run_comparison_pipeline(ComparatorObject *self,
                                    const ComparatorArgs &cargs,
                                    ComparisonOutput &output) {
    ComparatorArgs args_copy = cargs;
    auto *output_ptr = &output;

    return run_blocking([&] {
        ComparisonConfig config;
        if (!args_copy.config_path.empty()) {
            std::string parse_error;
            auto parsed = ComparisonConfig::from_json_file(
                args_copy.config_path, parse_error);
            if (!parsed) {
                throw DFTUtilsException(ErrorCode::PARSE,
                                        "Config error: " + parse_error);
            }
            config = std::move(*parsed);
        } else {
            config = ComparisonConfig::from_cli(
                args_copy.baseline, args_copy.variant, args_copy.query,
                args_copy.group_by);
        }

        // Python-level overrides. Zero/empty/false values leave the config's
        // own setting untouched (except format and no_color, always set).
        config.format = args_copy.format;
        config.no_color = true;
        if (args_copy.executor_threads > 0)
            config.executor_threads = args_copy.executor_threads;
        if (!args_copy.baseline_index_dir.empty())
            config.baseline_index_dir = args_copy.baseline_index_dir;
        if (!args_copy.variant_index_dir.empty())
            config.variant_index_dir = args_copy.variant_index_dir;
        if (args_copy.force_rebuild)
            config.force_rebuild = args_copy.force_rebuild;
        if (args_copy.threshold > 0.0)
            config.defaults.threshold_pct = args_copy.threshold;
        if (args_copy.time_interval_ms > 0.0)
            config.defaults.time_interval_ms = args_copy.time_interval_ms;

        config.resolve();

        // Fallbacks applied after resolve(): 0 threads means "use hardware
        // concurrency"; 0 checkpoint size means the indexer default.
        if (config.executor_threads == 0) {
            config.executor_threads = dftracer_utils_hardware_concurrency();
        }
        if (config.checkpoint_size == 0) {
            config.checkpoint_size =
                indexer::internal::Indexer::DEFAULT_CHECKPOINT_SIZE;
        }

        using composites::dft::indexing::IndexResolverUtility;
        using composites::dft::indexing::ResolverInput;
        using indexer::IndexBatchBuilderUtility;
        using indexer::IndexBuildBatchConfig;

        Runtime *rt = get_runtime(self);

        // `config` is copied into the closure. The coroutine reaches its
        // captures through the `task` object, which lives until .get() below
        // returns.
        auto task = [config, output_ptr, rt]() -> CoroTask<void> {
            // Resolves the trace files under `path` (a single file or a
            // directory) into `out_files`. Then batch-builds indexes for the
            // files the resolver reports in needs_checkpoint, if any.
            auto resolve_and_build =
                [&config](
                    CoroScope &scope, const std::string &path,
                    const std::string &index_dir,
                    std::vector<std::string> &out_files) -> CoroTask<void> {
                IndexResolverUtility resolver;
                ResolverInput resolve_input;
                resolve_input.index_dir = index_dir;
                resolve_input.require_checkpoints = !config.force_rebuild;
                if (fs::is_regular_file(path)) {
                    resolve_input.files = {path};
                } else {
                    resolve_input.directory = path;
                }

                auto result = co_await resolver.process(resolve_input);
                out_files = std::move(result.all_files);

                // Nothing to index: no files, or all already checkpointed.
                if (out_files.empty() || result.needs_checkpoint.empty()) {
                    co_return;
                }

                auto batch_cfg = std::make_shared<IndexBuildBatchConfig>();
                batch_cfg->file_paths.reserve(result.needs_checkpoint.size());
                for (const auto &item : result.needs_checkpoint) {
                    batch_cfg->file_paths.push_back(item.file_path);
                }
                batch_cfg->index_dir = index_dir;
                batch_cfg->checkpoint_size = config.checkpoint_size;
                batch_cfg->parallelism = config.executor_threads;
                batch_cfg->force_rebuild = config.force_rebuild;
                batch_cfg->use_batch_write = true;
                batch_cfg->rebuild_root_summaries = true;

                co_await IndexBatchBuilderUtility::process(
                    &scope, std::move(batch_cfg));
            };

            std::vector<std::string> baseline_files;
            std::vector<std::string> variant_files;

            // True when baseline and variant resolve to the same index path.
            bool shared_index =
                composites::dft::internal::determine_index_path(
                    config.baseline, config.baseline_index_dir) ==
                composites::dft::internal::determine_index_path(
                    config.variant, config.variant_index_dir);

            co_await run_coro_scope(
                rt->executor(), [&](CoroScope &scope) -> CoroTask<void> {
                    if (shared_index) {
                        // Same index path: resolve/build one side after the
                        // other, presumably so the same index is not built
                        // twice concurrently. Identical inputs reuse the
                        // baseline file list.
                        co_await resolve_and_build(scope, config.baseline,
                                                   config.baseline_index_dir,
                                                   baseline_files);
                        if (config.baseline == config.variant) {
                            variant_files = baseline_files;
                        } else {
                            co_await resolve_and_build(scope, config.variant,
                                                       config.variant_index_dir,
                                                       variant_files);
                        }
                    } else {
                        // Distinct index paths: baseline runs in a spawned
                        // task while variant runs in this one.
                        scope.spawn([&](CoroScope &s) -> CoroTask<void> {
                            co_await resolve_and_build(
                                s, config.baseline, config.baseline_index_dir,
                                baseline_files);
                        });
                        co_await resolve_and_build(scope, config.variant,
                                                   config.variant_index_dir,
                                                   variant_files);
                    }
                });

            if (baseline_files.empty()) {
                throw DFTUtilsException(
                    ErrorCode::NOT_FOUND,
                    "No trace files found in baseline: " + config.baseline);
            }
            if (variant_files.empty()) {
                throw DFTUtilsException(
                    ErrorCode::NOT_FOUND,
                    "No trace files found in variant: " + config.variant);
            }

            output_ptr->baseline_path = config.baseline;
            output_ptr->variant_path = config.variant;

            // The reported execution time excludes the index
            // resolution/building above.
            auto start_time = std::chrono::high_resolution_clock::now();

            std::size_t b_files_actual = 0;
            std::size_t v_files_actual = 0;
            bool metadata_set = false;

            for (auto &node : config.nodes) {
                // The node itself plus all descendants, in pre-order.
                std::vector<const ComparisonNode *> visitors;
                flatten_nodes(node, visitors);

                std::vector<ComparisonVisitorPair> pairs;
                pairs.reserve(visitors.size());

                for (const auto *visitor : visitors) {
                    // No composed query means aggregate all events.
                    std::optional<common::query::Query> query;
                    if (!visitor->composed_query.empty()) {
                        auto result = common::query::Query::from_string(
                            visitor->composed_query);
                        if (!result) {
                            throw DFTUtilsException(
                                ErrorCode::QUERY,
                                "Invalid query for node '" + visitor->name +
                                    "': " + result.error().format());
                        }
                        query = std::move(*result);
                    }

                    AggregationConfig agg_cfg;
                    // ms -> us.
                    agg_cfg.time_interval_us = static_cast<std::uint64_t>(
                        config.defaults.time_interval_ms * 1000.0);
                    agg_cfg.extra_group_keys = {};
                    agg_cfg.compute_statistics = true;
                    agg_cfg.compute_percentiles = true;
                    agg_cfg.percentiles = visitor->resolved_percentiles;
                    agg_cfg.sketch_accuracy = 0.01;
                    agg_cfg.track_process_parents = false;

                    auto [base_result, var_result] = co_await coro::when_all(
                        run_aggregation(
                            baseline_files, agg_cfg, query,
                            config.baseline_index_dir, config.checkpoint_size,
                            config.force_rebuild, config.executor_threads),
                        run_aggregation(
                            variant_files, agg_cfg, query,
                            config.variant_index_dir, config.checkpoint_size,
                            config.force_rebuild, config.executor_threads));

                    // File counts and run metadata come from the very first
                    // visitor aggregated, i.e. with that visitor's query
                    // applied.
                    if (!metadata_set) {
                        b_files_actual = base_result.total_files_processed;
                        v_files_actual = var_result.total_files_processed;
                        output_ptr->baseline_file_count = b_files_actual;
                        output_ptr->variant_file_count = v_files_actual;
                        output_ptr->baseline_meta = extract_metadata(
                            base_result.aggregations, b_files_actual);
                        output_ptr->variant_meta = extract_metadata(
                            var_result.aggregations, v_files_actual);
                        metadata_set = true;
                    }

                    ComparisonVisitorPair pair;
                    pair.baseline = std::move(base_result);
                    pair.variant = std::move(var_result);
                    pair.node = *visitor;
                    pairs.push_back(std::move(pair));
                }

                ComparisonUtilityInput cmp_input;
                cmp_input.visitors = std::move(pairs);
                cmp_input.root_node = node;
                cmp_input.baseline_file_count = b_files_actual;
                cmp_input.variant_file_count = v_files_actual;

                ComparisonUtility cmp;
                auto cmp_output = co_await cmp.process(cmp_input);
                output_ptr->nodes.push_back(std::move(cmp_output.result));
            }

            // Inject metadata rows into root SUMMARY.
            auto meta_rows = build_metadata_metrics(output_ptr->baseline_meta,
                                                    output_ptr->variant_meta);
            for (auto &node : output_ptr->nodes) {
                node.summary.metrics.insert(node.summary.metrics.begin(),
                                            meta_rows.begin(), meta_rows.end());
            }

            auto end_time = std::chrono::high_resolution_clock::now();
            std::chrono::duration<double, std::milli> duration =
                end_time - start_time;
            output_ptr->execution_time_ms = duration.count();
        };

        // Run the coroutine on the runtime and wait for it to finish.
        rt->submit(task(), "comparator").get();
    });
}
516

517
// -----------------------------------------------------------------------
518
// compare() -- returns ArrowTable
519
// -----------------------------------------------------------------------
520

521
// Python: ComparatorUtility.compare(...) -> ArrowTable.
// Parses the shared arguments, runs the comparison pipeline and converts the
// result to an Arrow table. Builds without Arrow still run the pipeline, then
// raise RuntimeError.
static PyObject *Comparator_compare(ComparatorObject *self, PyObject *args,
                                    PyObject *kwds) {
    ComparatorArgs cargs;
    if (parse_comparator_args(args, kwds, cargs) < 0) {
        return NULL;
    }

    ComparisonOutput output;
    const bool pipeline_ok = run_comparison_pipeline(self, cargs, output);
    if (!pipeline_ok) return NULL;

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    auto table_data = output.to_arrow();
    if (table_data.valid()) {
        return arrow_result_to_table(std::move(table_data));
    }
    PyErr_SetString(PyExc_RuntimeError,
                    "Failed to convert comparison output to Arrow");
    return NULL;
#else
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return NULL;
#endif
}
546

547
// -----------------------------------------------------------------------
548
// compare_json() -- returns JSON string
549
// -----------------------------------------------------------------------
550

551
// Python: ComparatorUtility.compare_json(...) -> str.
// Runs the comparison pipeline and returns TreeTableFormatter's JSON
// rendering of the result.
static PyObject *Comparator_compare_json(ComparatorObject *self, PyObject *args,
                                         PyObject *kwds) {
    ComparatorArgs cargs;
    if (parse_comparator_args(args, kwds, cargs) < 0) {
        return NULL;
    }

    ComparisonOutput output;
    if (!run_comparison_pipeline(self, cargs, output)) {
        return NULL;
    }

    TreeTableFormatter formatter;
    const std::string rendered = formatter.render_json(output);
    const auto length = static_cast<Py_ssize_t>(rendered.size());
    return PyUnicode_FromStringAndSize(rendered.data(), length);
}
565

566
// -----------------------------------------------------------------------
567
// compare_table() -- returns formatted table string
568
// -----------------------------------------------------------------------
569

570
// Python: ComparatorUtility.compare_table(...) -> str.
// Runs the comparison pipeline and returns the plain-ASCII table rendering
// (no colour, no unicode box drawing).
//
// Returns NULL with a Python exception set if argument parsing or the pipeline
// fails, or if rendering fails. C++ exceptions from the formatter are
// converted to RuntimeError so they never unwind through CPython frames.
static PyObject *Comparator_compare_table(ComparatorObject *self,
                                          PyObject *args, PyObject *kwds) {
    ComparatorArgs cargs;
    if (parse_comparator_args(args, kwds, cargs) < 0) return NULL;

    ComparisonOutput output;
    if (!run_comparison_pipeline(self, cargs, output)) {
        return NULL;
    }

    FormatterOptions fmt_opts;
    fmt_opts.use_color = false;
    fmt_opts.use_unicode = false;

    // The formatter writes to a FILE*. Render into an in-memory stream
    // (open_memstream) and copy the resulting buffer into a Python str.
    char *buf = NULL;
    std::size_t buf_size = 0;
    FILE *memstream = open_memstream(&buf, &buf_size);
    if (!memstream) {
        PyErr_SetString(PyExc_RuntimeError, "Failed to create memory stream");
        return NULL;
    }

    try {
        TreeTableFormatter formatter(fmt_opts);
        formatter.render(memstream, output);
    } catch (const std::exception &e) {
        fclose(memstream);
        free(buf);
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        fclose(memstream);
        free(buf);
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to render comparison table");
        return NULL;
    }

    // fclose() flushes the stream and finalizes buf/buf_size.
    // If it fails they cannot be trusted.
    if (fclose(memstream) != 0 || buf == NULL) {
        free(buf);
        PyErr_SetString(PyExc_RuntimeError,
                        "Failed to finalize memory stream");
        return NULL;
    }

    PyObject *result =
        PyUnicode_FromStringAndSize(buf, static_cast<Py_ssize_t>(buf_size));
    free(buf);
    return result;
}
602

603
// -----------------------------------------------------------------------
604
// __call__ delegates to compare()
605
// -----------------------------------------------------------------------
606

607
// tp_call slot: calling a ComparatorUtility instance is equivalent to calling
// its compare() method with the same arguments.
static PyObject *Comparator_call(PyObject *self, PyObject *args,
                                 PyObject *kwds) {
    ComparatorObject *comparator = (ComparatorObject *)self;
    return Comparator_compare(comparator, args, kwds);
}
611

612
// -----------------------------------------------------------------------
613
// Method table
614
// -----------------------------------------------------------------------
615

616
// Docstring for compare(). The part before the "--" line is the text
// signature, so its parameter names must match the kwlist in
// parse_comparator_args().
static const char *COMPARE_DOC =
    "compare(baseline, variant, query='', group_by='',\n"
    "        format='table', time_interval_ms=5000.0,\n"
    "        threshold=0.0, executor_threads=0,\n"
    "        baseline_index_dir='', variant_index_dir='',\n"
    "        force_rebuild=False, config='')\n"
    "--\n"
    "\n"
    "Run comparison pipeline, return materialized ArrowTable.\n"
    "\n"
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms "
    "(default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    baseline_index_dir (str): Directory for baseline .dftindex stores.\n"
    "    variant_index_dir (str): Directory for variant .dftindex stores.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n"
    "\n"
    "Returns:\n"
    "    ArrowTable: Comparison results.\n";
641

642
// Docstring for compare_json(). The part before the "--" line is the text
// signature, so its parameter names must match the kwlist in
// parse_comparator_args().
static const char *COMPARE_JSON_DOC =
    "compare_json(baseline, variant, query='', group_by='',\n"
    "             format='table', time_interval_ms=5000.0,\n"
    "             threshold=0.0, executor_threads=0,\n"
    "             baseline_index_dir='', variant_index_dir='',\n"
    "             force_rebuild=False, config='')\n"
    "--\n"
    "\n"
    "Run comparison pipeline, return JSON string.\n"
    "\n"
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms "
    "(default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    baseline_index_dir (str): Directory for baseline .dftindex stores.\n"
    "    variant_index_dir (str): Directory for variant .dftindex stores.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n"
    "\n"
    "Returns:\n"
    "    str: JSON representation of comparison results.\n";
668

669
static const char *COMPARE_TABLE_DOC =
670
    "compare_table(baseline, variant, query='', group_by='',\n"
671
    "              format='table', time_interval_ms=5000.0,\n"
672
    "              threshold=0.0, executor_threads=0,\n"
673
    "              index_dir='', force_rebuild=False, "
674
    "config='')\n"
675
    "--\n"
676
    "\n"
677
    "Run comparison pipeline, return formatted table string.\n"
678
    "\n"
679
    "Args:\n"
680
    "    baseline (str): Baseline trace file or directory.\n"
681
    "    variant (str): Variant trace file or directory.\n"
682
    "    query (str): Query filter (default: all events).\n"
683
    "    group_by (str): Comma-separated group keys.\n"
684
    "    format (str): Output format (default 'table').\n"
685
    "    time_interval_ms (float): Time bucket in ms "
686
    "(default 5000).\n"
687
    "    threshold (float): Hide changes below this pct.\n"
688
    "    executor_threads (int): Parallel threads (0=auto).\n"
689
    "    index_dir (str): Directory for .dftindex stores.\n"
690
    "    force_rebuild (bool): Force index rebuild.\n"
691
    "    config (str): JSON config file path.\n"
692
    "\n"
693
    "Returns:\n"
694
    "    str: Formatted ASCII table of comparison results.\n";
695

696
static PyMethodDef Comparator_methods[] = {
1✔
697
    {"compare", (PyCFunction)Comparator_compare, METH_VARARGS | METH_KEYWORDS,
2✔
698
     COMPARE_DOC},
1✔
699
    {"compare_json", (PyCFunction)Comparator_compare_json,
2✔
700
     METH_VARARGS | METH_KEYWORDS, COMPARE_JSON_DOC},
1✔
701
    {"compare_table", (PyCFunction)Comparator_compare_table,
2✔
702
     METH_VARARGS | METH_KEYWORDS, COMPARE_TABLE_DOC},
1✔
703
    {NULL}};
1✔
704

705
/// Python type object for `dftracer_utils_ext.ComparatorUtility`.
///
/// Slots are initialized positionally in PyTypeObject declaration order; any
/// slot after tp_new is zero-initialized. Instances are callable (tp_call),
/// may be subclassed (Py_TPFLAGS_BASETYPE), and are not GC-tracked (the
/// Py_TPFLAGS_HAVE_GC flag is not set, so tp_traverse/tp_clear stay null).
/// Pointer slots use nullptr; the Py_ssize_t offset slots use 0.
PyTypeObject ComparatorType = {
    PyVarObject_HEAD_INIT(nullptr, 0)
    "dftracer_utils_ext.ComparatorUtility",           /* tp_name */
    sizeof(ComparatorObject),                         /* tp_basicsize */
    0,                                                /* tp_itemsize */
    reinterpret_cast<destructor>(Comparator_dealloc), /* tp_dealloc */
    0,                                                /* tp_vectorcall_offset */
    nullptr,                                          /* tp_getattr */
    nullptr,                                          /* tp_setattr */
    nullptr,                                          /* tp_as_async */
    nullptr,                                          /* tp_repr */
    nullptr,                                          /* tp_as_number */
    nullptr,                                          /* tp_as_sequence */
    nullptr,                                          /* tp_as_mapping */
    nullptr,                                          /* tp_hash */
    Comparator_call,                                  /* tp_call */
    nullptr,                                          /* tp_str */
    nullptr,                                          /* tp_getattro */
    nullptr,                                          /* tp_setattro */
    nullptr,                                          /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,         /* tp_flags */
    "ComparatorUtility(runtime: Runtime | None = None)\n"
    "--\n\n"
    "Compare DFTracer trace metrics between baseline and "
    "variant.\n\n"
    "Args:\n"
    "    runtime (Runtime or None): Runtime for thread pool "
    "control.\n"
    "        If None, uses the default global Runtime.\n\n"
    "compare(baseline, variant, ...) -> ArrowTable\n"
    "    Run comparison and return a materialized Arrow "
    "table.\n\n"
    "compare_json(baseline, variant, ...) -> str\n"
    "    Run comparison and return JSON string.\n\n"
    "compare_table(baseline, variant, ...) -> str\n"
    "    Run comparison and return formatted table "
    "string.\n",                                      /* tp_doc */
    nullptr,                                          /* tp_traverse */
    nullptr,                                          /* tp_clear */
    nullptr,                                          /* tp_richcompare */
    0,                                                /* tp_weaklistoffset */
    nullptr,                                          /* tp_iter */
    nullptr,                                          /* tp_iternext */
    Comparator_methods,                               /* tp_methods */
    nullptr,                                          /* tp_members */
    nullptr,                                          /* tp_getset */
    nullptr,                                          /* tp_base */
    nullptr,                                          /* tp_dict */
    nullptr,                                          /* tp_descr_get */
    nullptr,                                          /* tp_descr_set */
    0,                                                /* tp_dictoffset */
    reinterpret_cast<initproc>(Comparator_init),      /* tp_init */
    nullptr,                                          /* tp_alloc */
    Comparator_new,                                   /* tp_new */
};
760

761
/// Register the ComparatorUtility type on extension module `m`.
///
/// Returns 0 on success, or -1 when register_type() reports a negative
/// status. Presumably register_type() has already set a Python exception in
/// that case; confirm against its implementation.
int init_comparator(PyObject *m) {
    const auto status = register_type(m, &ComparatorType, "ComparatorUtility");
    return (status < 0) ? -1 : 0;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc