• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26932094552

04 Jun 2026 05:08AM UTC coverage: 49.905% (-2.3%) from 52.184%
26932094552

push

github

hariharan-devarajan
chore(utils): add portable to_chars_double fallback for macOS and update zstd handling

- Introduce to_chars_double wrapper that falls back to snprintf on macOS < 13.3
- Force CPM-built zstd on Apple to avoid deployment target mismatches
- Update version patch to 8

16076 of 43875 branches covered (36.64%)

Branch coverage included in aggregate %.

0 of 3 new or added lines in 1 file covered. (0.0%)

660 existing lines in 103 files now uncovered.

21461 of 31342 relevant lines covered (68.47%)

13056.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.77
/src/dftracer/utils/python/utilities/comparator.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <dftracer/utils/core/common/filesystem.h>
3
#include <dftracer/utils/core/common/platform_compat.h>
4
#include <dftracer/utils/core/coro/channel.h>
5
#include <dftracer/utils/core/coro/task.h>
6
#include <dftracer/utils/core/coro/when_all.h>
7
#include <dftracer/utils/core/pipeline/pipeline.h>
8
#include <dftracer/utils/core/pipeline/pipeline_config.h>
9
#include <dftracer/utils/core/runtime.h>
10
#include <dftracer/utils/core/tasks/coro_scope.h>
11
#include <dftracer/utils/core/tasks/task.h>
12
#include <dftracer/utils/python/arrow_helpers.h>
13
#include <dftracer/utils/python/runtime.h>
14
#include <dftracer/utils/python/utilities/comparator.h>
15
#include <dftracer/utils/utilities/common/query/query.h>
16
#include <dftracer/utils/utilities/composites/dft/aggregators/aggregators.h>
17
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_config.h>
18
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_result.h>
19
#include <dftracer/utils/utilities/composites/dft/comparator/comparison_utility.h>
20
#include <dftracer/utils/utilities/composites/dft/comparator/tree_table_formatter.h>
21
#include <dftracer/utils/utilities/composites/dft/indexing/index_resolver_utility.h>
22
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
23
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
24
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
25
#include <dftracer/utils/utilities/indexer/index_builder_utility.h>
26
#include <dftracer/utils/utilities/indexer/internal/indexer.h>
27

28
#include <atomic>
29
#include <chrono>
30
#include <cstdio>
31
#include <ctime>
32
#include <string>
33
#include <vector>
34

35
using dftracer::utils::Runtime;
36
using dftracer::utils::coro::CoroTask;
37
using namespace dftracer::utils;
38
using namespace dftracer::utils::utilities;
39
using namespace dftracer::utils::utilities::composites::dft::aggregators;
40
using namespace dftracer::utils::utilities::composites::dft::comparator;
41

42
#include <dftracer/utils/core/common/config.h>
43
#ifdef DFTRACER_UTILS_ENABLE_ARROW
44
using dftracer::utils::python::arrow_result_to_table;
45
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
46
#endif
47

48
/// Select the Runtime a comparator call should execute on.
///
/// @param self Comparator instance; its `runtime_obj` is either null or a
///             RuntimeObject stored by Comparator_init.
/// @return The Runtime held by `runtime_obj` when one is attached, otherwise
///         whatever get_default_runtime() returns.
static Runtime *get_runtime(ComparatorObject *self) {
    if (self->runtime_obj == nullptr) {
        return get_default_runtime();
    }
    auto *attached = reinterpret_cast<RuntimeObject *>(self->runtime_obj);
    return attached->runtime.get();
}
53

54
/// tp_dealloc slot: release the optional runtime reference, then hand the
/// object's storage back to the type's tp_free.
static void Comparator_dealloc(ComparatorObject *self) {
    PyObject *as_object = reinterpret_cast<PyObject *>(self);
    Py_XDECREF(self->runtime_obj);  // tolerates a null runtime_obj
    Py_TYPE(as_object)->tp_free(as_object);
}
13✔
58

59
/// tp_new slot: allocate a ComparatorObject with no runtime attached.
///
/// `args` and `kwds` are not inspected here; Comparator_init parses them.
///
/// @return The new object, or nullptr when tp_alloc fails.
static PyObject *Comparator_new(PyTypeObject *type, PyObject *args,
                                PyObject *kwds) {
    PyObject *allocated = type->tp_alloc(type, 0);
    if (allocated == nullptr) {
        return nullptr;
    }
    reinterpret_cast<ComparatorObject *>(allocated)->runtime_obj = nullptr;
    return allocated;
}
67

68
static int Comparator_init(ComparatorObject *self, PyObject *args,
13✔
69
                           PyObject *kwds) {
70
    static const char *kwlist[] = {"runtime", NULL};
71
    PyObject *runtime_arg = NULL;
13✔
72

73
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
13!
74
                                     &runtime_arg)) {
75
        return -1;
×
76
    }
77

78
    if (runtime_arg && runtime_arg != Py_None) {
13!
79
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
×
80
            Py_INCREF(runtime_arg);
×
81
            self->runtime_obj = runtime_arg;
×
82
        } else {
83
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
×
84
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
×
85
                self->runtime_obj = native;
×
86
            } else {
87
                Py_XDECREF(native);
×
88
                PyErr_SetString(PyExc_TypeError,
×
89
                                "runtime must be a Runtime instance or None");
90
                return -1;
×
91
            }
92
        }
93
    }
94

95
    return 0;
13✔
96
}
97

98
// -----------------------------------------------------------------------
99
// Helpers
100
// -----------------------------------------------------------------------
101

102
/// Plain C++ copy of the keyword arguments accepted by the compare*()
/// methods, so the pipeline can run without touching Python objects.
struct ComparatorArgs {
    std::string baseline;              // baseline trace file or directory
    std::string variant;               // variant trace file or directory
    std::string query;                 // optional query filter
    std::string group_by;              // optional group keys
    std::string format;                // requested output format
    double time_interval_ms{5000.0};   // time bucket width in milliseconds
    double threshold{0.0};             // 0 leaves the config default in place
    std::size_t executor_threads{0};   // 0 leaves the config default in place
    std::string baseline_index_dir;    // empty leaves the config default
    std::string variant_index_dir;     // empty leaves the config default
    bool force_rebuild{false};
    std::string config_path;           // JSON config file; empty means unused
};
116

117
static int parse_comparator_args(PyObject *args, PyObject *kwds,
13✔
118
                                 ComparatorArgs &out) {
119
    static const char *kwlist[] = {"baseline",
120
                                   "variant",
121
                                   "query",
122
                                   "group_by",
123
                                   "format",
124
                                   "time_interval_ms",
125
                                   "threshold",
126
                                   "executor_threads",
127
                                   "baseline_index_dir",
128
                                   "variant_index_dir",
129
                                   "force_rebuild",
130
                                   "config",
131
                                   NULL};
132

133
    const char *baseline = NULL;
13✔
134
    const char *variant = NULL;
13✔
135
    const char *query = "";
13✔
136
    const char *group_by = "";
13✔
137
    const char *format = "table";
13✔
138
    double time_interval_ms = 5000.0;
13✔
139
    double threshold = 0.0;
13✔
140
    Py_ssize_t executor_threads = 0;
13✔
141
    const char *baseline_index_dir = "";
13✔
142
    const char *variant_index_dir = "";
13✔
143
    int force_rebuild = 0;
13✔
144
    const char *config = "";
13✔
145

146
    if (!PyArg_ParseTupleAndKeywords(
13!
147
            args, kwds, "ss|sssddnssps", (char **)kwlist, &baseline, &variant,
148
            &query, &group_by, &format, &time_interval_ms, &threshold,
149
            &executor_threads, &baseline_index_dir, &variant_index_dir,
150
            &force_rebuild, &config))
151
        return -1;
×
152

153
    out.baseline = baseline;
13!
154
    out.variant = variant;
13!
155
    out.query = query;
13!
156
    out.group_by = group_by;
13!
157
    out.format = format;
13!
158
    out.time_interval_ms = time_interval_ms;
13✔
159
    out.threshold = threshold;
13✔
160
    out.executor_threads = static_cast<std::size_t>(executor_threads);
13✔
161
    out.baseline_index_dir = baseline_index_dir;
13!
162
    out.variant_index_dir = variant_index_dir;
13!
163
    out.force_rebuild = force_rebuild != 0;
13✔
164
    out.config_path = config;
13!
165

166
    return 0;
13✔
167
}
168

169
namespace {
170

171
void flatten_nodes(const ComparisonNode &node,
13✔
172
                   std::vector<const ComparisonNode *> &out) {
173
    out.push_back(&node);
13!
174
    for (const auto &child : node.children) {
13!
175
        flatten_nodes(child, out);
×
176
    }
177
}
13✔
178

179
/// Aggregate the events of `input_files` into one EventAggregatorOutput.
///
/// The work runs as a two-task Pipeline:
///   1. "StreamingAggregate" spawns, inside one nested scope,
///        - one producer coroutine per input file, which collects the file's
///          metadata, maps it to chunks and sends them on `chunk_chan`;
///        - `executor_threads` worker coroutines, which aggregate each chunk
///          and send the partial result on `result_chan`;
///        - one merger coroutine, which folds partial results into the
///          shared EventAggregator.
///   2. "Finalize" depends on (1) and calls EventAggregator::finalize().
///
/// A file whose metadata collection reports failure is skipped silently.
///
/// @param input_files      Trace files to aggregate.
/// @param agg_config       Aggregation settings forwarded to the chunk mapper.
/// @param query            Optional filter copied onto each mapper input.
/// @param index_dir        Passed to determine_index_path() for every file.
/// @param checkpoint_size  Forwarded to metadata collection and chunk mapping.
/// @param force_rebuild    Forwarded to metadata collection.
/// @param executor_threads Pipeline compute threads and worker count.
/// @return The finalized aggregation result.
CoroTask<EventAggregatorOutput> run_aggregation(
    std::vector<std::string> input_files, AggregationConfig agg_config,
    std::optional<common::query::Query> query, std::string index_dir,
    std::size_t checkpoint_size, bool force_rebuild,
    std::size_t executor_threads) {
    constexpr std::size_t kChunkSizeMb = 4;
    constexpr std::size_t kBatchSizeMb = 4;

    auto pipeline_config = PipelineConfig()
                               .with_name("DFTracer Comparator Aggregation")
                               .with_compute_threads(executor_threads)
                               .with_watchdog(false);
    Pipeline pipeline(pipeline_config);

    EventAggregator merger;
    std::atomic<int> next_chunk_index{0};

    // The [&] captures below are safe only because pipeline.execute() is
    // called before this function's locals go out of scope.
    auto streaming_task = make_task(
        [&](CoroScope &ctx) -> CoroTask<void> {
            auto chunk_chan = coro::make_channel<ChunkAggregatorInput>(0);
            auto result_chan = coro::make_channel<ChunkAggregationOutput>(2);

            co_await ctx.scope([&](CoroScope &scope) -> CoroTask<void> {
                // Spawned coroutines get plain pointers to the shared state
                // rather than references into this lambda's closure.
                auto *chunk_counter = &next_chunk_index;
                auto *merger_ptr = &merger;

                // --- Producers: one per input file ---------------------
                for (const auto &file_path : input_files) {
                    scope.spawn([file_path,
                                 chunk_out = chunk_chan->producer(),
                                 index_dir, checkpoint_size, force_rebuild,
                                 agg_config, query, chunk_counter](
                                    CoroScope & /*fctx*/) mutable
                                    -> CoroTask<void> {
                        [[maybe_unused]] auto producer_guard =
                            chunk_out.guard();

                        std::string index_path =
                            composites::dft::internal::determine_index_path(
                                file_path, index_dir);

                        auto meta_input =
                            composites::dft::MetadataCollectorUtilityInput::
                                from_file(file_path)
                                    .with_checkpoint_size(checkpoint_size)
                                    .with_force_rebuild(force_rebuild)
                                    .with_index(index_path);
                        auto metadata =
                            co_await composites::dft::MetadataCollectorUtility{}
                                .process(meta_input);

                        if (!metadata.success) {
                            co_return;
                        }

                        FileChunkMapperUtility file_mapper;
                        auto mapper_input =
                            FileChunkMapperInput::from_metadata(metadata)
                                .with_config(agg_config)
                                .with_checkpoint_size(checkpoint_size)
                                .with_target_chunk_size(kChunkSizeMb)
                                .with_batch_size(kBatchSizeMb * 1024 * 1024);
                        mapper_input.query = query;
                        auto file_chunks =
                            co_await file_mapper.process(mapper_input);

                        // Reserve a contiguous block of global chunk indices
                        // for this file and number its chunks from there.
                        const int chunk_count =
                            static_cast<int>(file_chunks.size());
                        const int first_index =
                            chunk_counter->fetch_add(chunk_count);
                        for (int offset = 0; offset < chunk_count; ++offset) {
                            file_chunks[offset].chunk_index =
                                first_index + offset;
                        }

                        for (auto &chunk : file_chunks) {
                            const bool delivered =
                                co_await chunk_out.send(std::move(chunk));
                            if (!delivered) {
                                co_return;
                            }
                        }
                        co_return;
                    });
                }

                // --- Workers: aggregate chunks -------------------------
                for (std::size_t remaining = executor_threads; remaining > 0;
                     --remaining) {
                    scope.spawn([chunk_chan,
                                 result_producer = result_chan->producer(),
                                 result_chan](
                                    CoroScope &wctx) mutable -> CoroTask<void> {
                        [[maybe_unused]] auto producer_guard =
                            result_producer.guard();
                        while (auto input = co_await wctx.receive(chunk_chan)) {
                            ChunkAggregatorUtility agg;
                            auto partial = co_await agg.process(*input);
                            const bool delivered =
                                co_await result_chan->send(std::move(partial));
                            if (!delivered) {
                                co_return;
                            }
                        }
                        co_return;
                    });
                }

                // --- Merger: fold partial results ----------------------
                scope.spawn([result_chan,
                             merger_ptr](CoroScope &mctx) -> CoroTask<void> {
                    while (auto partial = co_await mctx.receive(result_chan)) {
                        merger_ptr->merge_chunk(std::move(*partial));
                    }
                    co_return;
                });

                co_return;
            });

            co_return;
        },
        "StreamingAggregate");

    EventAggregatorOutput result;
    auto post_task = make_task(
        [&](CoroScope & /*ctx*/) -> CoroTask<bool> {
            result = merger.finalize();
            co_return result.success;
        },
        "Finalize");

    post_task->depends_on(streaming_task);
    pipeline.set_source(streaming_task);
    pipeline.set_destination(post_task);
    pipeline.execute();

    co_return result;
}
52!
305

306
}  // namespace
307

308
static int run_comparison_pipeline(ComparatorObject *self,
13✔
309
                                   const ComparatorArgs &cargs,
310
                                   ComparisonOutput &output,
311
                                   std::string &error_msg) {
312
    ComparatorArgs args_copy = cargs;
13!
313
    auto *output_ptr = &output;
13✔
314

315
    Py_BEGIN_ALLOW_THREADS try {
13!
316
        ComparisonConfig config;
13!
317
        if (!args_copy.config_path.empty()) {
13!
318
            std::string parse_error;
×
319
            auto parsed = ComparisonConfig::from_json_file(
320
                args_copy.config_path, parse_error);
×
321
            if (!parsed) {
×
322
                error_msg = "Config error: " + parse_error;
×
323
                goto done;
×
324
            }
325
            config = std::move(*parsed);
×
326
        } else {
×
327
            config = ComparisonConfig::from_cli(
26!
328
                args_copy.baseline, args_copy.variant, args_copy.query,
329
                args_copy.group_by);
13✔
330
        }
331

332
        config.format = args_copy.format;
13!
333
        config.no_color = true;
13✔
334
        if (args_copy.executor_threads > 0)
13!
335
            config.executor_threads = args_copy.executor_threads;
×
336
        if (!args_copy.baseline_index_dir.empty())
13!
337
            config.baseline_index_dir = args_copy.baseline_index_dir;
×
338
        if (!args_copy.variant_index_dir.empty())
13!
339
            config.variant_index_dir = args_copy.variant_index_dir;
×
340
        if (args_copy.force_rebuild)
13!
341
            config.force_rebuild = args_copy.force_rebuild;
×
342
        if (args_copy.threshold > 0.0)
13!
343
            config.defaults.threshold_pct = args_copy.threshold;
×
344
        if (args_copy.time_interval_ms > 0.0)
13!
345
            config.defaults.time_interval_ms = args_copy.time_interval_ms;
13✔
346

347
        config.resolve();
13!
348

349
        if (config.executor_threads == 0) {
13!
350
            config.executor_threads = dftracer_utils_hardware_concurrency();
13✔
351
        }
352
        if (config.checkpoint_size == 0) {
13!
353
            config.checkpoint_size =
13✔
354
                indexer::internal::Indexer::DEFAULT_CHECKPOINT_SIZE;
355
        }
356

357
        using composites::dft::indexing::IndexResolverUtility;
358
        using composites::dft::indexing::ResolverInput;
359
        using indexer::IndexBatchBuilderUtility;
360
        using indexer::IndexBuildBatchConfig;
361

362
        Runtime *rt = get_runtime(self);
13!
363

364
        auto *error_msg_ptr = &error_msg;
13✔
365
        auto task = [config, output_ptr, error_msg_ptr,
13!
366
                     rt]() -> CoroTask<void> {
367
            auto resolve_and_build =
368
                [&config](
13!
369
                    CoroScope &scope, const std::string &path,
370
                    const std::string &index_dir,
371
                    std::vector<std::string> &out_files) -> CoroTask<void> {
372
                IndexResolverUtility resolver;
373
                ResolverInput resolve_input;
374
                resolve_input.index_dir = index_dir;
375
                resolve_input.require_checkpoints = !config.force_rebuild;
376
                if (fs::is_regular_file(path)) {
377
                    resolve_input.files = {path};
378
                } else {
379
                    resolve_input.directory = path;
380
                }
381

382
                auto result = co_await resolver.process(resolve_input);
383
                out_files = std::move(result.all_files);
384

385
                if (out_files.empty() || result.needs_checkpoint.empty()) {
386
                    co_return;
387
                }
388

389
                auto batch_cfg = std::make_shared<IndexBuildBatchConfig>();
390
                batch_cfg->file_paths.reserve(result.needs_checkpoint.size());
391
                for (const auto &item : result.needs_checkpoint) {
392
                    batch_cfg->file_paths.push_back(item.file_path);
393
                }
394
                batch_cfg->index_dir = index_dir;
395
                batch_cfg->checkpoint_size = config.checkpoint_size;
396
                batch_cfg->parallelism = config.executor_threads;
397
                batch_cfg->force_rebuild = config.force_rebuild;
398
                batch_cfg->use_batch_write = true;
399
                batch_cfg->rebuild_root_summaries = true;
400

401
                co_await IndexBatchBuilderUtility::process(
402
                    &scope, std::move(batch_cfg));
403
            };
26!
404

405
            std::vector<std::string> baseline_files;
406
            std::vector<std::string> variant_files;
407

408
            bool shared_index =
409
                composites::dft::internal::determine_index_path(
410
                    config.baseline, config.baseline_index_dir) ==
411
                composites::dft::internal::determine_index_path(
412
                    config.variant, config.variant_index_dir);
413

414
            co_await run_coro_scope(
415
                rt->executor(), [&](CoroScope &scope) -> CoroTask<void> {
13!
416
                    if (shared_index) {
417
                        co_await resolve_and_build(scope, config.baseline,
418
                                                   config.baseline_index_dir,
419
                                                   baseline_files);
420
                        if (config.baseline == config.variant) {
421
                            variant_files = baseline_files;
422
                        } else {
423
                            co_await resolve_and_build(scope, config.variant,
424
                                                       config.variant_index_dir,
425
                                                       variant_files);
426
                        }
427
                    } else {
428
                        scope.spawn([&](CoroScope &s) -> CoroTask<void> {
×
429
                            co_await resolve_and_build(
430
                                s, config.baseline, config.baseline_index_dir,
431
                                baseline_files);
432
                        });
×
433
                        co_await resolve_and_build(scope, config.variant,
434
                                                   config.variant_index_dir,
435
                                                   variant_files);
436
                    }
437
                });
26!
438

439
            if (baseline_files.empty()) {
440
                *error_msg_ptr =
441
                    "No trace files found in baseline: " + config.baseline;
442
                co_return;
443
            }
444
            if (variant_files.empty()) {
445
                *error_msg_ptr =
446
                    "No trace files found in variant: " + config.variant;
447
                co_return;
448
            }
449

450
            output_ptr->baseline_path = config.baseline;
451
            output_ptr->variant_path = config.variant;
452
            output_ptr->baseline_file_count = baseline_files.size();
453
            output_ptr->variant_file_count = variant_files.size();
454

455
            auto start_time = std::chrono::high_resolution_clock::now();
456

457
            for (auto &node : config.nodes) {
458
                std::vector<const ComparisonNode *> visitors;
459
                flatten_nodes(node, visitors);
460

461
                std::vector<ComparisonVisitorPair> pairs;
462
                pairs.reserve(visitors.size());
463

464
                for (const auto *visitor : visitors) {
465
                    std::optional<common::query::Query> query;
466
                    if (!visitor->composed_query.empty()) {
467
                        auto result = common::query::Query::from_string(
468
                            visitor->composed_query);
469
                        if (!result) {
470
                            *error_msg_ptr = "Invalid query for node '" +
471
                                             visitor->name +
472
                                             "': " + result.error().format();
473
                            co_return;
474
                        }
475
                        query = std::move(*result);
476
                    }
477

478
                    AggregationConfig agg_cfg;
479
                    agg_cfg.time_interval_us = static_cast<std::uint64_t>(
480
                        config.defaults.time_interval_ms * 1000.0);
481
                    agg_cfg.extra_group_keys = {};
482
                    agg_cfg.compute_statistics = true;
483
                    agg_cfg.compute_percentiles = true;
484
                    agg_cfg.percentiles = visitor->resolved_percentiles;
485
                    agg_cfg.sketch_accuracy = 0.01;
486
                    agg_cfg.track_process_parents = false;
487

488
                    auto [base_result, var_result] = co_await coro::when_all(
489
                        run_aggregation(
490
                            baseline_files, agg_cfg, query,
491
                            config.baseline_index_dir, config.checkpoint_size,
492
                            config.force_rebuild, config.executor_threads),
493
                        run_aggregation(
494
                            variant_files, agg_cfg, query,
495
                            config.variant_index_dir, config.checkpoint_size,
496
                            config.force_rebuild, config.executor_threads));
497

498
                    if (pairs.empty()) {
499
                        output_ptr->baseline_meta = extract_metadata(
500
                            base_result.aggregations, baseline_files.size());
501
                        output_ptr->variant_meta = extract_metadata(
502
                            var_result.aggregations, variant_files.size());
503
                    }
504

505
                    ComparisonVisitorPair pair;
506
                    pair.baseline = std::move(base_result);
507
                    pair.variant = std::move(var_result);
508
                    pair.node = *visitor;
509
                    pairs.push_back(std::move(pair));
510
                }
511

512
                ComparisonUtilityInput cmp_input;
513
                cmp_input.visitors = std::move(pairs);
514
                cmp_input.root_node = node;
515
                cmp_input.baseline_file_count = baseline_files.size();
516
                cmp_input.variant_file_count = variant_files.size();
517

518
                ComparisonUtility cmp;
519
                auto cmp_output = co_await cmp.process(cmp_input);
520
                output_ptr->nodes.push_back(std::move(cmp_output.result));
521
            }
522

523
            // Inject metadata rows into root SUMMARY.
524
            auto meta_rows = build_metadata_metrics(output_ptr->baseline_meta,
525
                                                    output_ptr->variant_meta);
526
            for (auto &node : output_ptr->nodes) {
527
                node.summary.metrics.insert(node.summary.metrics.begin(),
528
                                            meta_rows.begin(), meta_rows.end());
529
            }
530

531
            auto end_time = std::chrono::high_resolution_clock::now();
532
            std::chrono::duration<double, std::milli> duration =
533
                end_time - start_time;
534
            output_ptr->execution_time_ms = duration.count();
535
        };
39!
536

537
        rt->submit(task(), "comparator").get();
13!
538
    } catch (const std::exception &e) {
13!
539
        error_msg = e.what();
×
UNCOV
540
    }
×
541
done:
13✔
542
    Py_END_ALLOW_THREADS
13!
543

544
        return error_msg.empty()
13✔
545
        ? 0
13!
546
        : -1;
13✔
547
}
13✔
548

549
// -----------------------------------------------------------------------
550
// compare() -- returns ArrowTable
551
// -----------------------------------------------------------------------
552

553
/// Comparator.compare(...): run the comparison and return the result as an
/// Arrow table object.
///
/// The pipeline runs even in a build without DFTRACER_UTILS_ENABLE_ARROW;
/// in that case the call then fails with RuntimeError.
///
/// @return A new reference on success, or nullptr with a Python exception
///         set (argument errors from parsing, RuntimeError otherwise).
static PyObject *Comparator_compare(ComparatorObject *self, PyObject *args,
                                    PyObject *kwds) {
    ComparatorArgs parsed_args;
    const int parse_status = parse_comparator_args(args, kwds, parsed_args);
    if (parse_status < 0) {
        return nullptr;
    }

    ComparisonOutput comparison;
    std::string failure;
    const int run_status =
        run_comparison_pipeline(self, parsed_args, comparison, failure);
    if (run_status < 0) {
        PyErr_SetString(PyExc_RuntimeError, failure.c_str());
        return nullptr;
    }

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    auto exported = comparison.to_arrow();
    if (exported.valid()) {
        return arrow_result_to_table(std::move(exported));
    }
    PyErr_SetString(PyExc_RuntimeError,
                    "Failed to convert comparison output "
                    "to Arrow");
    return nullptr;
#else
    PyErr_SetString(PyExc_RuntimeError,
                    "dftracer-utils was built without Arrow support");
    return nullptr;
#endif
}
6✔
580

581
// -----------------------------------------------------------------------
582
// compare_json() -- returns JSON string
583
// -----------------------------------------------------------------------
584

585
/// Comparator.compare_json(...): run the comparison and return the result
/// rendered by TreeTableFormatter::render_json as a Python str.
///
/// @return A new reference on success, or nullptr with a Python exception
///         set (argument errors from parsing, RuntimeError from the pipeline).
static PyObject *Comparator_compare_json(ComparatorObject *self, PyObject *args,
                                         PyObject *kwds) {
    ComparatorArgs parsed_args;
    const int parse_status = parse_comparator_args(args, kwds, parsed_args);
    if (parse_status < 0) {
        return nullptr;
    }

    ComparisonOutput comparison;
    std::string failure;
    const int run_status =
        run_comparison_pipeline(self, parsed_args, comparison, failure);
    if (run_status < 0) {
        PyErr_SetString(PyExc_RuntimeError, failure.c_str());
        return nullptr;
    }

    TreeTableFormatter json_formatter;
    const std::string rendered = json_formatter.render_json(comparison);
    const Py_ssize_t rendered_len = (Py_ssize_t)rendered.size();
    return PyUnicode_FromStringAndSize(rendered.data(), rendered_len);
}
4✔
601

602
// -----------------------------------------------------------------------
603
// compare_table() -- returns formatted table string
604
// -----------------------------------------------------------------------
605

606
/// Comparator.compare_table(...): run the comparison and return the
/// plain-text table (no colour, no Unicode glyphs) as a Python str.
///
/// The table is rendered into an in-memory stream (open_memstream) and the
/// resulting buffer is converted to a Python string. The stream is closed
/// and the buffer freed on every path, and exceptions thrown while rendering
/// are converted to RuntimeError instead of unwinding into the interpreter.
///
/// @return A new reference on success, or NULL with a Python exception set
///         (argument errors from parsing, RuntimeError otherwise).
static PyObject *Comparator_compare_table(ComparatorObject *self,
                                          PyObject *args, PyObject *kwds) {
    ComparatorArgs cargs;
    if (parse_comparator_args(args, kwds, cargs) < 0) return NULL;

    ComparisonOutput output;
    std::string error_msg;
    if (run_comparison_pipeline(self, cargs, output, error_msg) < 0) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

    // Render into a heap buffer managed by the C library.
    char *buf = NULL;
    std::size_t buf_size = 0;
    FILE *memstream = open_memstream(&buf, &buf_size);
    if (!memstream) {
        PyErr_SetString(PyExc_RuntimeError, "Failed to create memory stream");
        return NULL;
    }

    bool rendered = false;
    try {
        FormatterOptions fmt_opts;
        fmt_opts.use_color = false;
        fmt_opts.use_unicode = false;
        TreeTableFormatter formatter(fmt_opts);
        formatter.render(memstream, output);
        rendered = true;
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "Unknown error while rendering comparison table");
    }

    // fclose() flushes the stream and finalizes buf / buf_size; the buffer
    // remains owned by us and must be released with free().
    const bool closed = (fclose(memstream) == 0);

    PyObject *result = NULL;
    if (rendered) {
        if (closed && buf != NULL) {
            result = PyUnicode_FromStringAndSize(buf, (Py_ssize_t)buf_size);
        } else {
            PyErr_SetString(PyExc_RuntimeError,
                            "Failed to finalize memory stream");
        }
    }
    free(buf);
    return result;
}
3✔
640

641
// -----------------------------------------------------------------------
642
// __call__ delegates to compare()
643
// -----------------------------------------------------------------------
644

645
/// tp_call slot: calling a Comparator instance is the same as calling its
/// compare() method with the same arguments.
static PyObject *Comparator_call(PyObject *self, PyObject *args,
                                 PyObject *kwds) {
    auto *comparator = reinterpret_cast<ComparatorObject *>(self);
    return Comparator_compare(comparator, args, kwds);
}
649

650
// -----------------------------------------------------------------------
651
// Method table
652
// -----------------------------------------------------------------------
653

654
// Docstring for Comparator.compare(). The keyword names listed here must
// match the kwlist in parse_comparator_args(): the index directories are
// passed as `baseline_index_dir` / `variant_index_dir`.
static const char *COMPARE_DOC =
    "compare(baseline, variant, query='', group_by='',\n"
    "        format='table', time_interval_ms=5000.0,\n"
    "        threshold=0.0, executor_threads=0,\n"
    "        baseline_index_dir='', variant_index_dir='',\n"
    "        force_rebuild=False, config='')\n"
    "--\n"
    "\n"
    "Run comparison pipeline, return materialized ArrowTable.\n"
    "\n"
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms "
    "(default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    baseline_index_dir (str): Directory for baseline "
    ".dftindex stores.\n"
    "    variant_index_dir (str): Directory for variant "
    ".dftindex stores.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n"
    "\n"
    "Returns:\n"
    "    ArrowTable: Comparison results.\n";
679

680
// Docstring for ComparatorUtility.compare_json(). The leading signature
// block, terminated by ")\n--\n\n", follows CPython's text-signature layout;
// the rest is the human-readable description. The pointer itself is const so
// the docstring cannot be re-pointed after initialization.
static const char *const COMPARE_JSON_DOC =
    "compare_json(baseline, variant, query='', group_by='',\n"
    "             format='table', time_interval_ms=5000.0,\n"
    "             threshold=0.0, executor_threads=0,\n"
    "             index_dir='', force_rebuild=False, config='')\n"
    "--\n"
    "\n"
    "Run comparison pipeline, return JSON string.\n"
    "\n"
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms (default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    index_dir (str): Directory for .dftindex stores.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n"
    "\n"
    "Returns:\n"
    "    str: JSON representation of comparison results.\n";
706

707
// Docstring for ComparatorUtility.compare_table(). The leading signature
// block, terminated by ")\n--\n\n", follows CPython's text-signature layout;
// the rest is the human-readable description. The pointer itself is const so
// the docstring cannot be re-pointed after initialization.
static const char *const COMPARE_TABLE_DOC =
    "compare_table(baseline, variant, query='', group_by='',\n"
    "              format='table', time_interval_ms=5000.0,\n"
    "              threshold=0.0, executor_threads=0,\n"
    "              index_dir='', force_rebuild=False, config='')\n"
    "--\n"
    "\n"
    "Run comparison pipeline, return formatted table string.\n"
    "\n"
    "Args:\n"
    "    baseline (str): Baseline trace file or directory.\n"
    "    variant (str): Variant trace file or directory.\n"
    "    query (str): Query filter (default: all events).\n"
    "    group_by (str): Comma-separated group keys.\n"
    "    format (str): Output format (default 'table').\n"
    "    time_interval_ms (float): Time bucket in ms (default 5000).\n"
    "    threshold (float): Hide changes below this pct.\n"
    "    executor_threads (int): Parallel threads (0=auto).\n"
    "    index_dir (str): Directory for .dftindex stores.\n"
    "    force_rebuild (bool): Force index rebuild.\n"
    "    config (str): JSON config file path.\n"
    "\n"
    "Returns:\n"
    "    str: Formatted ASCII table of comparison results.\n";
733

734
// Method table for ComparatorUtility. Every entry is registered with
// METH_VARARGS | METH_KEYWORDS, i.e. the handlers receive (self, args, kwds);
// the function pointers are cast to PyCFunction only because that is the
// field type PyMethodDef stores. The all-null entry terminates the table.
static PyMethodDef Comparator_methods[] = {
    {"compare", reinterpret_cast<PyCFunction>(Comparator_compare),
     METH_VARARGS | METH_KEYWORDS, COMPARE_DOC},
    {"compare_json", reinterpret_cast<PyCFunction>(Comparator_compare_json),
     METH_VARARGS | METH_KEYWORDS, COMPARE_JSON_DOC},
    {"compare_table", reinterpret_cast<PyCFunction>(Comparator_compare_table),
     METH_VARARGS | METH_KEYWORDS, COMPARE_TABLE_DOC},
    {nullptr, nullptr, 0, nullptr}};
742

743
// Static type object for the Python class exposed as
// "dftracer_utils_ext.ComparatorUtility".
//
// The initializer is positional, so the entries must stay in exactly the
// order of the PyTypeObject fields named in the trailing comments. Slots that
// are populated: tp_dealloc (Comparator_dealloc), tp_call (Comparator_call),
// tp_flags (default + subclassable via Py_TPFLAGS_BASETYPE), tp_doc,
// tp_methods (Comparator_methods), tp_init (Comparator_init) and tp_new
// (Comparator_new). Every other slot is left as 0.
PyTypeObject ComparatorType = {
    PyVarObject_HEAD_INIT(NULL, 0)
    "dftracer_utils_ext.ComparatorUtility",   /* tp_name */
    sizeof(ComparatorObject),                 /* tp_basicsize */
    0,                                        /* tp_itemsize */
    (destructor)Comparator_dealloc,           /* tp_dealloc */
    0,                                        /* tp_vectorcall_offset */
    0,                                        /* tp_getattr */
    0,                                        /* tp_setattr */
    0,                                        /* tp_as_async */
    0,                                        /* tp_repr */
    0,                                        /* tp_as_number */
    0,                                        /* tp_as_sequence */
    0,                                        /* tp_as_mapping */
    0,                                        /* tp_hash */
    Comparator_call,                          /* tp_call */
    0,                                        /* tp_str */
    0,                                        /* tp_getattro */
    0,                                        /* tp_setattro */
    0,                                        /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    /* tp_doc: class docstring, starting with the constructor signature. */
    "ComparatorUtility(runtime: Runtime | None = None)\n"
    "--\n\n"
    "Compare DFTracer trace metrics between baseline and variant.\n\n"
    "Args:\n"
    "    runtime (Runtime or None): Runtime for thread pool control.\n"
    "        If None, uses the default global Runtime.\n\n"
    "compare(baseline, variant, ...) -> ArrowTable\n"
    "    Run comparison and return a materialized Arrow table.\n\n"
    "compare_json(baseline, variant, ...) -> str\n"
    "    Run comparison and return JSON string.\n\n"
    "compare_table(baseline, variant, ...) -> str\n"
    "    Run comparison and return formatted table string.\n", /* tp_doc */
    0,                                        /* tp_traverse */
    0,                                        /* tp_clear */
    0,                                        /* tp_richcompare */
    0,                                        /* tp_weaklistoffset */
    0,                                        /* tp_iter */
    0,                                        /* tp_iternext */
    Comparator_methods,                       /* tp_methods */
    0,                                        /* tp_members */
    0,                                        /* tp_getset */
    0,                                        /* tp_base */
    0,                                        /* tp_dict */
    0,                                        /* tp_descr_get */
    0,                                        /* tp_descr_set */
    0,                                        /* tp_dictoffset */
    (initproc)Comparator_init,                /* tp_init */
    0,                                        /* tp_alloc */
    Comparator_new,                           /* tp_new */
};
798

799
// Finalizes ComparatorType and publishes it on module `m` under the
// attribute name "ComparatorUtility".
//
// Returns 0 on success, or -1 when PyType_Ready() or PyModule_AddObject()
// reports failure.
int init_comparator(PyObject *m) {
    if (PyType_Ready(&ComparatorType) < 0) {
        return -1;
    }

    // PyModule_AddObject() takes over a reference only when it succeeds, so
    // one is acquired here and handed back explicitly on the failure path.
    Py_INCREF(&ComparatorType);
    PyObject *type_object = reinterpret_cast<PyObject *>(&ComparatorType);
    if (PyModule_AddObject(m, "ComparatorUtility", type_object) < 0) {
        Py_DECREF(&ComparatorType);
        // NOTE(review): this releases a reference to `m`, which this function
        // did not acquire. Confirm the caller expects the module to be
        // released here and does not release it again on a -1 return.
        Py_DECREF(m);
        return -1;
    }

    return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc