• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 27052412546

06 Jun 2026 04:20AM UTC coverage: 50.862% (+1.0%) from 49.905%
27052412546

Pull #73

github

web-flow
Merge 734572730 into 88a3c8457
Pull Request #73: add portable dependencies wheel support

31801 of 79859 branches covered (39.82%)

Branch coverage included in aggregate %.

32491 of 46545 relevant lines covered (69.81%)

9947.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.07
/src/dftracer/utils/python/streaming_iterator.h
1
#ifndef DFTRACER_UTILS_PYTHON_STREAMING_ITERATOR_H
2
#define DFTRACER_UTILS_PYTHON_STREAMING_ITERATOR_H
3

4
#include <Python.h>
#include <dftracer/utils/core/common/config.h>

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>
16

17
#ifdef DFTRACER_UTILS_ENABLE_ARROW
18
#include <dftracer/utils/utilities/common/arrow/arrow_export.h>
19
#endif
20

21
namespace dftracer::utils::python {
22

23
/// Generic streaming state for bridging C++ async producers to Python sync
/// consumers.
///
/// A mutex-protected FIFO of (item, byte-size) entries bounded by a byte
/// budget rather than an item count. push() blocks while the queued bytes are
/// at or above `memory_budget_bytes` and the queue is non-empty. A single item
/// larger than the budget is therefore still admitted once the queue drains,
/// and a budget of 0 degenerates to a one-item hand-off instead of a hang.
///
/// Producer (C++ coroutine on Runtime executor):
///   - Calls push() to enqueue items (returns false once cancelled)
///   - Calls complete() when done
///   - Calls fail() on error
///
/// Consumer (Python tp_iternext):
///   - Calls pull(), which blocks until an item is available or the stream
///     ends. Nothing in this class touches the GIL; releasing it around the
///     call is the caller's responsibility.
///   - Items already queued are still returned after complete()/fail()/
///     cancel(). pull() returns std::nullopt only once the queue is empty and
///     the stream has been completed, failed or cancelled. Use error() to tell
///     failure apart from normal completion.
///
/// Every state change (including cancel()) is made while holding the mutex
/// the condition variables wait on, so a notification cannot slip in between
/// a waiter's predicate check and its blocking, i.e. no wake-up is lost.
template <typename ItemT>
class StreamingState {
   public:
    explicit StreamingState(std::size_t memory_budget_bytes)
        : memory_budget_bytes_(memory_budget_bytes) {}

    /// Enqueue `item`, charging `item_bytes` against the memory budget.
    /// Blocks while the queue is non-empty and at or over budget.
    /// @return false (item dropped) if the stream has been cancelled,
    ///         true once the item is queued.
    bool push(ItemT item, std::size_t item_bytes) {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_producer_.wait(lock, [this] {
            // An empty queue always admits one item; otherwise a zero budget
            // would block the producer forever.
            return queue_.empty() ||
                   bytes_in_queue_.load(std::memory_order_acquire) <
                       memory_budget_bytes_ ||
                   cancelled_.load(std::memory_order_acquire);
        });
        if (cancelled_.load(std::memory_order_acquire)) {
            return false;
        }
        bytes_in_queue_.fetch_add(item_bytes, std::memory_order_acq_rel);
        queue_.push({std::move(item), item_bytes});
        lock.unlock();
        cv_consumer_.notify_one();
        return true;
    }

    /// Mark the stream as finished successfully and wake the consumer.
    void complete() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            done_.store(true, std::memory_order_release);
        }
        cv_consumer_.notify_all();
    }

    /// Record `ex` as the stream's error, mark it finished and wake the
    /// consumer.
    void fail(std::exception_ptr ex) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            error_ = std::move(ex);
            done_.store(true, std::memory_order_release);
        }
        cv_consumer_.notify_all();
    }

    /// Request cancellation and wake both sides.
    void cancel() {
        {
            // The flag must be published under mtx_. Otherwise a waiter that
            // has just evaluated its predicate (false) but not yet blocked
            // would miss the notification below and sleep forever.
            std::lock_guard<std::mutex> lock(mtx_);
            cancelled_.store(true, std::memory_order_release);
        }
        cv_producer_.notify_all();
        cv_consumer_.notify_all();
    }

    /// Block until an item is available or the stream has ended.
    /// @return the next item in FIFO order, or std::nullopt once the queue is
    ///         empty and the stream was completed, failed or cancelled.
    std::optional<ItemT> pull() {
        std::unique_lock<std::mutex> lock(mtx_);
        cv_consumer_.wait(lock, [this] {
            return !queue_.empty() ||
                   cancelled_.load(std::memory_order_acquire) ||
                   done_.load(std::memory_order_acquire);
        });

        // Woken with nothing queued: the stream ended (done, failed or
        // cancelled).
        if (queue_.empty()) {
            return std::nullopt;
        }

        QueueEntry entry = std::move(queue_.front());
        queue_.pop();
        bytes_in_queue_.fetch_sub(entry.size, std::memory_order_acq_rel);
        lock.unlock();
        // Freed budget may unblock a waiting producer.
        cv_producer_.notify_one();
        return std::move(entry.item);
    }

    /// The exception recorded by fail(), or a null exception_ptr.
    std::exception_ptr error() const {
        // fail() writes error_ under mtx_; read it under the same lock.
        std::lock_guard<std::mutex> lock(mtx_);
        return error_;
    }

    bool cancelled() const {
        return cancelled_.load(std::memory_order_acquire);
    }

    bool done() const { return done_.load(std::memory_order_acquire); }

    /// Store the producer task's future alongside the state.
    void set_task_future(std::shared_future<void> future) {
        task_future_ = std::move(future);
    }

   private:
    struct QueueEntry {
        ItemT item;
        std::size_t size;
    };
    std::queue<QueueEntry> queue_;
    // mutable so const accessors such as error() can synchronize.
    mutable std::mutex mtx_;
    std::condition_variable cv_producer_;
    std::condition_variable cv_consumer_;
    std::exception_ptr error_;
    std::atomic<bool> cancelled_{false};
    std::atomic<bool> done_{false};
    std::size_t memory_budget_bytes_;
    std::atomic<std::size_t> bytes_in_queue_{0};
    std::shared_future<void> task_future_;
};
132

133
#ifdef DFTRACER_UTILS_ENABLE_ARROW
134

135
using utilities::common::arrow::ArrowExportResult;
136

137
/// Internal C++ state for ArrowStreamingIterator.
138
/// Stored as a pointer to avoid C++ object layout issues with Python.
139
struct ArrowStreamingIteratorState {
140
    std::shared_ptr<void> state;
141
    std::function<std::optional<ArrowExportResult>()> pull_next;
142
    std::function<std::exception_ptr()> get_error;
143
    std::function<void()> cancel;
144
};
145

146
/// Type-erased Arrow streaming iterator for Python.
147
///
148
/// This allows different producer types (AggregationBatch, ArrowExportResult,
149
/// etc.) to share the same Python iterator mechanics.
150
struct ArrowStreamingIteratorObject {
151
    PyObject_HEAD
152

153
        /// Pointer to C++ state (owned, allocated with new).
154
        ArrowStreamingIteratorState* cpp_state;
155
};
156

157
extern PyTypeObject ArrowStreamingIteratorType;
158

159
/// Initialize the ArrowStreamingIteratorType.
160
int init_arrow_streaming_iterator(PyObject* m);
161

162
#endif  // DFTRACER_UTILS_ENABLE_ARROW
163

164
}  // namespace dftracer::utils::python
165

166
#endif  // DFTRACER_UTILS_PYTHON_STREAMING_ITERATOR_H
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc