llnl / dftracer-utils / 26043728131

18 May 2026 03:37PM UTC coverage: 51.706% (-0.4%) from 52.076%
push

github

hariharan-devarajan
feat(perf): performance improvements for parallel reading, indexing, and aggregation

Indexer
- Streaming parse-and-emit worker pipeline with bounded memory usage
- Concurrent SST artifact ingestion with staging support
- Gzip member slicing for parallel indexing
- Lazy decoding for compressed value counts
- Bypass DOM wrapper for indexer hot path (simdjson on_demand)
- Decoupled write workers from parse workers
- --rebuild-summaries flag and optimized root summary rebuild

Aggregator / MPI
- Task-based DAG execution for aggregator pipeline
- Shared staging for multi-node artifact relocation
- Per-node thread scaling to avoid oversubscription
- Unified distributed aggregation tracking, removed manifest consolidation
- Deterministic aggregation and intra-file parallelism

Trace reader / query
- Compiled predicate evaluation for AND-of-EQ queries
- Uniform-match shortcut for AND-of-EQ queries
- Line-range support for work items and checkpoint processing
- Optimized chunk pruning and checkpoint handling

Replay
- Pipelined replay with coroutines and channels
- JsonParser-based trace processing
- Optimized string handling and I/O buffering

Organize / writer / dft
- Parallel slice creation and merging in organize visitor
- Inline indexer in organize
- Gzip member tracking in writer
- Coroutine-based event dispatcher with extracted parse logic
- Batch flushing in organize visitor

Arrow / call_tree
- Optimized arrow conversion
- Arrow IPC support and improved save/load in call_tree

Build / infrastructure
- zlib-ng option, system simdjson fallback
- cgroup v1/v2 memory limit detection
- Auto-computed per-file memory estimates and batch sizes
- CI: perf branch trigger, formatting

Docs
- Rewritten indexer and trace reader API references

35907 of 90345 branches covered (39.74%)

Branch coverage included in aggregate %.

16869 of 21880 new or added lines in 137 files covered. (77.1%)

273 existing lines in 39 files now uncovered.

32021 of 41028 relevant lines covered (78.05%)

13164.29 hits per line

Source File

72.9
/src/dftracer/utils/python/streaming_iterator.h
#ifndef DFTRACER_UTILS_PYTHON_STREAMING_ITERATOR_H
#define DFTRACER_UTILS_PYTHON_STREAMING_ITERATOR_H

#include <Python.h>
#include <dftracer/utils/core/common/config.h>

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <exception>
#include <future>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

#ifdef DFTRACER_UTILS_ENABLE_ARROW
#include <dftracer/utils/utilities/common/arrow/arrow_export.h>
#endif

namespace dftracer::utils::python {

/// Generic streaming state for bridging C++ async producers to Python sync
/// consumers.
///
/// Producer (C++ coroutine on Runtime executor):
///   - Calls push() to enqueue items
///   - Calls complete() when done
///   - Calls fail() on error
///
/// Consumer (Python tp_iternext):
///   - Calls pull() which blocks (with GIL released) until item available
///   - Returns std::nullopt on completion or error
template <typename ItemT>
class StreamingState {
   public:
    explicit StreamingState(std::size_t memory_budget_bytes)  // 12✔
        : memory_budget_bytes_(memory_budget_bytes) {}  // 12✔

    bool push(ItemT item, std::size_t item_bytes) {  // 10✔
        std::unique_lock<std::mutex> lock(mtx_);  // 10!
        cv_producer_.wait(lock, [this] {  // 15!
            return bytes_in_queue_.load(std::memory_order_acquire) <  // 15✔
                       memory_budget_bytes_ ||  // 15!
                   cancelled_.load(std::memory_order_acquire);  // 5✔
        });
        if (cancelled_.load(std::memory_order_acquire)) {  // 10✔
            return false;  // × NEW
        }
        bytes_in_queue_.fetch_add(item_bytes, std::memory_order_acq_rel);  // 10✔
        queue_.push({std::move(item), item_bytes});  // 10!
        lock.unlock();  // 10!
        cv_consumer_.notify_one();  // 10✔
        return true;  // 10✔
    }  // 10✔

    void complete() {  // 6✔
        {
            std::lock_guard<std::mutex> lock(mtx_);  // 6!
            done_.store(true, std::memory_order_release);  // 6✔
        }  // 6✔
        cv_consumer_.notify_all();  // 6✔
    }  // 6✔

    void fail(std::exception_ptr ex) {  // × NEW
        {
            std::lock_guard<std::mutex> lock(mtx_);  // × NEW
            error_ = std::move(ex);  // × NEW
            done_.store(true, std::memory_order_release);  // × NEW
        }  // × NEW
        cv_consumer_.notify_all();  // × NEW
    }  // × NEW

    void cancel() {  // 6✔
        cancelled_.store(true, std::memory_order_release);  // 6✔
        cv_producer_.notify_all();  // 6✔
        cv_consumer_.notify_all();  // 6✔
    }  // 6✔

    std::optional<ItemT> pull() {  // 16✔
        std::unique_lock<std::mutex> lock(mtx_);  // 16!
        cv_consumer_.wait(lock, [this] {  // 32!
            return !queue_.empty() ||  // 54✔
                   cancelled_.load(std::memory_order_acquire) ||  // 38✔
                   done_.load(std::memory_order_acquire);  // 38✔
        });

        if (cancelled_.load(std::memory_order_acquire) && queue_.empty()) {  // 16!
            return std::nullopt;  // × NEW
        }

        if (queue_.empty()) {  // 16✔
            return std::nullopt;  // 6✔
        }

        auto [item, size] = std::move(queue_.front());  // 15!
        queue_.pop();  // 10!
        bytes_in_queue_.fetch_sub(size, std::memory_order_acq_rel);  // 15✔
        lock.unlock();  // 10!
        cv_producer_.notify_one();  // 10✔
        return std::move(item);  // 10!
    }  // 16✔

    std::exception_ptr error() const { return error_; }  // 6✔

    bool cancelled() const {  // 16✔
        return cancelled_.load(std::memory_order_acquire);  // 16✔
    }

    bool done() const { return done_.load(std::memory_order_acquire); }

    void set_task_future(std::shared_future<void> future) {
        task_future_ = std::move(future);
    }

   private:
    struct QueueEntry {
        ItemT item;
        std::size_t size;
    };
    std::queue<QueueEntry> queue_;
    std::mutex mtx_;
    std::condition_variable cv_producer_;
    std::condition_variable cv_consumer_;
    std::exception_ptr error_;
    std::atomic<bool> cancelled_{false};  // 3✔
    std::atomic<bool> done_{false};  // 3✔
    std::size_t memory_budget_bytes_;
    std::atomic<std::size_t> bytes_in_queue_{0};  // 3✔
    std::shared_future<void> task_future_;
};

#ifdef DFTRACER_UTILS_ENABLE_ARROW

using utilities::common::arrow::ArrowExportResult;

/// Internal C++ state for ArrowStreamingIterator.
/// Stored as a pointer to avoid C++ object layout issues with Python.
struct ArrowStreamingIteratorState {
    std::shared_ptr<void> state;
    std::function<std::optional<ArrowExportResult>()> pull_next;
    std::function<std::exception_ptr()> get_error;
    std::function<void()> cancel;
};

/// Type-erased Arrow streaming iterator for Python.
///
/// This allows different producer types (AggregationBatch, ArrowExportResult,
/// etc.) to share the same Python iterator mechanics.
struct ArrowStreamingIteratorObject {
    PyObject_HEAD

        /// Pointer to C++ state (owned, allocated with new).
        ArrowStreamingIteratorState* cpp_state;
};

extern PyTypeObject ArrowStreamingIteratorType;

/// Initialize the ArrowStreamingIteratorType.
int init_arrow_streaming_iterator(PyObject* m);

#endif  // DFTRACER_UTILS_ENABLE_ARROW

}  // namespace dftracer::utils::python

#endif  // DFTRACER_UTILS_PYTHON_STREAMING_ITERATOR_H
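
The producer/consumer contract described in the StreamingState doc comment (push until the byte budget is reached, pull until completion) can be exercised without Python. The block below is a minimal sketch under stated assumptions, not the project's implementation: `BoundedStream` and `run_pipeline` are hypothetical names, the Python and GIL handling is left out, and the flags are plain `bool`s written while holding the mutex rather than atomics. That last choice differs from `cancel()` in the listing, which stores the flag without taking the lock; writing under the mutex is a conservative way to make sure a waiter cannot miss the change between checking its predicate and going to sleep.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for the StreamingState pattern:
// a byte-budgeted queue between one producer and one consumer.
template <typename T>
class BoundedStream {
   public:
    explicit BoundedStream(std::size_t budget_bytes) : budget_(budget_bytes) {}

    // Blocks while the queue is at or over budget; false means cancelled.
    // The budget is soft: an item is admitted whenever the queue is
    // currently under budget, so it can be exceeded by one item.
    bool push(T item, std::size_t bytes) {
        std::unique_lock<std::mutex> lock(mtx_);
        not_full_.wait(lock, [this] { return bytes_ < budget_ || cancelled_; });
        if (cancelled_) return false;
        bytes_ += bytes;
        queue_.push({std::move(item), bytes});
        lock.unlock();
        not_empty_.notify_one();
        return true;
    }

    // Blocks until an item, completion, or cancellation.
    // std::nullopt signals the end of the stream.
    std::optional<T> pull() {
        std::unique_lock<std::mutex> lock(mtx_);
        not_empty_.wait(lock, [this] {
            return !queue_.empty() || done_ || cancelled_;
        });
        if (queue_.empty()) return std::nullopt;
        Entry entry = std::move(queue_.front());
        queue_.pop();
        bytes_ -= entry.bytes;
        lock.unlock();
        not_full_.notify_one();
        return std::move(entry.item);
    }

    void complete() { set_flag(done_); }
    void cancel() { set_flag(cancelled_); }

   private:
    struct Entry {
        T item;
        std::size_t bytes;
    };

    // Set the flag under the mutex, then wake both sides.
    void set_flag(bool& flag) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            flag = true;
        }
        not_full_.notify_all();
        not_empty_.notify_all();
    }

    std::queue<Entry> queue_;
    std::mutex mtx_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::size_t budget_;
    std::size_t bytes_ = 0;
    bool done_ = false;
    bool cancelled_ = false;
};

// Hypothetical driver: a producer thread pushes 0..count-1, the calling
// thread drains the stream until pull() returns std::nullopt.
std::vector<int> run_pipeline(int count, std::size_t budget_bytes) {
    assert(count >= 0);
    BoundedStream<int> stream(budget_bytes);
    std::thread producer([&stream, count] {
        for (int i = 0; i < count; ++i) {
            if (!stream.push(i, sizeof(int))) return;  // consumer cancelled
        }
        stream.complete();
    });
    std::vector<int> received;
    while (auto item = stream.pull()) received.push_back(*item);
    producer.join();
    return received;
}
```

Calling `run_pipeline(5, 8)` from a test or a `main` function drains five integers through a queue that holds roughly two at a time, so the producer blocks on the budget while the consumer catches up. Items already queued at cancellation are still delivered before `pull()` reports the end of the stream, matching the order of checks in the listing's `pull()`.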