• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28693295402

04 Jul 2026 03:17AM UTC coverage: 52.408% (+0.1%) from 52.278%
28693295402

push

github

hariharan-devarajan
feat: silence noisy warnings on aarch64

37318 of 92666 branches covered (40.27%)

Branch coverage included in aggregate %.

33462 of 42389 relevant lines covered (78.94%)

20557.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.1
/src/dftracer/utils/python/trace_reader.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <Python.h>
3
#include <dftracer/utils/core/common/config.h>
4
#include <dftracer/utils/core/common/constants.h>
5
#include <dftracer/utils/core/common/filesystem.h>
6
#include <dftracer/utils/core/common/memory_budget.h>
7
#include <dftracer/utils/core/coro/channel.h>
8
#include <dftracer/utils/core/coro/task.h>
9
#include <dftracer/utils/core/coro/when_all.h>
10
#include <dftracer/utils/core/tasks/coro_scope.h>
11
#include <dftracer/utils/core/utils/string.h>
12
#include <dftracer/utils/python/arrow_helpers.h>
13
#include <dftracer/utils/python/batch_byte_size.h>
14
#include <dftracer/utils/python/json.h>
15
#include <dftracer/utils/python/py_dict_helpers.h>
16
#include <dftracer/utils/python/py_errors.h>
17
#include <dftracer/utils/python/py_list_helpers.h>
18
#include <dftracer/utils/python/py_runtime_mixin.h>
19
#include <dftracer/utils/python/py_type_helpers.h>
20
#include <dftracer/utils/python/runtime.h>
21
#include <dftracer/utils/python/trace_reader.h>
22
#include <dftracer/utils/python/trace_reader_iterator.h>
23
#include <dftracer/utils/utilities/common/query/query.h>
24
#include <dftracer/utils/utilities/composites/dft/indexing/chunk_pruner_utility.h>
25
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
26
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
27
#include <dftracer/utils/utilities/indexer/index_database.h>
28
#include <dftracer/utils/utilities/indexer/internal/helpers.h>
29
#include <dftracer/utils/utilities/reader/internal/arrow_row_builder.h>
30
#include <dftracer/utils/utilities/reader/internal/chunk_geometry.h>
31
#include <dftracer/utils/utilities/reader/internal/json_dict_builder.h>
32
#include <dftracer/utils/utilities/reader/trace_reader.h>
33

34
#include <algorithm>
35
#include <cctype>
36
#include <cstddef>
37
#include <cstdio>
38
#include <cstring>
39
#include <exception>
40
#include <memory>
41
#include <string>
42
#include <unordered_map>
43
#include <vector>
44
#ifdef DFTRACER_UTILS_ENABLE_ARROW
45
#include <dftracer/utils/python/arrow_stream_capsule.h>
46
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
47
#include <dftracer/utils/utilities/common/json/parser.h>
48
#endif
49
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
50
#include <dftracer/utils/utilities/common/arrow/ipc_writer.h>
51
#include <dftracer/utils/utilities/common/arrow/partition_writer.h>
52
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
53
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
54
#include <dftracer/utils/utilities/composites/dft/views/view_builder_utility.h>
55
#include <dftracer/utils/utilities/composites/dft/views/view_definition.h>
56
#include <dftracer/utils/utilities/composites/dft/views/view_reader_utility.h>
57
#endif
58

59
namespace {
60

61
using dftracer::utils::CoroScope;
62
using dftracer::utils::Runtime;
63
using dftracer::utils::coro::CoroTask;
64
using dftracer::utils::coro::when_all;
65
using dftracer::utils::utilities::filesystem::PatternDirectoryScannerUtility;
66
using dftracer::utils::utilities::filesystem::
67
    PatternDirectoryScannerUtilityInput;
68
using dftracer::utils::utilities::reader::ReadConfig;
69
using dftracer::utils::utilities::reader::TraceReader;
70
using dftracer::utils::utilities::reader::TraceReaderConfig;
71
#ifdef DFTRACER_UTILS_ENABLE_ARROW
72
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
73
#endif
74
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
75
using dftracer::utils::utilities::common::arrow::IpcCompression;
76
using dftracer::utils::utilities::common::arrow::PartitionWriter;
77
using dftracer::utils::utilities::common::arrow::PartitionWriteStats;
78
using dftracer::utils::utilities::composites::dft::MetadataCollectorUtility;
79
using dftracer::utils::utilities::composites::dft::
80
    MetadataCollectorUtilityInput;
81
using dftracer::utils::utilities::composites::dft::views::ViewBuilderInput;
82
using dftracer::utils::utilities::composites::dft::views::ViewBuilderUtility;
83
using dftracer::utils::utilities::composites::dft::views::ViewDefinition;
84
using dftracer::utils::utilities::composites::dft::views::ViewReaderInput;
85
using dftracer::utils::utilities::composites::dft::views::ViewReaderUtility;
86
#endif
87

88
using dftracer::utils::python::MemoryViewBatchData;
89
using dftracer::utils::python::MemoryViewBatchIteratorState;
90

91
// Coroutine producer that streams the lines of one trace file into
// `producer`, packed into MemoryViewBatchData batches of up to `batch_size`
// lines. Each line's bytes are appended to `batch.buffer`, and its start
// offset and length go into `batch.offsets` / `batch.lengths`.
//
// Before a batch is sent, its byte_size() is added to
// `state->bytes_in_queue`. If the channel rejects the send, the batch was not
// enqueued, so that amount is subtracted again and the coroutine stops.
//
// Stops early when `state->cancelled` is set; a partial batch is dropped in
// that case. Any exception is recorded via `state->set_error`.
//
//   state      - shared iterator state (cancel flag, byte counter, error slot)
//   producer   - output channel endpoint; its guard is held until return
//   cfg        - reader configuration, moved into the TraceReader
//   rc         - read options forwarded to TraceReader::read_lines
//   batch_size - number of lines per emitted batch
CoroTask<void> produce_lines_batched(
    std::shared_ptr<MemoryViewBatchIteratorState> state,
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer,
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size) {
    auto guard = producer.guard();
    try {
        TraceReader reader(std::move(cfg));
        auto gen = reader.read_lines(rc);
        MemoryViewBatchData batch;
        std::size_t count = 0;

        while (auto opt = co_await gen.next()) {
            if (state->cancelled.load(std::memory_order_acquire)) break;
            auto sv = opt->content;
            Py_ssize_t offset = static_cast<Py_ssize_t>(batch.buffer.size());
            batch.buffer.insert(batch.buffer.end(), sv.begin(), sv.end());
            batch.offsets.push_back(offset);
            batch.lengths.push_back(static_cast<Py_ssize_t>(sv.size()));
            ++count;

            if (count >= batch_size) {
                auto batch_bytes = dftracer::utils::python::byte_size(batch);
                state->bytes_in_queue.fetch_add(batch_bytes,
                                                std::memory_order_acq_rel);
                if (!co_await producer.send(std::move(batch))) {
                    // Rejected: the batch never reached the queue, so undo its
                    // accounting. Return instead of `break` because `count`
                    // is still non-zero here; falling through to the tail
                    // flush would re-count and re-send the moved-from batch.
                    state->bytes_in_queue.fetch_sub(batch_bytes,
                                                    std::memory_order_acq_rel);
                    co_return;
                }
                batch = MemoryViewBatchData{};
                count = 0;
            }
        }
        if (count > 0 && !state->cancelled.load(std::memory_order_acquire)) {
            auto batch_bytes = dftracer::utils::python::byte_size(batch);
            state->bytes_in_queue.fetch_add(batch_bytes,
                                            std::memory_order_acq_rel);
            if (!co_await producer.send(std::move(batch))) {
                state->bytes_in_queue.fetch_sub(batch_bytes,
                                                std::memory_order_acq_rel);
            }
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
130

131
// Coroutine producer that streams the raw chunks of one trace file into
// `producer`. Each chunk becomes its own MemoryViewBatchData: a copy of the
// chunk bytes, with a single entry at offset 0 spanning the whole chunk.
//
// Each batch's byte_size() is added to `state->bytes_in_queue` before it is
// sent. If the channel rejects the send, the batch was not enqueued, so the
// amount is subtracted again and the coroutine stops.
//
// Stops early when `state->cancelled` is set. Any exception is recorded via
// `state->set_error`.
CoroTask<void> produce_raw_batched(
    std::shared_ptr<MemoryViewBatchIteratorState> state,
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer,
    TraceReaderConfig cfg, ReadConfig rc) {
    auto guard = producer.guard();
    try {
        TraceReader reader(std::move(cfg));
        auto gen = reader.read_raw(rc);
        while (auto opt = co_await gen.next()) {
            if (state->cancelled.load(std::memory_order_acquire)) break;
            MemoryViewBatchData batch;
            batch.buffer.assign(opt->data(), opt->data() + opt->size());
            batch.offsets.push_back(0);
            batch.lengths.push_back(static_cast<Py_ssize_t>(opt->size()));
            auto batch_bytes = dftracer::utils::python::byte_size(batch);
            state->bytes_in_queue.fetch_add(batch_bytes,
                                            std::memory_order_acq_rel);
            if (!co_await producer.send(std::move(batch))) {
                // Not enqueued: remove its contribution before stopping.
                state->bytes_in_queue.fetch_sub(batch_bytes,
                                                std::memory_order_acq_rel);
                break;
            }
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
154

155
using dftracer::utils::utilities::reader::internal::parse_json_to_event;
156

157
// Rough per-item size estimates, in bytes, for each output kind (lines, raw
// chunks, JSON events, Arrow rows). None of them is referenced in the code
// visible here. NOTE(review): presumably used further down for queue/memory
// budgeting; confirm before tuning.
static constexpr std::size_t ESTIMATED_BYTES_PER_LINE = 256;
158
static constexpr std::size_t ESTIMATED_BYTES_PER_RAW_CHUNK = 4 * 1024 * 1024;  // 4 MiB
159
static constexpr std::size_t ESTIMATED_BYTES_PER_JSON_EVENT = 512;
160
static constexpr std::size_t ESTIMATED_BYTES_PER_ARROW_ROW = 1024;
161

162
// Coroutine producer that reads one trace file as JSON records, converts each
// record into a JsonDictEvent with parse_json_to_event, and sends the events
// to `producer` in JsonDictBatch batches of up to `batch_size` events.
//
// Before a batch is sent, its byte_size() is added to
// `state->bytes_in_queue`. If the channel rejects the send, the batch was not
// enqueued, so that amount is subtracted again and the coroutine stops.
//
// Stops early when `state->cancelled` is set; a partial batch is dropped in
// that case. Any exception is recorded via `state->set_error`.
CoroTask<void> produce_json_dicts(
    std::shared_ptr<JsonDictIteratorState> state,
    dftracer::utils::coro::ChannelProducer<JsonDictBatch> producer,
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size) {
    auto guard = producer.guard();
    try {
        TraceReader reader(std::move(cfg));
        auto gen = reader.read_json(rc);
        JsonDictBatch batch;
        batch.events.reserve(batch_size);

        while (auto opt = co_await gen.next()) {
            if (state->cancelled.load(std::memory_order_acquire)) break;

            JsonDictEvent ev;
            parse_json_to_event(*opt->parser, ev);
            batch.events.push_back(std::move(ev));

            if (batch.events.size() >= batch_size) {
                auto batch_bytes = dftracer::utils::python::byte_size(batch);
                state->bytes_in_queue.fetch_add(batch_bytes,
                                                std::memory_order_acq_rel);
                if (!co_await producer.send(std::move(batch))) {
                    // Rejected: undo the accounting and stop here, rather than
                    // falling into the tail flush with a moved-from batch.
                    state->bytes_in_queue.fetch_sub(batch_bytes,
                                                    std::memory_order_acq_rel);
                    co_return;
                }
                batch = JsonDictBatch{};
                batch.events.reserve(batch_size);
            }
        }
        if (!batch.events.empty() &&
            !state->cancelled.load(std::memory_order_acquire)) {
            auto batch_bytes = dftracer::utils::python::byte_size(batch);
            state->bytes_in_queue.fetch_add(batch_bytes,
                                            std::memory_order_acq_rel);
            if (!co_await producer.send(std::move(batch))) {
                state->bytes_in_queue.fetch_sub(batch_bytes,
                                                std::memory_order_acq_rel);
            }
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
200

201
// Feeds every path in `*files`, in order, into `file_chan` and then closes the
// channel, which lets the workers' receive loops finish. Feeding stops early
// once `*cancelled` is observed set or the channel refuses a send; the channel
// is closed in every case.
static CoroTask<void> send_files_to_channel(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    const std::vector<std::string> *files, std::atomic<bool> *cancelled) {
    const std::size_t total = files->size();
    std::size_t next = 0;
    while (next < total && !cancelled->load(std::memory_order_acquire)) {
        if (!co_await file_chan->send((*files)[next])) break;
        ++next;
    }
    file_chan->close();
    co_return;
}
211

212
// Worker coroutine for the parallel JSON-dict pipeline.
//
// Takes file paths from `file_chan` until receive() yields nothing. For each
// file it opens a TraceReader, converts every JSON record into a
// JsonDictEvent, and forwards the events to `out_chan` in batches of up to
// `batch_size`. A file's leftover events are flushed before the next path is
// taken. Returns early when `*cancelled` is set or `out_chan` rejects a send.
//
// NOTE(review): unlike produce_json_dicts, this path never updates the
// iterator state's bytes_in_queue counter; confirm the consumer does not
// rely on it for parallel reads.
static CoroTask<void> json_dict_file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<JsonDictBatch> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<JsonDictBatch> sink(out_chan);
    auto sink_guard = sink.guard();

    for (;;) {
        auto next_path = co_await file_chan->receive();
        if (!next_path) break;
        if (cancelled->load(std::memory_order_acquire)) co_return;

        TraceReaderConfig reader_cfg;
        reader_cfg.file_path = std::move(*next_path);
        reader_cfg.index_dir = index_dir;
        reader_cfg.checkpoint_size = checkpoint_size;
        reader_cfg.auto_build_index = auto_build_index;

        TraceReader reader(std::move(reader_cfg));
        auto records = reader.read_json(rc);
        JsonDictBatch pending;
        pending.events.reserve(batch_size);

        for (;;) {
            auto record = co_await records.next();
            if (!record) break;
            if (cancelled->load(std::memory_order_acquire)) co_return;

            JsonDictEvent event;
            parse_json_to_event(*record->parser, event);
            pending.events.push_back(std::move(event));
            if (pending.events.size() < batch_size) continue;

            if (!co_await sink.send(std::move(pending))) co_return;
            pending = JsonDictBatch{};
            pending.events.reserve(batch_size);
        }

        if (!pending.events.empty()) {
            if (!co_await sink.send(std::move(pending))) co_return;
        }
    }
    co_return;
}
250

251
// Fans the JSON-dict pipeline out over a pool of workers inside `child`.
//
// Creates a file channel whose capacity equals the worker count and spawns
// that many json_dict_file_worker coroutines. Each worker gets its own copy
// of `*index_dir` and `*rc`. It then spawns send_files_to_channel to push
// `*files` into the channel and close it.
//
// The worker count is min(files->size(), max_workers), clamped to at least
// one. With zero workers nothing would ever receive from the file channel, so
// none of the files would be read.
static CoroTask<void> spawn_json_dict_producers(
    CoroScope &child, dftracer::utils::coro::Channel<JsonDictBatch> *out_chan,
    const std::vector<std::string> *files, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::size_t batch_size, std::atomic<bool> *cancelled_ptr,
    std::size_t max_workers) {
    std::size_t num_workers =
        std::max<std::size_t>(1, std::min(files->size(), max_workers));
    auto file_chan =
        dftracer::utils::coro::make_channel<std::string>(num_workers);

    for (std::size_t i = 0; i < num_workers; ++i) {
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc, batch_size,
                     cancelled_ptr](CoroScope &) {
            return json_dict_file_worker(fc, out_chan, idx, checkpoint_size,
                                         auto_build_index, r, batch_size,
                                         cancelled_ptr);
        });
    }

    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
        return send_files_to_channel(fc, files, cancelled_ptr);
    });
    co_return;
}
276

277
// Top-level producer for the parallel JSON-dict iterator.
//
// Scans `dir_path` with PatternDirectoryScannerUtility for the ".pfw" and
// ".pfw.gz" suffixes and sorts the resulting paths. If nothing is found, the
// output channel is closed directly. Otherwise the paths are fanned out via
// spawn_json_dict_producers inside a child scope, which is awaited until it
// completes. Any exception is recorded with `sp->set_error`.
static CoroTask<void> produce_json_dicts_parallel(
    CoroScope &scope, JsonDictIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility dir_scanner;
        auto request = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto found = co_await scope.spawn(dir_scanner, request);

        std::vector<std::string> paths;
        paths.reserve(found.size());
        for (auto &entry : found) paths.push_back(entry.path.string());
        std::sort(paths.begin(), paths.end());

        if (paths.empty()) {
            sp->channel->close();
        } else {
            auto *out = sp->channel.get();
            auto *stop_flag = &sp->cancelled;
            // The child scope is awaited to completion, so the by-reference
            // captures of these coroutine-frame locals remain valid.
            co_await scope.scope(
                [out, &paths, &index_dir, checkpoint_size, auto_build_index,
                 &rc, batch_size, stop_flag,
                 max_workers](CoroScope &child) -> CoroTask<void> {
                    co_await spawn_json_dict_producers(
                        child, out, &paths, &index_dir, checkpoint_size,
                        auto_build_index, &rc, batch_size, stop_flag,
                        max_workers);
                });
        }
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
311

312
// Worker coroutine for the parallel line-reading pipeline.
//
// Takes file paths from `file_chan` until receive() yields nothing and reads
// each file line by line. Lines are packed into MemoryViewBatchData batches of
// up to `batch_size` lines: the bytes are concatenated in `buffer`, with
// per-line `offsets` and `lengths`. Batches are forwarded to `out_chan`, and a
// file's leftover lines are flushed before the next path is taken. Returns
// early when `*cancelled` is set or `out_chan` rejects a send.
static CoroTask<void> lines_file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> sink(out_chan);
    auto sink_guard = sink.guard();

    for (;;) {
        auto next_path = co_await file_chan->receive();
        if (!next_path) break;
        if (cancelled->load(std::memory_order_acquire)) co_return;

        TraceReaderConfig reader_cfg;
        reader_cfg.file_path = std::move(*next_path);
        reader_cfg.index_dir = index_dir;
        reader_cfg.checkpoint_size = checkpoint_size;
        reader_cfg.auto_build_index = auto_build_index;

        TraceReader reader(std::move(reader_cfg));
        auto lines = reader.read_lines(rc);
        MemoryViewBatchData pending;
        std::size_t lines_in_batch = 0;

        for (;;) {
            auto line = co_await lines.next();
            if (!line) break;
            if (cancelled->load(std::memory_order_acquire)) co_return;

            auto text = line->content;
            const auto start = static_cast<Py_ssize_t>(pending.buffer.size());
            pending.buffer.insert(pending.buffer.end(), text.begin(),
                                  text.end());
            pending.offsets.push_back(start);
            pending.lengths.push_back(static_cast<Py_ssize_t>(text.size()));

            if (++lines_in_batch < batch_size) continue;
            if (!co_await sink.send(std::move(pending))) co_return;
            pending = MemoryViewBatchData{};
            lines_in_batch = 0;
        }

        if (lines_in_batch > 0) {
            if (!co_await sink.send(std::move(pending))) co_return;
        }
    }
    co_return;
}
354

355
// Fans the line-reading pipeline out over a pool of workers inside `child`.
//
// Creates a file channel whose capacity equals the worker count and spawns
// that many lines_file_worker coroutines. Each worker gets its own copy of
// `*index_dir` and `*rc`. It then spawns send_files_to_channel to push
// `*files` into the channel and close it.
//
// The worker count is min(files->size(), max_workers), clamped to at least
// one. With zero workers nothing would ever receive from the file channel, so
// none of the files would be read.
static CoroTask<void> spawn_lines_producers(
    CoroScope &child,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    const std::vector<std::string> *files, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::size_t batch_size, std::atomic<bool> *cancelled_ptr,
    std::size_t max_workers) {
    std::size_t num_workers =
        std::max<std::size_t>(1, std::min(files->size(), max_workers));
    auto file_chan =
        dftracer::utils::coro::make_channel<std::string>(num_workers);

    for (std::size_t i = 0; i < num_workers; ++i) {
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc, batch_size,
                     cancelled_ptr](CoroScope &) {
            return lines_file_worker(fc, out_chan, idx, checkpoint_size,
                                     auto_build_index, r, batch_size,
                                     cancelled_ptr);
        });
    }

    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
        return send_files_to_channel(fc, files, cancelled_ptr);
    });
    co_return;
}
381

382
// Top-level producer for the parallel line iterator.
//
// Scans `dir_path` with PatternDirectoryScannerUtility for the ".pfw" and
// ".pfw.gz" suffixes and sorts the resulting paths. If nothing is found, the
// output channel is closed directly. Otherwise the paths are fanned out via
// spawn_lines_producers inside a child scope, which is awaited until it
// completes. Any exception is recorded with `sp->set_error`.
static CoroTask<void> produce_lines_parallel(
    CoroScope &scope, MemoryViewBatchIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility dir_scanner;
        auto request = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto found = co_await scope.spawn(dir_scanner, request);

        std::vector<std::string> paths;
        paths.reserve(found.size());
        for (auto &entry : found) paths.push_back(entry.path.string());
        std::sort(paths.begin(), paths.end());

        if (paths.empty()) {
            sp->channel->close();
        } else {
            auto *out = sp->channel.get();
            auto *stop_flag = &sp->cancelled;
            // The child scope is awaited to completion, so the by-reference
            // captures of these coroutine-frame locals remain valid.
            co_await scope.scope(
                [out, &paths, &index_dir, checkpoint_size, auto_build_index,
                 &rc, batch_size, stop_flag,
                 max_workers](CoroScope &child) -> CoroTask<void> {
                    co_await spawn_lines_producers(
                        child, out, &paths, &index_dir, checkpoint_size,
                        auto_build_index, &rc, batch_size, stop_flag,
                        max_workers);
                });
        }
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
416

417
// Worker coroutine for the parallel raw-chunk pipeline.
//
// Takes file paths from `file_chan` until receive() yields nothing and reads
// each file with TraceReader::read_raw. Every chunk is copied into its own
// MemoryViewBatchData, with a single entry at offset 0 spanning the chunk,
// and sent to `out_chan`. Returns early when `*cancelled` is set or
// `out_chan` rejects a send.
static CoroTask<void> raw_file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> sink(out_chan);
    auto sink_guard = sink.guard();

    for (;;) {
        auto next_path = co_await file_chan->receive();
        if (!next_path) break;
        if (cancelled->load(std::memory_order_acquire)) co_return;

        TraceReaderConfig reader_cfg;
        reader_cfg.file_path = std::move(*next_path);
        reader_cfg.index_dir = index_dir;
        reader_cfg.checkpoint_size = checkpoint_size;
        reader_cfg.auto_build_index = auto_build_index;

        TraceReader reader(std::move(reader_cfg));
        auto chunks = reader.read_raw(rc);
        for (;;) {
            auto chunk = co_await chunks.next();
            if (!chunk) break;
            if (cancelled->load(std::memory_order_acquire)) co_return;

            MemoryViewBatchData single;
            single.buffer.assign(chunk->data(), chunk->data() + chunk->size());
            single.offsets.push_back(0);
            single.lengths.push_back(static_cast<Py_ssize_t>(chunk->size()));
            if (!co_await sink.send(std::move(single))) co_return;
        }
    }
    co_return;
}
447

448
// Fans the raw-chunk pipeline out over a pool of workers inside `child`.
//
// Creates a file channel whose capacity equals the worker count and spawns
// that many raw_file_worker coroutines. Each worker gets its own copy of
// `*index_dir` and `*rc`. It then spawns send_files_to_channel to push
// `*files` into the channel and close it.
//
// The worker count is min(files->size(), max_workers), clamped to at least
// one. With zero workers nothing would ever receive from the file channel, so
// none of the files would be read.
static CoroTask<void> spawn_raw_producers(
    CoroScope &child,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    const std::vector<std::string> *files, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::atomic<bool> *cancelled_ptr, std::size_t max_workers) {
    std::size_t num_workers =
        std::max<std::size_t>(1, std::min(files->size(), max_workers));
    auto file_chan =
        dftracer::utils::coro::make_channel<std::string>(num_workers);

    for (std::size_t i = 0; i < num_workers; ++i) {
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc,
                     cancelled_ptr](CoroScope &) {
            return raw_file_worker(fc, out_chan, idx, checkpoint_size,
                                   auto_build_index, r, cancelled_ptr);
        });
    }

    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
        return send_files_to_channel(fc, files, cancelled_ptr);
    });
    co_return;
}
472

473
// Top-level producer for the parallel raw-chunk iterator.
//
// Scans `dir_path` with PatternDirectoryScannerUtility for the ".pfw" and
// ".pfw.gz" suffixes and sorts the resulting paths. If nothing is found, the
// output channel is closed directly. Otherwise the paths are fanned out via
// spawn_raw_producers inside a child scope, which is awaited until it
// completes. Any exception is recorded with `sp->set_error`.
static CoroTask<void> produce_raw_parallel(
    CoroScope &scope, MemoryViewBatchIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility dir_scanner;
        auto request = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto found = co_await scope.spawn(dir_scanner, request);

        std::vector<std::string> paths;
        paths.reserve(found.size());
        for (auto &entry : found) paths.push_back(entry.path.string());
        std::sort(paths.begin(), paths.end());

        if (paths.empty()) {
            sp->channel->close();
        } else {
            auto *out = sp->channel.get();
            auto *stop_flag = &sp->cancelled;
            // The child scope is awaited to completion, so the by-reference
            // captures of these coroutine-frame locals remain valid.
            co_await scope.scope(
                [out, &paths, &index_dir, checkpoint_size, auto_build_index,
                 &rc, stop_flag,
                 max_workers](CoroScope &child) -> CoroTask<void> {
                    co_await spawn_raw_producers(
                        child, out, &paths, &index_dir, checkpoint_size,
                        auto_build_index, &rc, stop_flag, max_workers);
                });
        }
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
507

508
#ifdef DFTRACER_UTILS_ENABLE_ARROW
509

510
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
511
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
512

513
using dftracer::utils::StringArena;
514
using dftracer::utils::utilities::reader::internal::build_arrow_row;
515

516
// Streams one trace file into `chan` as Arrow record batches.
//
// - Without `normalize`: batches come straight from TraceReader::read_arrow
//   (the fast path).
// - With `normalize`: records are read as JSON and appended row by row with
//   build_arrow_row into a RecordBatchBuilder; rows it rejects are skipped.
//   Every `batch_size` accepted rows the builder is finished and the result
//   sent. The string arena is then cleared, the builder's schema is locked
//   (if it is not already), and the builder is reset and re-reserved.
//   Remaining rows are flushed at end of file.
//
// Returns early when `*cancelled` is set or `chan` rejects a send.
static CoroTask<void> produce_arrow_for_file(
    dftracer::utils::coro::Channel<ArrowExportResult> *chan,
    std::string file_path, std::string index_dir, std::size_t checkpoint_size,
    bool auto_build_index, ReadConfig rc, std::size_t batch_size,
    bool normalize, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> sink(chan);
    auto sink_guard = sink.guard();

    TraceReaderConfig reader_cfg;
    reader_cfg.file_path = std::move(file_path);
    reader_cfg.index_dir = std::move(index_dir);
    reader_cfg.checkpoint_size = checkpoint_size;
    reader_cfg.auto_build_index = auto_build_index;
    TraceReader reader(std::move(reader_cfg));

    if (normalize) {
        auto records = reader.read_json(rc);
        RecordBatchBuilder builder;
        builder.reserve(batch_size);
        StringArena arena;

        for (;;) {
            auto record = co_await records.next();
            if (!record) break;
            if (cancelled->load(std::memory_order_acquire)) co_return;
            if (!build_arrow_row(builder, *record->parser, arena, normalize))
                continue;
            if (builder.num_rows() < batch_size) continue;

            auto full = builder.finish();
            arena.clear();
            if (!co_await sink.send(std::move(full))) co_return;
            if (!builder.is_schema_locked()) builder.lock_schema();
            builder.reset(true);
            builder.reserve(batch_size);
        }
        if (builder.num_rows() > 0) {
            co_await sink.send(builder.finish());
        }
        co_return;
    }

    // Fast path: TraceReader assembles the Arrow batches itself.
    auto batches = reader.read_arrow(rc, batch_size);
    for (;;) {
        auto next_batch = co_await batches.next();
        if (!next_batch) break;
        if (cancelled->load(std::memory_order_acquire)) co_return;
        if (!co_await sink.send(std::move(*next_batch))) co_return;
    }
    co_return;
}
566

567
// Worker coroutine for the parallel Arrow export.
//
// Takes file paths from `file_chan` until receive() yields nothing and
// streams each file into `out_chan` as Arrow batches:
// - When `normalize` is false, batches come straight from
//   TraceReader::read_arrow.
// - Otherwise records go through read_json + build_arrow_row into a
//   RecordBatchBuilder. A batch is emitted every `batch_size` accepted rows
//   and the remainder is flushed per file.
//
// Returns early when `*cancelled` is set or `out_chan` rejects a send.
static CoroTask<void> file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> sink(out_chan);
    auto sink_guard = sink.guard();

    for (;;) {
        auto next_path = co_await file_chan->receive();
        if (!next_path) break;
        if (cancelled->load(std::memory_order_acquire)) co_return;

        TraceReaderConfig reader_cfg;
        reader_cfg.file_path = std::move(*next_path);
        reader_cfg.index_dir = index_dir;
        reader_cfg.checkpoint_size = checkpoint_size;
        reader_cfg.auto_build_index = auto_build_index;

        TraceReader reader(std::move(reader_cfg));

        if (normalize) {
            auto records = reader.read_json(rc);
            RecordBatchBuilder builder;
            builder.reserve(batch_size);
            StringArena arena;

            for (;;) {
                auto record = co_await records.next();
                if (!record) break;
                if (cancelled->load(std::memory_order_acquire)) co_return;
                if (!build_arrow_row(builder, *record->parser, arena,
                                     normalize))
                    continue;
                if (builder.num_rows() < batch_size) continue;

                auto full = builder.finish();
                arena.clear();
                if (!co_await sink.send(std::move(full))) co_return;
                if (!builder.is_schema_locked()) builder.lock_schema();
                builder.reset(true);
                builder.reserve(batch_size);
            }
            if (builder.num_rows() > 0) {
                if (!co_await sink.send(builder.finish())) co_return;
            }
        } else {
            auto batches = reader.read_arrow(rc, batch_size);
            for (;;) {
                auto next_batch = co_await batches.next();
                if (!next_batch) break;
                if (cancelled->load(std::memory_order_acquire)) co_return;
                if (!co_await sink.send(std::move(*next_batch))) co_return;
            }
        }
    }
    co_return;
}
620

621
using dftracer::utils::utilities::reader::internal::ArrowWorkItem;
622
using dftracer::utils::utilities::reader::internal::enumerate_work_items;
623

624
// Pushes each ArrowWorkItem in `*items`, in order, into `chan` and then
// closes it. Pushing stops early once `*cancelled` is observed set or the
// channel refuses a send; the channel is closed in every case.
static CoroTask<void> send_work_items_to_channel(
    std::shared_ptr<dftracer::utils::coro::Channel<ArrowWorkItem>> chan,
    const std::vector<ArrowWorkItem> *items, std::atomic<bool> *cancelled) {
    auto cursor = items->begin();
    const auto last = items->end();
    while (cursor != last && !cancelled->load(std::memory_order_acquire)) {
        if (!co_await chan->send(*cursor)) break;
        ++cursor;
    }
    chan->close();
    co_return;
}
634

635
/// Worker coroutine: drains `work_chan` and streams Arrow batches for each
/// work item into `out_chan`.
///
/// Each worker keeps its own TraceReader per file path, so consecutive items
/// on the same file reuse the reader. The coroutine returns as soon as
/// cancellation is observed or a send into `out_chan` is rejected.
///
/// @param work_chan        source of ArrowWorkItem ranges to read
/// @param out_chan         destination for produced batches
/// @param index_dir        forwarded to TraceReaderConfig::index_dir
/// @param checkpoint_size  forwarded to TraceReaderConfig::checkpoint_size
/// @param auto_build_index forwarded to TraceReaderConfig::auto_build_index
/// @param rc               template ReadConfig; range fields are overwritten
///                         per work item
/// @param batch_size       target number of rows per emitted batch
/// @param normalize        false: TraceReader::read_arrow batches are
///                         forwarded as-is; true: rows are rebuilt from JSON
///                         through build_arrow_row
/// @param cancelled        cooperative cancellation flag
static CoroTask<void> checkpoint_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<ArrowWorkItem>> work_chan,
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer(
        out_chan);
    auto guard = producer.guard();

    const auto is_cancelled = [cancelled] {
        return cancelled->load(std::memory_order_acquire);
    };

    // Per-worker reader cache keyed by file path, so successive work items
    // landing on the same file do not re-probe it.
    std::unordered_map<std::string, std::shared_ptr<TraceReader>> reader_cache;

    for (;;) {
        auto work = co_await work_chan->receive();
        if (!work) break;
        if (is_cancelled()) co_return;

        std::shared_ptr<TraceReader> &cached = reader_cache[work->file_path];
        if (cached == nullptr) {
            TraceReaderConfig reader_cfg;
            reader_cfg.file_path = work->file_path;
            reader_cfg.index_dir = index_dir;
            reader_cfg.checkpoint_size = checkpoint_size;
            reader_cfg.auto_build_index = auto_build_index;
            cached = std::make_shared<TraceReader>(std::move(reader_cfg));
        }
        TraceReader &reader = *cached;

        ReadConfig item_rc = rc;
        const bool is_line_range = work->start_line > 0 || work->end_line > 0;
        if (is_line_range) {
            // Line-range items drive off LINE_RANGE; the gzip stream maps the
            // lines back to byte offsets through its checkpoints.
            item_rc.start_line = work->start_line;
            item_rc.end_line = work->end_line;
            item_rc.start_byte = 0;
            item_rc.end_byte = 0;
            item_rc.start_at_checkpoint = false;
            item_rc.end_at_checkpoint = false;
        } else {
            item_rc.start_byte = work->start_byte;
            item_rc.end_byte = work->end_byte;
            item_rc.start_at_checkpoint = work->start_at_checkpoint;
            item_rc.end_at_checkpoint = work->end_at_checkpoint;
        }
        // Pruning already happened when the work items were enumerated;
        // skipping it here avoids per-work-item RocksDB opens that would
        // otherwise dwarf the read cost at directory scale.
        item_rc.skip_pruning = true;
        // Chunks pre-classified as uniform-matching skip per-event eval.
        if (work->chunk_prune_only) item_rc.chunk_prune_only = true;

        if (normalize) {
            auto rows = reader.read_json(item_rc);
            RecordBatchBuilder builder;
            builder.reserve(batch_size);
            StringArena arena;

            while (auto row = co_await rows.next()) {
                if (is_cancelled()) co_return;
                if (!build_arrow_row(builder, *row->parser, arena, normalize))
                    continue;
                if (builder.num_rows() < batch_size) continue;

                auto full_batch = builder.finish();
                arena.clear();
                if (!co_await producer.send(std::move(full_batch))) co_return;
                if (!builder.is_schema_locked()) builder.lock_schema();
                builder.reset(true);
                builder.reserve(batch_size);
            }
            if (builder.num_rows() > 0) {
                if (!co_await producer.send(builder.finish())) co_return;
            }
        } else {
            auto batches = reader.read_arrow(item_rc, batch_size);
            while (auto batch = co_await batches.next()) {
                if (is_cancelled()) co_return;
                if (!co_await producer.send(std::move(*batch))) co_return;
            }
        }
    }
    co_return;
}
1,284!
718

719
/// Spawns the reader pool for a multi-file Arrow read inside `child`.
///
/// Creates a work channel whose capacity equals the worker count. It starts
/// `min(work_items->size(), max_workers)` checkpoint_worker tasks (never
/// fewer than one) that all drain that channel. It then starts one feeder
/// task (send_work_items_to_channel) that pushes every item and closes the
/// channel.
///
/// Each worker receives its own copy of `*index_dir` and `*rc`. The feeder
/// keeps only the `work_items` pointer, so the vector must outlive `child`.
static CoroTask<void> spawn_arrow_producers(
    CoroScope &child,
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
    const std::vector<ArrowWorkItem> *work_items, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::size_t batch_size, bool normalize, std::atomic<bool> *cancelled_ptr,
    std::size_t max_workers) {
    const std::size_t worker_count = std::max<std::size_t>(
        std::size_t{1}, std::min(work_items->size(), max_workers));
    auto shared_work =
        dftracer::utils::coro::make_channel<ArrowWorkItem>(worker_count);

    std::size_t launched = 0;
    while (launched < worker_count) {
        child.spawn([out_chan, wc = shared_work, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc, batch_size,
                     normalize, cancelled_ptr](CoroScope &) {
            return checkpoint_worker(wc, out_chan, idx, checkpoint_size,
                                     auto_build_index, r, batch_size,
                                     normalize, cancelled_ptr);
        });
        ++launched;
    }

    // Single feeder: pushes every work item, then closes the work channel.
    child.spawn([wc = shared_work, work_items, cancelled_ptr](CoroScope &) {
        return send_work_items_to_channel(wc, work_items, cancelled_ptr);
    });
    co_return;
}
108!
746

747
/// Reads a list of trace files in parallel and publishes Arrow batches into
/// `sp->channel`.
///
/// Enumerates work items across `files` and then runs spawn_arrow_producers
/// inside a child scope, awaiting it. When there is nothing to read (no
/// files, or no work items), the output channel is closed directly so the
/// consumer terminates. Any exception is recorded on the iterator state via
/// set_error().
static CoroTask<void> produce_arrow_batches_for_files(
    CoroScope &scope, ArrowIteratorState *sp, std::vector<std::string> files,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::size_t max_workers) {
    try {
        if (!files.empty()) {
            auto work_items = enumerate_work_items(
                files, index_dir, rc.query, max_workers, rc.start_byte,
                rc.end_byte, rc.start_line, rc.end_line);
            if (!work_items.empty()) {
                auto *out = sp->channel.get();
                auto *cancel_flag = &sp->cancelled;
                co_await scope.scope(
                    [out, &work_items, &index_dir, checkpoint_size,
                     auto_build_index, &rc, batch_size, normalize, cancel_flag,
                     max_workers](CoroScope &child) -> CoroTask<void> {
                        co_await spawn_arrow_producers(
                            child, out, &work_items, &index_dir,
                            checkpoint_size, auto_build_index, &rc, batch_size,
                            normalize, cancel_flag, max_workers);
                    });
                co_return;
            }
        }
        // Nothing to read: close the output channel so the consumer ends.
        sp->channel->close();
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
146!
782

783
/// Directory entry point for the parallel Arrow reader.
///
/// Scans `dir_path` for `.pfw` / `.pfw.gz` entries using
/// PatternDirectoryScannerUtility (the two boolean scanner flags are
/// forwarded unchanged). It sorts the resulting paths so the file order is
/// deterministic and delegates to produce_arrow_batches_for_files. Any
/// exception is recorded on the iterator state via set_error().
static CoroTask<void> produce_arrow_batches_parallel(
    CoroScope &scope, ArrowIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility dir_scanner;
        auto request = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto found = co_await scope.spawn(dir_scanner, request);

        std::vector<std::string> trace_files;
        trace_files.reserve(found.size());
        for (auto &entry : found) {
            trace_files.emplace_back(entry.path.string());
        }
        std::sort(trace_files.begin(), trace_files.end());

        co_await produce_arrow_batches_for_files(
            scope, sp, std::move(trace_files), std::move(index_dir),
            checkpoint_size, auto_build_index, std::move(rc), batch_size,
            normalize, max_workers);
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
210!
807

808
/// Single-file Arrow producer feeding an ArrowIteratorState.
///
/// Reads `cfg.file_path` with `rc` and sends Arrow batches through
/// `producer`. When `normalize` is false, batches from
/// TraceReader::read_arrow are forwarded. Otherwise rows are rebuilt from
/// JSON with build_arrow_row and grouped into batches of `batch_size` rows.
///
/// Each batch's byte size is added to `state->bytes_in_queue` before it is
/// sent. If the channel rejects the send, the size is subtracted again,
/// because a rejected batch never reaches the queue. Production stops on
/// cancellation or on the first rejected send. Exceptions are recorded via
/// state->set_error().
///
/// @param flatten_objects accepted for interface compatibility; unused.
CoroTask<void> produce_arrow_batches(
    std::shared_ptr<ArrowIteratorState> state,
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer,
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size,
    bool flatten_objects = false, bool normalize = false) {
    (void)flatten_objects;

    auto guard = producer.guard();
    try {
        TraceReader reader(std::move(cfg));

        if (!normalize) {
            auto batch_gen = reader.read_arrow(rc, batch_size);
            while (auto batch_opt = co_await batch_gen.next()) {
                if (state->cancelled.load(std::memory_order_acquire)) break;
                auto result_bytes =
                    dftracer::utils::python::byte_size(*batch_opt);
                state->bytes_in_queue.fetch_add(result_bytes,
                                                std::memory_order_acq_rel);
                if (!co_await producer.send(std::move(*batch_opt))) {
                    // Rejected batch never entered the queue.
                    state->bytes_in_queue.fetch_sub(result_bytes,
                                                    std::memory_order_acq_rel);
                    break;
                }
            }
            co_return;
        }

        auto gen = reader.read_json(rc);
        RecordBatchBuilder builder;
        builder.reserve(batch_size);

        StringArena arena;

        while (auto opt = co_await gen.next()) {
            if (state->cancelled.load(std::memory_order_acquire)) break;
            if (!build_arrow_row(builder, *opt->parser, arena, normalize))
                continue;

            if (builder.num_rows() >= batch_size) {
                auto result = builder.finish();
                arena.clear();
                auto result_bytes = dftracer::utils::python::byte_size(result);
                state->bytes_in_queue.fetch_add(result_bytes,
                                                std::memory_order_acq_rel);
                if (!co_await producer.send(std::move(result))) {
                    state->bytes_in_queue.fetch_sub(result_bytes,
                                                    std::memory_order_acq_rel);
                    // Stop here rather than falling into the trailing flush,
                    // which would call finish()/send() again on a builder
                    // that has not been reset.
                    co_return;
                }
                if (!builder.is_schema_locked()) {
                    builder.lock_schema();
                }
                builder.reset(true);
                builder.reserve(batch_size);
            }
        }

        if (builder.num_rows() > 0 &&
            !state->cancelled.load(std::memory_order_acquire)) {
            auto result = builder.finish();
            auto result_bytes = dftracer::utils::python::byte_size(result);
            state->bytes_in_queue.fetch_add(result_bytes,
                                            std::memory_order_acq_rel);
            if (!co_await producer.send(std::move(result))) {
                state->bytes_in_queue.fetch_sub(result_bytes,
                                                std::memory_order_acq_rel);
            }
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
546!
870

871
#endif  // DFTRACER_UTILS_ENABLE_ARROW
872

873
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
874

875
struct WriteArrowStats {
39✔
876
    std::unordered_map<std::string, PartitionWriteStats> partitions;
877
    int64_t total_rows = 0;
39✔
878
    int64_t total_uncompressed_bytes = 0;
39✔
879
};
880

881
struct WriteArrowResult {
52✔
882
    WriteArrowStats stats;
883
    std::string error;
884
    std::uint64_t chunks_scanned = 0;
39✔
885
    std::uint64_t chunks_skipped = 0;
39✔
886
};
887

888
/// Writes one Arrow IPC partition set per view for a single trace file.
///
/// For each view (a single "all" view when `views` is empty), it:
///   - opens a PartitionWriter at `output_path`, or at
///     `output_path/<view name>` when there are several views or the view
///     is not "all";
///   - asks ViewBuilder for candidate chunks;
///   - streams each candidate through ViewReader and writes the resulting
///     Arrow batches.
/// Batches reporting `valid() == false` are skipped, the same guard
/// write_view_chunk_pipeline applies to the same `to_arrow` result.
///
/// Processing stops at the first failure, which is reported through
/// WriteArrowResult::error. std::exception messages are caught into
/// `error`.
CoroTask<WriteArrowResult> write_arrow_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    std::vector<ViewDefinition> views, std::string output_path,
    int64_t chunk_size_bytes, IpcCompression compression,
    std::size_t event_batch_size) {
    namespace dft_internal =
        dftracer::utils::utilities::composites::dft::internal;
    WriteArrowResult result;

    try {
        if (views.empty()) {
            views.push_back(ViewDefinition().with_name("all"));
        }

        std::string resolved_index =
            index_path.empty()
                ? dft_internal::determine_index_path(file_path, "")
                : index_path;

        auto meta_input = MetadataCollectorUtilityInput::from_file(file_path)
                              .with_checkpoint_size(checkpoint_size)
                              .with_index(resolved_index);
        auto metadata = co_await MetadataCollectorUtility{}.process(meta_input);
        if (!metadata.success) {
            result.error =
                "Failed to collect metadata: " + metadata.error_message;
            co_return result;
        }

        for (const auto &view : views) {
            std::string view_output = output_path;
            if (views.size() > 1 || view.name != "all") {
                view_output = output_path + "/" + view.name;
            }

            PartitionWriter writer;
            int rc_open = co_await writer.open(view_output, chunk_size_bytes,
                                               compression);
            if (rc_open != 0) {
                result.error =
                    "Failed to open partition writer for view: " + view.name;
                co_return result;
            }

            ViewBuilderInput builder_input;
            builder_input.with_view(view)
                .with_file_path(file_path)
                .with_index_path(resolved_index)
                .with_uncompressed_size(metadata.uncompressed_size)
                .with_num_checkpoints(metadata.num_checkpoints);

            auto build_output =
                co_await ViewBuilderUtility{}.process(builder_input);
            if (!build_output) {
                result.error = "ViewBuilder failed for view: " + view.name;
                co_return result;
            }

            result.chunks_skipped += build_output->skipped_checkpoints;

            if (!build_output->file_may_match) {
                // Nothing to read for this view: still close the writer and
                // record its stats.
                auto stats = co_await writer.close();
                result.stats.partitions[view.name] = std::move(stats);
                continue;
            }

            RecordBatchBuilder builder;
            bool schema_locked = false;

            for (const auto &candidate : build_output->candidates) {
                ViewReaderInput reader_input;
                reader_input.with_file_path(file_path)
                    .with_index_path(resolved_index)
                    .with_checkpoint_size(checkpoint_size)
                    .with_byte_range(candidate.start_byte, candidate.end_byte)
                    .with_checkpoint_idx(candidate.checkpoint_idx)
                    .with_event_batch_size(event_batch_size)
                    .with_view(view);
                reader_input.query = view.query;

                ViewReaderUtility reader;
                auto gen = reader.process(reader_input);
                while (auto opt = co_await gen.next()) {
                    auto arrow_batch = opt->to_arrow(builder);
                    if (!arrow_batch.valid()) continue;
                    int rc_write = co_await writer.write_batch(arrow_batch);
                    if (rc_write != 0) {
                        result.error =
                            "Failed to write batch for view: " + view.name;
                        co_return result;
                    }
                    if (!schema_locked) {
                        builder.lock_schema();
                        schema_locked = true;
                    }
                    builder.reset(true);
                }
                result.chunks_scanned++;
            }

            auto stats = co_await writer.close();
            auto &partition = result.stats.partitions[view.name];
            partition = std::move(stats);
            result.stats.total_rows += partition.total_rows;
            result.stats.total_uncompressed_bytes +=
                partition.total_uncompressed_bytes;
        }
    } catch (const std::exception &e) {
        result.error = e.what();
    }
    co_return result;
}
493!
999

1000
/// One candidate chunk reported by get_view_chunks_pipeline.
///
/// Members are zero-initialized so a default-constructed value never holds
/// indeterminate data. The struct remains an aggregate, so
/// `{checkpoint_idx, start_byte, end_byte}` brace-initialization still works.
struct ViewChunkInfo {
    std::uint64_t checkpoint_idx{0};  ///< checkpoint index of the chunk
    std::size_t start_byte{0};        ///< chunk start byte (from ViewBuilder)
    std::size_t end_byte{0};          ///< chunk end byte (from ViewBuilder)
};
1005

1006
struct GetViewChunksResult {
12✔
1007
    std::vector<ViewChunkInfo> chunks;
1008
    std::uint64_t total_checkpoints = 0;
9✔
1009
    std::uint64_t skipped_checkpoints = 0;
9✔
1010
    bool file_may_match = false;
9✔
1011
    std::string error;
1012
};
1013

1014
/// Lists the chunks of `file_path` that ViewBuilder considers candidates for
/// `view`, without reading any events.
///
/// Steps:
///   - resolve the index path (determine_index_path when `index_path` is
///     empty);
///   - collect file metadata;
///   - run ViewBuilder, then copy its candidates and checkpoint counters
///     into the result.
/// Failures are reported through GetViewChunksResult::error.
CoroTask<GetViewChunksResult> get_view_chunks_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    ViewDefinition view) {
    namespace dft_internal =
        dftracer::utils::utilities::composites::dft::internal;
    GetViewChunksResult out;

    try {
        std::string index_to_use = index_path;
        if (index_to_use.empty()) {
            index_to_use = dft_internal::determine_index_path(file_path, "");
        }

        auto metadata_request =
            MetadataCollectorUtilityInput::from_file(file_path)
                .with_checkpoint_size(checkpoint_size)
                .with_index(index_to_use);
        auto metadata =
            co_await MetadataCollectorUtility{}.process(metadata_request);
        if (!metadata.success) {
            out.error = "Failed to collect metadata: " + metadata.error_message;
            co_return out;
        }

        ViewBuilderInput plan_request;
        plan_request.with_view(view)
            .with_file_path(file_path)
            .with_index_path(index_to_use)
            .with_uncompressed_size(metadata.uncompressed_size)
            .with_num_checkpoints(metadata.num_checkpoints);

        auto plan = co_await ViewBuilderUtility{}.process(plan_request);
        if (!plan) {
            out.error = "ViewBuilder failed";
            co_return out;
        }

        out.file_may_match = plan->file_may_match;
        out.total_checkpoints = plan->total_checkpoints;
        out.skipped_checkpoints = plan->skipped_checkpoints;

        for (const auto &c : plan->candidates) {
            ViewChunkInfo info{c.checkpoint_idx, c.start_byte, c.end_byte};
            out.chunks.push_back(info);
        }
    } catch (const std::exception &e) {
        out.error = e.what();
    }
    co_return out;
}
39!
1064

1065
/// Per-chunk outcome of write_view_chunk_pipeline.
struct WriteViewChunkResult {
    std::string output_file;           ///< IPC file this chunk was written to
    std::uint64_t events_matched{0};   ///< sum of reader events_matched
    std::uint64_t events_scanned{0};   ///< sum of reader events_scanned
    std::int64_t rows_written{0};      ///< rows in batches handed to the writer
    /// NOTE(review): write_view_chunk_pipeline never assigns this, so it
    /// currently stays 0.
    std::int64_t bytes_written{0};
    std::string error;                 ///< empty on success
};
1073

1074
/// Reads one chunk of `file_path` for `view` and writes the matching rows to
/// a single Arrow IPC file at `output_file`.
///
/// - events_matched / events_scanned accumulate the reader's counters.
/// - rows_written counts only rows from batches whose write succeeded.
/// - Batches reporting `valid() == false` are skipped.
/// - bytes_written is not populated here.
///
/// Failures are reported through WriteViewChunkResult::error.
CoroTask<WriteViewChunkResult> write_view_chunk_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    ViewDefinition view, std::uint64_t checkpoint_idx, std::size_t start_byte,
    std::size_t end_byte, std::string output_file, IpcCompression compression,
    std::size_t event_batch_size) {
    namespace dft_internal =
        dftracer::utils::utilities::composites::dft::internal;
    WriteViewChunkResult result;
    result.output_file = output_file;

    try {
        std::string resolved_index =
            index_path.empty()
                ? dft_internal::determine_index_path(file_path, "")
                : index_path;

        dftracer::utils::utilities::common::arrow::IpcWriter writer;
        int rc_open = co_await writer.open(output_file, compression);
        if (rc_open != 0) {
            result.error = "Failed to open output file";
            co_return result;
        }

        ViewReaderInput reader_input;
        reader_input.with_file_path(file_path)
            .with_index_path(resolved_index)
            .with_checkpoint_size(checkpoint_size)
            .with_byte_range(start_byte, end_byte)
            .with_checkpoint_idx(checkpoint_idx)
            .with_event_batch_size(event_batch_size)
            .with_view(view);
        reader_input.query = view.query;

        RecordBatchBuilder builder;
        bool schema_locked = false;

        ViewReaderUtility reader;
        auto gen = reader.process(reader_input);
        while (auto opt = co_await gen.next()) {
            result.events_matched += opt->events_matched;
            result.events_scanned += opt->events_scanned;
            auto batch = opt->to_arrow(builder);
            if (!batch.valid()) continue;

            // Take the row count up front and add it only after the write
            // succeeds, so rows_written never includes a failed batch.
            const auto batch_rows = batch.num_rows();
            int rc = co_await writer.write_batch(batch);
            if (rc != 0) {
                result.error = "Failed to write batch";
                co_return result;
            }
            result.rows_written += batch_rows;
            if (!schema_locked) {
                builder.lock_schema();
                schema_locked = true;
            }
            builder.reset(true);
        }

        int rc = co_await writer.close();
        if (rc != 0) {
            result.error = "Failed to close output file";
        }
    } catch (const std::exception &e) {
        result.error = e.what();
    }
    co_return result;
}
116!
1140

1141
/// Input description of one chunk for write_view_chunks_pipeline.
///
/// The integer members are zero-initialized so a default-constructed
/// descriptor never holds indeterminate values. The struct remains an
/// aggregate, so brace-initialization in declaration order still works.
struct ChunkDescriptor {
    std::uint64_t checkpoint_idx{0};  ///< checkpoint index of the chunk
    std::size_t start_byte{0};        ///< chunk start byte
    std::size_t end_byte{0};          ///< chunk end byte
    std::string output_file;          ///< destination IPC file path
};
1147

1148
struct WriteViewChunksResult {
3✔
1149
    std::vector<WriteViewChunkResult> results;
1150
    int64_t total_rows = 0;
3✔
1151
    int64_t total_events_matched = 0;
3✔
1152
};
1153

1154
/// Writes several chunks of one view concurrently.
///
/// Starts one write_view_chunk_pipeline task per ChunkDescriptor, awaits
/// them together with when_all, and sums rows_written and events_matched.
/// Per-chunk failures stay in each entry's `error`; they are not
/// propagated.
CoroTask<WriteViewChunksResult> write_view_chunks_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    ViewDefinition view, std::vector<ChunkDescriptor> chunks,
    IpcCompression compression, std::size_t event_batch_size) {
    WriteViewChunksResult summary;
    if (chunks.empty()) co_return summary;

    std::vector<CoroTask<WriteViewChunkResult>> pending;
    pending.reserve(chunks.size());
    for (std::size_t i = 0; i < chunks.size(); ++i) {
        const ChunkDescriptor &c = chunks[i];
        pending.push_back(write_view_chunk_pipeline(
            file_path, index_path, checkpoint_size, view, c.checkpoint_idx,
            c.start_byte, c.end_byte, c.output_file, compression,
            event_batch_size));
    }

    summary.results = co_await when_all(std::move(pending));

    for (const WriteViewChunkResult &chunk_result : summary.results) {
        summary.total_rows += chunk_result.rows_written;
        summary.total_events_matched += chunk_result.events_matched;
    }
    co_return summary;
}
7!
1183

1184
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
1185

1186
/// Builds a TraceReaderConfig from the Python-side fields of `self`.
///
/// `file_path` and `index_dir` are NULL until `__init__` succeeds
/// (TraceReader_new leaves them NULL). For example, an object created with
/// `TraceReader.__new__(TraceReader)` reaches this function with NULL
/// fields. Such an object is rejected with an exception instead of
/// dereferencing NULL.
///
/// If `index_dir` cannot be converted, the Python error raised by
/// PyUnicode_AsUTF8 is cleared and the default (empty) index dir is kept,
/// so no error is left pending on a normal return.
///
/// @throws std::runtime_error if `self` is not initialized or its file path
///         cannot be converted to UTF-8. The visible caller
///         (TraceReader_iter_lines) catches std::exception and reports it
///         via set_typed_py_error().
TraceReaderConfig build_config(TraceReaderObject *self) {
    if (self->file_path == NULL || self->index_dir == NULL) {
        throw std::runtime_error("TraceReader is not initialized");
    }

    TraceReaderConfig cfg;
    const char *path = PyUnicode_AsUTF8(self->file_path);
    if (path == NULL) {
        // Never assign a NULL char* to std::string; surface it as a C++
        // exception and let the caller set the Python error.
        PyErr_Clear();
        throw std::runtime_error("TraceReader file path is not valid UTF-8");
    }
    cfg.file_path = path;

    const char *idx = PyUnicode_AsUTF8(self->index_dir);
    if (idx) {
        cfg.index_dir = idx;
    } else {
        PyErr_Clear();
    }
    cfg.checkpoint_size = self->checkpoint_size;
    cfg.auto_build_index = self->auto_build_index != 0;
    return cfg;
}
152!
1195

1196
/// Returns the Runtime this reader runs on.
///
/// If a runtime object was attached through the `runtime=` argument, its
/// Runtime is returned; otherwise the result of get_default_runtime() is
/// returned.
static Runtime *get_runtime(TraceReaderObject *self) {
    PyObject *const attached = self->runtime_obj;
    if (attached == NULL) return get_default_runtime();
    return reinterpret_cast<RuntimeObject *>(attached)->runtime.get();
}
151✔
1202

1203
/// Allocates a TraceReaderIterator in MEMORYVIEW mode that owns `state`.
///
/// tp_alloc does not run C++ constructors, so every shared_ptr member is
/// placement-constructed here. The members belonging to the other modes are
/// left empty. Returns NULL if tp_alloc fails; tp_alloc is expected to have
/// set the Python error in that case.
static TraceReaderIteratorObject *make_memoryview_iterator(
    std::shared_ptr<MemoryViewBatchIteratorState> state) {
    PyObject *raw =
        TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0);
    if (raw == NULL) return NULL;
    auto *iter = reinterpret_cast<TraceReaderIteratorObject *>(raw);

    // State used by this mode.
    new (&iter->batch_state)
        std::shared_ptr<MemoryViewBatchIteratorState>(std::move(state));
    iter->current_batch = NULL;
    iter->batch_index = 0;

    // Other modes' members are still constructed, just empty.
    new (&iter->json_dict_state) std::shared_ptr<JsonDictIteratorState>();
    new (&iter->json_dict_current_batch) std::shared_ptr<JsonDictBatch>();
    iter->json_dict_index = 0;
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    new (&iter->arrow_state) std::shared_ptr<ArrowIteratorState>();
#endif

    iter->mode = IteratorMode::MEMORYVIEW;
    return iter;
}
71✔
1222

1223
/// Allocates a TraceReaderIterator in JSON_DICT mode that owns `state`.
///
/// tp_alloc does not run C++ constructors, so every shared_ptr member is
/// placement-constructed here. The members belonging to the other modes are
/// left empty. Returns NULL if tp_alloc fails; tp_alloc is expected to have
/// set the Python error in that case.
static TraceReaderIteratorObject *make_json_dict_iterator(
    std::shared_ptr<JsonDictIteratorState> state) {
    PyObject *raw =
        TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0);
    if (raw == NULL) return NULL;
    auto *iter = reinterpret_cast<TraceReaderIteratorObject *>(raw);

    // State used by this mode.
    new (&iter->json_dict_state)
        std::shared_ptr<JsonDictIteratorState>(std::move(state));
    new (&iter->json_dict_current_batch) std::shared_ptr<JsonDictBatch>();
    iter->json_dict_index = 0;

    // Other modes' members are still constructed, just empty.
    new (&iter->batch_state) std::shared_ptr<MemoryViewBatchIteratorState>();
    iter->current_batch = NULL;
    iter->batch_index = 0;
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    new (&iter->arrow_state) std::shared_ptr<ArrowIteratorState>();
#endif

    iter->mode = IteratorMode::JSON_DICT;
    return iter;
}
7✔
1242

1243
#ifdef DFTRACER_UTILS_ENABLE_ARROW
1244
/// Allocates a TraceReaderIterator in ARROW mode that owns `state`.
///
/// tp_alloc does not run C++ constructors, so every shared_ptr member is
/// placement-constructed here. The members belonging to the other modes are
/// left empty. Returns NULL if tp_alloc fails; tp_alloc is expected to have
/// set the Python error in that case.
static TraceReaderIteratorObject *make_arrow_iterator(
    std::shared_ptr<ArrowIteratorState> state) {
    PyObject *raw =
        TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0);
    if (raw == NULL) return NULL;
    auto *iter = reinterpret_cast<TraceReaderIteratorObject *>(raw);

    // State used by this mode.
    new (&iter->arrow_state)
        std::shared_ptr<ArrowIteratorState>(std::move(state));

    // Other modes' members are still constructed, just empty.
    new (&iter->batch_state) std::shared_ptr<MemoryViewBatchIteratorState>();
    iter->current_batch = NULL;
    iter->batch_index = 0;
    new (&iter->json_dict_state) std::shared_ptr<JsonDictIteratorState>();
    new (&iter->json_dict_current_batch) std::shared_ptr<JsonDictBatch>();
    iter->json_dict_index = 0;

    iter->mode = IteratorMode::ARROW;
    return iter;
}
27✔
1261
#endif
1262

1263
}  // namespace
1264

1265
/// tp_dealloc: drops the references held by the reader, then frees the
/// object through its type's tp_free.
static void TraceReader_dealloc(TraceReaderObject *self) {
    Py_CLEAR(self->file_path);
    Py_CLEAR(self->index_dir);
    Py_CLEAR(self->runtime_obj);
    PyTypeObject *const tp = Py_TYPE(self);
    tp->tp_free(reinterpret_cast<PyObject *>(self));
}
318✔
1271

1272
/// tp_new: allocates a TraceReader and puts every field in a known state.
///
/// The Python references (file_path, index_dir, runtime_obj) stay NULL until
/// __init__ fills them in. checkpoint_size starts at DEFAULT_CHECKPOINT_SIZE,
/// and auto_build_index / has_index start at 0. Returns NULL if tp_alloc
/// fails.
static PyObject *TraceReader_new(PyTypeObject *type, PyObject *args,
                                 PyObject *kwds) {
    auto *self = reinterpret_cast<TraceReaderObject *>(type->tp_alloc(type, 0));
    if (self == NULL) return NULL;

    self->file_path = NULL;
    self->index_dir = NULL;
    self->runtime_obj = NULL;
    self->checkpoint_size =
        dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE;
    self->auto_build_index = 0;
    self->has_index = 0;
    return reinterpret_cast<PyObject *>(self);
}
1286

1287
static int TraceReader_init(TraceReaderObject *self, PyObject *args,
318✔
1288
                            PyObject *kwds) {
1289
    static const char *kwlist[] = {
1290
        "path",    "index_dir", "checkpoint_size", "auto_build_index",
1291
        "runtime", NULL};
1292

1293
    const char *file_path;
1294
    const char *index_dir = "";
318✔
1295
    std::size_t checkpoint_size =
318✔
1296
        dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE;
1297
    int auto_build_index = 0;
318✔
1298
    PyObject *runtime_arg = NULL;
318✔
1299

1300
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|snpO", (char **)kwlist,
318!
1301
                                     &file_path, &index_dir, &checkpoint_size,
1302
                                     &auto_build_index, &runtime_arg)) {
1303
        return -1;
×
1304
    }
1305

1306
    if (runtime_arg && runtime_arg != Py_None) {
318✔
1307
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
64!
1308
            // Direct C++ Runtime object
1309
            Py_INCREF(runtime_arg);
×
1310
            self->runtime_obj = runtime_arg;
×
1311
        } else {
1312
            // Python wrapper, extract _native attribute
1313
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
64!
1314
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
64!
1315
                self->runtime_obj = native;  // already incref'd by GetAttr
64✔
1316
            } else {
32✔
1317
                Py_XDECREF(native);
×
1318
                PyErr_SetString(PyExc_TypeError,
×
1319
                                "runtime must be a Runtime instance or None");
1320
                return -1;
×
1321
            }
1322
        }
1323
    }
32✔
1324

1325
    self->file_path = PyUnicode_FromString(file_path);
318!
1326
    if (!self->file_path) return -1;
318✔
1327

1328
    self->index_dir = PyUnicode_FromString(index_dir);
318!
1329
    if (!self->index_dir) {
318✔
1330
        Py_DECREF(self->file_path);
×
1331
        self->file_path = NULL;
×
1332
        return -1;
×
1333
    }
1334

1335
    self->checkpoint_size = checkpoint_size;
318✔
1336
    self->auto_build_index = auto_build_index;
318✔
1337

1338
    try {
1339
        TraceReaderConfig cfg;
318✔
1340
        cfg.file_path = file_path;
318!
1341
        cfg.index_dir = index_dir;
318!
1342
        cfg.checkpoint_size = checkpoint_size;
318✔
1343
        cfg.auto_build_index = auto_build_index != 0;
318✔
1344
        TraceReader probe(std::move(cfg));
318!
1345
        self->has_index = probe.has_index() ? 1 : 0;
318!
1346
    } catch (const std::exception &e) {
318!
1347
        set_typed_py_error(e);
×
1348
        Py_DECREF(self->file_path);
×
1349
        Py_DECREF(self->index_dir);
×
1350
        self->file_path = NULL;
×
1351
        self->index_dir = NULL;
×
1352
        return -1;
×
1353
    }
×
1354

1355
    return 0;
318✔
1356
}
159✔
1357

1358
// TraceReader.iter_lines(start_line=0, end_line=0, start_byte=0, end_byte=0,
//                        buffer_size=4 MiB, query=None, memory_budget=0)
//
// Streams the trace (a single file, or every trace in a directory) as batches
// of lines wrapped in a memoryview-batch iterator.
//
// Range arguments must be >= 0 and buffer_size must be > 0; otherwise
// ValueError is raised. Directories are read by produce_lines_parallel()
// inside a runtime scope. Single files are read by produce_lines_batched()
// submitted to the runtime.
//
// Returns a new iterator object, or NULL with a Python exception set.
// Every step that can throw a C++ exception sits inside a single try block.
// This covers config building, string assignment, allocation of the iterator
// state and channel, and task submission. No exception can therefore unwind
// into the CPython interpreter.
static PyObject *TraceReader_iter_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {"start_line",    "end_line",    "start_byte",
                                   "end_byte",      "buffer_size", "query",
                                   "memory_budget", NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;
    Py_ssize_t memory_budget = 0;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|nnnnnzn", (char **)kwlist, &start_line, &end_line,
            &start_byte, &end_byte, &buffer_size, &query_str, &memory_budget)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    // Number of lines grouped into one batch sent through the channel.
    constexpr std::size_t LINE_BATCH_SIZE = 1024;

    try {
        TraceReaderConfig cfg = build_config(self);

        ReadConfig rc;
        rc.start_line = static_cast<std::size_t>(start_line);
        rc.end_line = static_cast<std::size_t>(end_line);
        rc.start_byte = static_cast<std::size_t>(start_byte);
        rc.end_byte = static_cast<std::size_t>(end_byte);
        rc.buffer_size = static_cast<std::size_t>(buffer_size);
        if (query_str) rc.query = query_str;

        auto state = std::make_shared<MemoryViewBatchIteratorState>();
        state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
            static_cast<std::size_t>(memory_budget));

        Runtime *rt = get_runtime(self);
        std::size_t max_workers = rt->threads();
        // Channel depth is sized so that in-flight batches fit the budget.
        std::size_t capacity = dftracer::utils::compute_channel_capacity(
            state->memory_budget_bytes,
            LINE_BATCH_SIZE * ESTIMATED_BYTES_PER_LINE, max_workers);
        state->channel =
            dftracer::utils::coro::make_channel<MemoryViewBatchData>(capacity);
        // Raw pointer handed to the parallel producer. The iterator returned
        // below keeps `state` alive via the shared_ptr.
        auto *sp = state.get();

        if (fs::is_directory(cfg.file_path)) {
            auto handle = rt->scope(
                "iter_lines_parallel",
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_lines_parallel(
                        scope, sp, dir_path, index_dir, checkpoint_size,
                        auto_build_index, rc, LINE_BATCH_SIZE, max_workers);
                });
            state->task_future = handle.future;
        } else {
            auto handle = rt->submit(
                produce_lines_batched(state, state->channel->producer(), cfg,
                                      rc, LINE_BATCH_SIZE),
                "iter_lines");
            state->task_future = handle.future;
        }

        TraceReaderIteratorObject *it =
            make_memoryview_iterator(std::move(state));
        return (PyObject *)it;
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    }
}
121✔
1442

1443
// TraceReader.iter_raw(start_line=0, end_line=0, start_byte=0, end_byte=0,
//                      buffer_size=4 MiB, line_aligned=True, multi_line=True,
//                      query=None, memory_budget=0)
//
// Streams raw byte chunks from the trace (a single file, or every trace in a
// directory) through a memoryview-batch iterator.
//
// Range arguments must be >= 0 and buffer_size must be > 0; otherwise
// ValueError is raised. Directories are read by produce_raw_parallel() inside
// a runtime scope. Single files are read by produce_raw_batched() submitted
// to the runtime.
//
// Returns a new iterator object, or NULL with a Python exception set.
// Every step that can throw a C++ exception sits inside a single try block.
// This covers config building, allocation of the iterator state and channel,
// and task submission. No exception can therefore unwind into the
// interpreter.
static PyObject *TraceReader_iter_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "line_aligned",
                                   "multi_line", "query",       "memory_budget",
                                   NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    int line_aligned = 1;
    int multi_line = 1;
    const char *query_str = NULL;
    Py_ssize_t memory_budget = 0;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nnnnnppzn", (char **)kwlist,
                                     &start_line, &end_line, &start_byte,
                                     &end_byte, &buffer_size, &line_aligned,
                                     &multi_line, &query_str, &memory_budget)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    try {
        TraceReaderConfig cfg = build_config(self);

        ReadConfig rc;
        rc.start_line = static_cast<std::size_t>(start_line);
        rc.end_line = static_cast<std::size_t>(end_line);
        rc.start_byte = static_cast<std::size_t>(start_byte);
        rc.end_byte = static_cast<std::size_t>(end_byte);
        rc.buffer_size = static_cast<std::size_t>(buffer_size);
        rc.line_aligned = line_aligned != 0;
        rc.multi_line = multi_line != 0;
        if (query_str) rc.query = query_str;

        auto state = std::make_shared<MemoryViewBatchIteratorState>();
        state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
            static_cast<std::size_t>(memory_budget));

        Runtime *rt = get_runtime(self);
        std::size_t max_workers = rt->threads();
        std::size_t capacity = dftracer::utils::compute_channel_capacity(
            state->memory_budget_bytes, ESTIMATED_BYTES_PER_RAW_CHUNK,
            max_workers);
        state->channel =
            dftracer::utils::coro::make_channel<MemoryViewBatchData>(capacity);
        // Raw pointer handed to the parallel producer. The iterator returned
        // below keeps `state` alive via the shared_ptr.
        auto *sp = state.get();

        if (fs::is_directory(cfg.file_path)) {
            auto handle = rt->scope(
                "iter_raw_parallel",
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_raw_parallel(
                        scope, sp, dir_path, index_dir, checkpoint_size,
                        auto_build_index, rc, max_workers);
                });
            state->task_future = handle.future;
        } else {
            auto handle = rt->submit(
                produce_raw_batched(state, state->channel->producer(), cfg, rc),
                "iter_raw");
            state->task_future = handle.future;
        }

        TraceReaderIteratorObject *it =
            make_memoryview_iterator(std::move(state));
        return (PyObject *)it;
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    }
}
25✔
1530

1531
// TraceReader.read_lines(...): eager counterpart of iter_lines.
// Accepts the same arguments, drains the iterator into a Python list and
// returns it (or NULL with the error propagated from iter_lines / list
// construction).
static PyObject *TraceReader_read_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    PyObject *line_iter = TraceReader_iter_lines(self, args, kwds);
    if (line_iter == NULL) {
        return NULL;
    }

    PyObject *materialized = PySequence_List(line_iter);
    Py_DECREF(line_iter);  // the list (if any) holds everything it needs
    return materialized;
}
46✔
1539

1540
// TraceReader.iter_json(start_line=0, end_line=0, start_byte=0, end_byte=0,
//                       buffer_size=4 MiB, query=None, batch_size=1024,
//                       memory_budget=0)
//
// Streams trace events as batches of Python dicts through a JSON-dict
// iterator.
//
// Range arguments must be >= 0, and buffer_size and batch_size must be > 0;
// otherwise ValueError is raised. Directories are read by
// produce_json_dicts_parallel() inside a runtime scope. Single files are read
// by produce_json_dicts() submitted to the runtime.
//
// Returns a new iterator object, or NULL with a Python exception set.
// Every step that can throw a C++ exception sits inside a single try block.
// This covers config building, allocation of the iterator state and channel,
// and task submission. No exception can therefore unwind into the
// interpreter.
static PyObject *TraceReader_iter_json(TraceReaderObject *self, PyObject *args,
                                       PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",      "start_byte",
                                   "end_byte",   "buffer_size",   "query",
                                   "batch_size", "memory_budget", NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;
    Py_ssize_t batch_size = 1024;
    Py_ssize_t memory_budget = 0;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nnnnnznn", (char **)kwlist,
                                     &start_line, &end_line, &start_byte,
                                     &end_byte, &buffer_size, &query_str,
                                     &batch_size, &memory_budget)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0 || batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError,
                        "range arguments must be >= 0; buffer_size and "
                        "batch_size must be > 0");
        return NULL;
    }

    try {
        TraceReaderConfig cfg = build_config(self);

        ReadConfig rc;
        rc.start_line = static_cast<std::size_t>(start_line);
        rc.end_line = static_cast<std::size_t>(end_line);
        rc.start_byte = static_cast<std::size_t>(start_byte);
        rc.end_byte = static_cast<std::size_t>(end_byte);
        rc.buffer_size = static_cast<std::size_t>(buffer_size);
        if (query_str) rc.query = query_str;

        auto state = std::make_shared<JsonDictIteratorState>();
        state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
            static_cast<std::size_t>(memory_budget));

        Runtime *rt = get_runtime(self);
        std::size_t max_workers = rt->threads();
        auto bs = static_cast<std::size_t>(batch_size);
        std::size_t capacity = dftracer::utils::compute_channel_capacity(
            state->memory_budget_bytes, bs * ESTIMATED_BYTES_PER_JSON_EVENT,
            max_workers);
        state->channel =
            dftracer::utils::coro::make_channel<JsonDictBatch>(capacity);
        // Raw pointer handed to the parallel producer. The iterator returned
        // below keeps `state` alive via the shared_ptr.
        auto *sp = state.get();

        if (fs::is_directory(cfg.file_path)) {
            auto handle = rt->scope(
                "iter_json_parallel",
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc, bs,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_json_dicts_parallel(
                        scope, sp, dir_path, index_dir, checkpoint_size,
                        auto_build_index, rc, bs, max_workers);
                });
            state->task_future = handle.future;
        } else {
            auto handle =
                rt->submit(produce_json_dicts(state, state->channel->producer(),
                                              cfg, rc, bs),
                           "iter_json");
            state->task_future = handle.future;
        }

        TraceReaderIteratorObject *it =
            make_json_dict_iterator(std::move(state));
        return (PyObject *)it;
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    }
}
14✔
1626

1627
// TraceReader.read_json(...): eager counterpart of iter_json.
// Accepts the same arguments, drains the iterator into a Python list and
// returns it (or NULL with the error propagated from iter_json / list
// construction).
static PyObject *TraceReader_read_json_py(TraceReaderObject *self,
                                          PyObject *args, PyObject *kwds) {
    PyObject *event_iter = TraceReader_iter_json(self, args, kwds);
    if (event_iter == NULL) {
        return NULL;
    }

    PyObject *materialized = PySequence_List(event_iter);
    Py_DECREF(event_iter);
    return materialized;
}
1✔
1635

1636
// TraceReader.read_raw(...): eager counterpart of iter_raw.
// Accepts the same arguments, drains the iterator into a Python list and
// returns it (or NULL with the error propagated from iter_raw / list
// construction).
static PyObject *TraceReader_read_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    PyObject *chunk_iter = TraceReader_iter_raw(self, args, kwds);
    if (chunk_iter == NULL) {
        return NULL;
    }

    PyObject *materialized = PySequence_List(chunk_iter);
    Py_DECREF(chunk_iter);
    return materialized;
}
4✔
1644

1645
#ifdef DFTRACER_UTILS_ENABLE_ARROW
1646

1647
// TraceReader.iter_arrow(batch_size=10000, start_line=0, end_line=0,
//                        start_byte=0, end_byte=0, buffer_size=4 MiB,
//                        query=None, flatten_objects=True, normalize=False,
//                        memory_budget=0)
//
// Streams the trace as Arrow record batches through an Arrow iterator.
//
// batch_size must be > 0, range arguments must be >= 0 and buffer_size must
// be > 0; otherwise ValueError is raised. There are three producer paths:
//   * directory             -> produce_arrow_batches_parallel() in a scope
//   * single file, normalize -> produce_arrow_batches() submitted directly
//   * single file otherwise -> produce_arrow_batches_for_files() in a scope
//
// Returns a new iterator object, or NULL with a Python exception set.
// Every step that can throw a C++ exception sits inside a single try block.
// This covers config building, allocation of the iterator state and channel,
// and task submission. No exception can therefore unwind into the
// interpreter.
static PyObject *TraceReader_iter_arrow(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {
        "batch_size", "start_line",    "end_line", "start_byte",
        "end_byte",   "buffer_size",   "query",    "flatten_objects",
        "normalize",  "memory_budget", NULL};
    Py_ssize_t batch_size = 10000;
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;
    int flatten_objects = 1;  // default: expand top-level objects
    int normalize = 0;
    Py_ssize_t memory_budget = 0;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|nnnnnnzppn", (char **)kwlist, &batch_size,
            &start_line, &end_line, &start_byte, &end_byte, &buffer_size,
            &query_str, &flatten_objects, &normalize, &memory_budget)) {
        return NULL;
    }

    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
        return NULL;
    }
    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    try {
        TraceReaderConfig cfg = build_config(self);

        ReadConfig rc;
        rc.start_line = static_cast<std::size_t>(start_line);
        rc.end_line = static_cast<std::size_t>(end_line);
        rc.start_byte = static_cast<std::size_t>(start_byte);
        rc.end_byte = static_cast<std::size_t>(end_byte);
        rc.buffer_size = static_cast<std::size_t>(buffer_size);
        rc.flatten_objects = flatten_objects != 0;
        if (query_str) rc.query = query_str;

        auto state = std::make_shared<ArrowIteratorState>();
        state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
            static_cast<std::size_t>(memory_budget));

        Runtime *rt = get_runtime(self);
        std::size_t max_workers = rt->threads();
        auto bs = static_cast<std::size_t>(batch_size);
        std::size_t capacity = dftracer::utils::compute_channel_capacity(
            state->memory_budget_bytes, bs * ESTIMATED_BYTES_PER_ARROW_ROW,
            max_workers);
        state->channel =
            dftracer::utils::coro::make_channel<ArrowIteratorState::BatchType>(
                capacity);
        // Raw pointer handed to the scoped producers. The iterator returned
        // below keeps `state` alive via the shared_ptr.
        auto *sp = state.get();

        if (fs::is_directory(cfg.file_path)) {
            auto handle = rt->scope(
                "iter_arrow_parallel",
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc, bs,
                 norm = normalize != 0,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_arrow_batches_parallel(
                        scope, sp, dir_path, index_dir, checkpoint_size,
                        auto_build_index, rc, bs, norm, max_workers);
                });
            state->task_future = handle.future;
        } else if (normalize) {
            auto handle = rt->submit(
                produce_arrow_batches(state, state->channel->producer(), cfg,
                                      rc, static_cast<std::size_t>(batch_size),
                                      flatten_objects != 0, normalize != 0),
                "iter_arrow");
            state->task_future = handle.future;
        } else {
            // Single file without normalization: reuse the multi-file
            // producer with a one-element file list.
            std::vector<std::string> files_vec{cfg.file_path};
            auto handle = rt->scope(
                "iter_arrow_parallel",
                [sp, files = std::move(files_vec), index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc, bs,
                 norm = normalize != 0,
                 max_workers](CoroScope &scope) mutable -> CoroTask<void> {
                    co_await produce_arrow_batches_for_files(
                        scope, sp, std::move(files), index_dir, checkpoint_size,
                        auto_build_index, rc, bs, norm, max_workers);
                });
            state->task_future = handle.future;
        }

        TraceReaderIteratorObject *it = make_arrow_iterator(std::move(state));
        return (PyObject *)it;
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    }
}
54✔
1758

1759
// Build ArrowIteratorState + spawn the producer task. Same plumbing as
1760
// TraceReader_iter_arrow but returns the state so callers can wrap it as
1761
// either a per-batch iterator or an ArrowArrayStream.
1762
static std::shared_ptr<ArrowIteratorState> spawn_arrow_producer(
56✔
1763
    TraceReaderObject *self, PyObject *args, PyObject *kwds) {
1764
    static const char *kwlist[] = {
1765
        "batch_size", "start_line",    "end_line", "start_byte",
1766
        "end_byte",   "buffer_size",   "query",    "flatten_objects",
1767
        "normalize",  "memory_budget", NULL};
1768
    Py_ssize_t batch_size = 10000;
56✔
1769
    Py_ssize_t start_line = 0, end_line = 0;
56✔
1770
    Py_ssize_t start_byte = 0, end_byte = 0;
56✔
1771
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
56✔
1772
    const char *query_str = NULL;
56✔
1773
    int flatten_objects = 1;  // default: expand top-level objects
56✔
1774
    int normalize = 0;
56✔
1775
    Py_ssize_t memory_budget = 0;
56✔
1776

1777
    if (!PyArg_ParseTupleAndKeywords(
56!
1778
            args, kwds, "|nnnnnnzppn", (char **)kwlist, &batch_size,
28✔
1779
            &start_line, &end_line, &start_byte, &end_byte, &buffer_size,
1780
            &query_str, &flatten_objects, &normalize, &memory_budget)) {
1781
        return nullptr;
×
1782
    }
1783

1784
    if (batch_size <= 0) {
56!
1785
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
×
1786
        return nullptr;
×
1787
    }
1788
    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
56!
1789
        buffer_size <= 0) {
56!
1790
        PyErr_SetString(
×
1791
            PyExc_ValueError,
1792
            "range arguments must be >= 0; buffer_size must be > 0");
1793
        return nullptr;
×
1794
    }
1795

1796
    TraceReaderConfig cfg;
56✔
1797
    try {
1798
        cfg = build_config(self);
56!
1799
    } catch (const std::exception &e) {
28!
1800
        set_typed_py_error(e);
×
1801
        return nullptr;
×
1802
    }
×
1803

1804
    ReadConfig rc;
56✔
1805
    rc.start_line = static_cast<std::size_t>(start_line);
56✔
1806
    rc.end_line = static_cast<std::size_t>(end_line);
56✔
1807
    rc.start_byte = static_cast<std::size_t>(start_byte);
56✔
1808
    rc.end_byte = static_cast<std::size_t>(end_byte);
56✔
1809
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
56✔
1810
    rc.flatten_objects = flatten_objects != 0;
56✔
1811
    if (query_str) rc.query = query_str;
56!
1812

1813
    auto state = std::make_shared<ArrowIteratorState>();
56!
1814
    state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
56!
1815
        static_cast<std::size_t>(memory_budget));
28✔
1816

1817
    Runtime *rt = get_runtime(self);
56!
1818
    std::size_t max_workers = rt->threads();
56!
1819
    auto bs = static_cast<std::size_t>(batch_size);
56✔
1820
    std::size_t capacity = dftracer::utils::compute_channel_capacity(
56!
1821
        state->memory_budget_bytes, bs * ESTIMATED_BYTES_PER_ARROW_ROW,
56✔
1822
        max_workers);
28✔
1823
    state->channel =
56✔
1824
        dftracer::utils::coro::make_channel<ArrowIteratorState::BatchType>(
84!
1825
            capacity);
56✔
1826
    auto *sp = state.get();
56✔
1827

1828
    try {
1829
        bool is_dir = fs::is_directory(cfg.file_path);
56!
1830
        if (is_dir) {
56✔
1831
            auto handle = rt->scope(
20!
1832
                "iter_arrow_parallel",
10!
1833
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
90!
1834
                 checkpoint_size = cfg.checkpoint_size,
20✔
1835
                 auto_build_index = cfg.auto_build_index, rc, bs,
30!
1836
                 norm = normalize != 0,
20✔
1837
                 max_workers](CoroScope &scope) -> CoroTask<void> {
20!
1838
                    co_await produce_arrow_batches_parallel(
80!
1839
                        scope, sp, dir_path, index_dir, checkpoint_size,
30!
1840
                        auto_build_index, rc, bs, norm, max_workers);
30!
1841
                });
60!
1842
            state->task_future = handle.future;
20!
1843
        } else {
20✔
1844
            auto handle = rt->submit(
36!
1845
                produce_arrow_batches(state, state->channel->producer(), cfg,
72!
1846
                                      rc, static_cast<std::size_t>(batch_size),
18!
1847
                                      flatten_objects != 0, normalize != 0),
18✔
1848
                "iter_arrow");
72!
1849
            state->task_future = handle.future;
36!
1850
        }
36✔
1851
    } catch (const std::exception &e) {
28!
1852
        set_typed_py_error(e);
×
1853
        return nullptr;
×
1854
    }
×
1855

1856
    return state;
56✔
1857
}
56✔
1858

1859
// TraceReader.iter_arrow_stream(...): same arguments as iter_arrow, but the
// produced batches are exposed through the object returned by
// make_arrow_batch_stream(). spawn_arrow_producer() yields an empty pointer,
// with a Python error already set, when argument parsing or setup fails.
static PyObject *TraceReader_iter_arrow_stream(TraceReaderObject *self,
                                               PyObject *args, PyObject *kwds) {
    std::shared_ptr<ArrowIteratorState> producer_state =
        spawn_arrow_producer(self, args, kwds);
    if (producer_state == nullptr) {
        return NULL;
    }
    return make_arrow_batch_stream(std::move(producer_state));
}
30✔
1865

1866
// TraceReader.read_arrow(...): eager counterpart of iter_arrow_stream.
// Builds the batch stream, then hands it to wrap_arrow_stream_table() to
// produce the final table object. Returns NULL if the producer or the stream
// could not be created.
static PyObject *TraceReader_read_arrow(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    std::shared_ptr<ArrowIteratorState> producer_state =
        spawn_arrow_producer(self, args, kwds);
    if (producer_state == nullptr) {
        return NULL;
    }

    PyObject *batch_stream = make_arrow_batch_stream(std::move(producer_state));
    if (batch_stream == nullptr) {
        return NULL;
    }
    return dftracer::utils::python::wrap_arrow_stream_table(batch_stream);
}
26✔
1874

1875
#endif  // DFTRACER_UTILS_ENABLE_ARROW
1876

1877
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
1878

1879
// Parse a single view spec (string name/preset or dict with optional "name"
1880
// and "query") into `view`. Returns false with a Python error set on failure.
1881
// When `strict`, a value that is neither string nor dict raises TypeError;
1882
// otherwise it is silently ignored (leaving `view` default-constructed).
1883
static bool parse_view_spec(PyObject *view_obj, ViewDefinition &view,
10✔
1884
                            bool strict) {
1885
    if (view_obj && view_obj != Py_None) {
10!
1886
        if (PyUnicode_Check(view_obj)) {
2!
1887
            const char *name = PyUnicode_AsUTF8(view_obj);
×
1888
            if (!name) return false;
×
1889
            std::string name_str(name);
×
1890
            if (name_str == "io") {
×
1891
                view = ViewDefinition::io_view();
×
1892
            } else if (name_str == "compute") {
×
1893
                view = ViewDefinition::compute_view();
×
1894
            } else if (name_str == "dlio") {
×
1895
                view = ViewDefinition::dlio_view();
×
1896
            } else {
1897
                view.with_name(name_str);
×
1898
            }
1899
        } else if (PyDict_Check(view_obj)) {
2!
1900
            PyObject *name_obj = PyDict_GetItemString(view_obj, "name");
2✔
1901
            if (name_obj && PyUnicode_Check(name_obj)) {
2!
1902
                view.with_name(PyUnicode_AsUTF8(name_obj));
2!
1903
            }
1✔
1904
            PyObject *query_obj = PyDict_GetItemString(view_obj, "query");
2✔
1905
            if (query_obj && query_obj != Py_None &&
3!
1906
                PyUnicode_Check(query_obj)) {
2✔
1907
                view.with_query(PyUnicode_AsUTF8(query_obj));
2!
1908
            }
1✔
1909
        } else if (strict) {
1!
1910
            PyErr_SetString(PyExc_TypeError, "view must be a string or dict");
×
1911
            return false;
×
1912
        }
1913
    }
1✔
1914
    return true;
10✔
1915
}
5✔
1916

1917
static PyObject *TraceReader_write_arrow(TraceReaderObject *self,
26✔
1918
                                         PyObject *args, PyObject *kwds) {
1919
    static const char *kwlist[] = {"path",        "views",      "chunk_size_mb",
1920
                                   "compression", "batch_size", NULL};
1921
    const char *path = NULL;
26✔
1922
    PyObject *views_obj = Py_None;
26✔
1923
    int chunk_size_mb = 32;
26✔
1924
    const char *compression_str = "zstd";
26✔
1925
    Py_ssize_t batch_size = 10000;
26✔
1926

1927
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|Oisn", (char **)kwlist,
26!
1928
                                     &path, &views_obj, &chunk_size_mb,
1929
                                     &compression_str, &batch_size)) {
1930
        return NULL;
×
1931
    }
1932

1933
    if (chunk_size_mb < 0) {
26✔
1934
        PyErr_SetString(PyExc_ValueError, "chunk_size_mb must be >= 0");
×
1935
        return NULL;
×
1936
    }
1937

1938
    std::vector<ViewDefinition> views;
26✔
1939
    if (views_obj && views_obj != Py_None) {
26!
1940
        if (!PyList_Check(views_obj)) {
10!
1941
            PyErr_SetString(PyExc_TypeError, "views must be a list or None");
×
1942
            return NULL;
×
1943
        }
1944
        Py_ssize_t n = PyList_Size(views_obj);
10!
1945
        for (Py_ssize_t i = 0; i < n; i++) {
22✔
1946
            PyObject *item = PyList_GetItem(views_obj, i);
12!
1947
            ViewDefinition vd;
12✔
1948

1949
            if (PyUnicode_Check(item)) {
12✔
1950
                const char *name = PyUnicode_AsUTF8(item);
2!
1951
                if (!name) return NULL;
2✔
1952
                std::string name_str(name);
2!
1953
                if (name_str == "io") {
2!
1954
                    vd = ViewDefinition::io_view();
2!
1955
                } else if (name_str == "compute") {
1!
1956
                    vd = ViewDefinition::compute_view();
×
1957
                } else if (name_str == "dlio") {
×
1958
                    vd = ViewDefinition::dlio_view();
×
1959
                } else {
1960
                    vd.with_name(name_str);
×
1961
                }
1962
            } else if (PyDict_Check(item)) {
12!
1963
                PyObject *name_obj = PyDict_GetItemString(item, "name");
10!
1964
                if (!name_obj || !PyUnicode_Check(name_obj)) {
10!
1965
                    PyErr_SetString(PyExc_ValueError,
×
1966
                                    "view dict must have 'name' string");
1967
                    return NULL;
×
1968
                }
1969
                vd.with_name(PyUnicode_AsUTF8(name_obj));
10!
1970

1971
                PyObject *query_obj = PyDict_GetItemString(item, "query");
10!
1972
                if (query_obj && query_obj != Py_None) {
10!
1973
                    if (!PyUnicode_Check(query_obj)) {
10!
1974
                        PyErr_SetString(PyExc_ValueError,
×
1975
                                        "view 'query' must be a string");
1976
                        return NULL;
×
1977
                    }
1978
                    vd.with_query(PyUnicode_AsUTF8(query_obj));
10!
1979
                }
5✔
1980

1981
                PyObject *meta_obj =
5✔
1982
                    PyDict_GetItemString(item, "include_metadata");
10!
1983
                if (meta_obj && meta_obj != Py_None) {
10!
1984
                    vd.with_include_metadata(PyObject_IsTrue(meta_obj));
2!
1985
                }
1✔
1986
            } else {
5✔
1987
                PyErr_SetString(PyExc_TypeError,
×
1988
                                "views list must contain strings or dicts");
1989
                return NULL;
×
1990
            }
1991
            views.push_back(std::move(vd));
12!
1992
        }
12✔
1993
    }
5✔
1994

1995
    IpcCompression compression = IpcCompression::ZSTD;
26✔
1996
    if (compression_str) {
26!
1997
        std::string comp_lower(compression_str);
26!
1998
        for (auto &c : comp_lower) c = std::tolower(c);
130!
1999
        if (comp_lower == "none") {
26✔
2000
            compression = IpcCompression::NONE;
2✔
2001
        } else if (comp_lower == "zstd") {
25✔
2002
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
2003
            compression = IpcCompression::ZSTD;
24✔
2004
#else
2005
            PyErr_SetString(
2006
                PyExc_ValueError,
2007
                "ZSTD compression not available (built without ZSTD)");
2008
            return NULL;
2009
#endif
2010
        } else {
12✔
2011
            PyErr_Format(PyExc_ValueError,
×
2012
                         "Unknown compression: %s (use 'none' or 'zstd')",
2013
                         compression_str);
2014
            return NULL;
×
2015
        }
2016
    }
26✔
2017

2018
    int64_t chunk_size_bytes =
26✔
2019
        static_cast<int64_t>(chunk_size_mb) * 1024 * 1024;
26✔
2020

2021
    std::string file_path = PyUnicode_AsUTF8(self->file_path);
26!
2022
    std::string index_path;
26✔
2023
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
26!
2024
    if (idx && idx[0] != '\0') {
26!
2025
        index_path = idx;
×
2026
    }
2027
    std::size_t checkpoint_size = self->checkpoint_size;
26✔
2028

2029
    std::string output_path(path);
26!
2030
    WriteArrowResult result;
26✔
2031
    if (!run_blocking_r(
26!
2032
            [&] {
39✔
2033
                Runtime *rt = get_runtime(self);
26✔
2034
                return rt
26✔
2035
                    ->submit(
65!
2036
                        write_arrow_pipeline(
52!
2037
                            file_path, index_path, checkpoint_size,
26!
2038
                            std::move(views), output_path, chunk_size_bytes,
26!
2039
                            compression, static_cast<std::size_t>(batch_size)),
26!
2040
                        "write_arrow")
13!
2041
                    .get();
39!
2042
            },
2043
            result)) {
2044
        return NULL;
×
2045
    }
2046

2047
    if (!result.error.empty()) {
26!
2048
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
×
2049
        return NULL;
×
2050
    }
2051

2052
    // Build result dict
2053
    PyObject *dict = PyDict_New();
26!
2054
    if (!dict) return NULL;
26✔
2055

2056
    // Build files list per partition
2057
    PyObject *partitions_dict = PyDict_New();
26!
2058
    if (!partitions_dict) {
26!
2059
        Py_DECREF(dict);
×
2060
        return NULL;
×
2061
    }
2062

2063
    for (const auto &[partition_name, partition_stats] :
54!
2064
         result.stats.partitions) {
53✔
2065
        PyObject *partition_dict = PyDict_New();
28!
2066
        if (!partition_dict) {
28!
2067
            Py_DECREF(partitions_dict);
×
2068
            Py_DECREF(dict);
×
2069
            return NULL;
×
2070
        }
2071

2072
        PyObject *files_list = PyList_New(0);
28!
2073
        if (!files_list) {
28!
2074
            Py_DECREF(partition_dict);
×
2075
            Py_DECREF(partitions_dict);
×
2076
            Py_DECREF(dict);
×
2077
            return NULL;
×
2078
        }
2079

2080
        for (const auto &f : partition_stats.files) {
44✔
2081
            PyObject *file_str = PyUnicode_FromString(f.c_str());
16!
2082
            if (!file_str || PyList_Append(files_list, file_str) < 0) {
16!
2083
                Py_XDECREF(file_str);
×
2084
                Py_DECREF(files_list);
×
2085
                Py_DECREF(partition_dict);
×
2086
                Py_DECREF(partitions_dict);
×
2087
                Py_DECREF(dict);
×
2088
                return NULL;
×
2089
            }
2090
            Py_DECREF(file_str);
8!
2091
        }
2092

2093
        PyDict_SetItemString(partition_dict, "files", files_list);
28!
2094
        dict_set_steal(partition_dict, "rows",
28!
2095
                       PyLong_FromLongLong(partition_stats.total_rows));
28!
2096
        dict_set_steal(
28!
2097
            partition_dict, "bytes",
14✔
2098
            PyLong_FromLongLong(partition_stats.total_uncompressed_bytes));
28!
2099
        Py_DECREF(files_list);
14!
2100

2101
        PyObject *key = partition_name.empty()
42!
2102
                            ? PyUnicode_FromString("_default")
14!
2103
                            : PyUnicode_FromString(partition_name.c_str());
28!
2104
        PyDict_SetItem(partitions_dict, key, partition_dict);
28!
2105
        Py_DECREF(key);
14!
2106
        Py_DECREF(partition_dict);
14!
2107
    }
2108

2109
    PyDict_SetItemString(dict, "partitions", partitions_dict);
26!
2110
    dict_set_steal(dict, "total_rows",
26!
2111
                   PyLong_FromLongLong(result.stats.total_rows));
26!
2112
    dict_set_steal(dict, "total_bytes",
26!
2113
                   PyLong_FromLongLong(result.stats.total_uncompressed_bytes));
26!
2114
    dict_set_steal(dict, "chunks_scanned",
26!
2115
                   PyLong_FromUnsignedLongLong(result.chunks_scanned));
26!
2116
    dict_set_steal(dict, "chunks_skipped",
26!
2117
                   PyLong_FromUnsignedLongLong(result.chunks_skipped));
26!
2118
    Py_DECREF(partitions_dict);
13!
2119

2120
    return dict;
26✔
2121
}
26✔
2122

2123
/// Python: ``TraceReader.get_view_chunks(view=None) -> dict``.
///
/// Parses ``view`` with parse_view_spec (strict mode) and runs
/// get_view_chunks_pipeline on this reader's Runtime, blocking until it
/// finishes. On success returns a dict with:
///   - ``chunks``: list of dicts with ``checkpoint_idx``, ``start_byte`` and
///     ``end_byte``;
///   - ``total_checkpoints`` / ``skipped_checkpoints``: ints;
///   - ``file_may_match``: bool.
///
/// Returns NULL with a Python exception set on argument errors, when the
/// stored paths cannot be encoded as UTF-8, when the pipeline reports an
/// error (RuntimeError), or on allocation failure.
static PyObject *TraceReader_get_view_chunks(TraceReaderObject *self,
                                             PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"view", NULL};
    PyObject *view_obj = Py_None;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
                                     &view_obj)) {
        return NULL;
    }

    ViewDefinition view;
    if (!parse_view_spec(view_obj, view, /*strict=*/true)) return NULL;

    // PyUnicode_AsUTF8 returns NULL (with an exception set) when the string
    // cannot be encoded, e.g. a path holding lone surrogates from
    // os.fsdecode(). Building a std::string from NULL is undefined behavior,
    // and ignoring the NULL would leave the exception pending.
    const char *fp = PyUnicode_AsUTF8(self->file_path);
    if (!fp) return NULL;
    std::string file_path = fp;
    std::string index_path;
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
    if (!idx) return NULL;
    if (idx[0] != '\0') {
        index_path = idx;
    }
    std::size_t checkpoint_size = self->checkpoint_size;

    GetViewChunksResult result;
    if (!run_blocking_r(
            [&] {
                Runtime *rt = get_runtime(self);
                return rt
                    ->submit(get_view_chunks_pipeline(file_path, index_path,
                                                      checkpoint_size, view),
                             "get_view_chunks")
                    .get();
            },
            result)) {
        return NULL;
    }

    if (!result.error.empty()) {
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
        return NULL;
    }

    PyObject *dict = PyDict_New();
    if (!dict) return NULL;

    const Py_ssize_t num_chunks =
        static_cast<Py_ssize_t>(result.chunks.size());
    PyObject *chunks_list = PyList_New(num_chunks);
    if (!chunks_list) {
        Py_DECREF(dict);
        return NULL;
    }

    for (Py_ssize_t i = 0; i < num_chunks; ++i) {
        const auto &chunk = result.chunks[static_cast<std::size_t>(i)];
        PyObject *chunk_dict = PyDict_New();
        if (!chunk_dict) {
            Py_DECREF(chunks_list);
            Py_DECREF(dict);
            return NULL;
        }
        dict_set_steal(chunk_dict, "checkpoint_idx",
                       PyLong_FromUnsignedLongLong(chunk.checkpoint_idx));
        dict_set_steal(chunk_dict, "start_byte",
                       PyLong_FromSize_t(chunk.start_byte));
        dict_set_steal(chunk_dict, "end_byte",
                       PyLong_FromSize_t(chunk.end_byte));
        // PyList_SetItem steals the reference to chunk_dict.
        PyList_SetItem(chunks_list, i, chunk_dict);
    }

    // PyDict_SetItemString does not steal, so chunks_list is released below.
    if (PyDict_SetItemString(dict, "chunks", chunks_list) < 0) {
        Py_DECREF(chunks_list);
        Py_DECREF(dict);
        return NULL;
    }
    dict_set_steal(dict, "total_checkpoints",
                   PyLong_FromUnsignedLongLong(result.total_checkpoints));
    dict_set_steal(dict, "skipped_checkpoints",
                   PyLong_FromUnsignedLongLong(result.skipped_checkpoints));
    dict_set_steal(dict, "file_may_match",
                   PyBool_FromLong(result.file_may_match ? 1 : 0));
    Py_DECREF(chunks_list);

    return dict;
}
2200

2201
/// Python: ``TraceReader.write_view_chunk(output_file, checkpoint_idx,
/// start_byte, end_byte, view=None, compression="zstd", batch_size=10000)``.
///
/// Writes the events from one byte range of the trace that match ``view``
/// to a single Arrow IPC file. It does this by running
/// write_view_chunk_pipeline on this reader's Runtime and blocking until
/// the pipeline completes. On success returns a dict with ``output_file``,
/// ``events_matched``, ``events_scanned``, ``rows_written`` and
/// ``bytes_written``.
///
/// Raises:
///   - ValueError for negative ``start_byte``/``end_byte``/``batch_size``;
///   - ValueError for an unknown compression name (matching is
///     case-insensitive) or for "zstd" on builds without ZSTD;
///   - RuntimeError when the pipeline reports an error.
static PyObject *TraceReader_write_view_chunk(TraceReaderObject *self,
                                              PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {
        "output_file", "checkpoint_idx", "start_byte", "end_byte",
        "view",        "compression",    "batch_size", NULL};
    const char *output_file = NULL;
    unsigned long long checkpoint_idx = 0;
    Py_ssize_t start_byte = 0;
    Py_ssize_t end_byte = 0;
    PyObject *view_obj = Py_None;
    const char *compression_str = "zstd";
    Py_ssize_t batch_size = 10000;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "sKnn|Osn", (char **)kwlist,
                                     &output_file, &checkpoint_idx, &start_byte,
                                     &end_byte, &view_obj, &compression_str,
                                     &batch_size)) {
        return NULL;
    }

    // The "n" format accepts negative values. The static_cast<std::size_t>
    // calls below would silently wrap those to enormous offsets/sizes.
    if (start_byte < 0 || end_byte < 0) {
        PyErr_SetString(PyExc_ValueError,
                        "start_byte and end_byte must be non-negative");
        return NULL;
    }
    if (batch_size < 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be non-negative");
        return NULL;
    }

    // "s" guarantees compression_str is non-NULL.
    IpcCompression compression = IpcCompression::ZSTD;
    std::string comp_lower(compression_str);
    // Cast through unsigned char: std::tolower on a negative char value
    // (any non-ASCII byte on signed-char platforms) is undefined behavior.
    for (auto &c : comp_lower) {
        c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
    }
    if (comp_lower == "none") {
        compression = IpcCompression::NONE;
    } else if (comp_lower == "zstd") {
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
        compression = IpcCompression::ZSTD;
#else
        PyErr_SetString(PyExc_ValueError, "ZSTD compression not available");
        return NULL;
#endif
    } else {
        // Reject unrecognized names instead of silently writing ZSTD; this
        // matches the error raised by TraceReader_write_arrow.
        PyErr_Format(PyExc_ValueError,
                     "Unknown compression: %s (use 'none' or 'zstd')",
                     compression_str);
        return NULL;
    }

    ViewDefinition view;
    if (!parse_view_spec(view_obj, view, /*strict=*/false)) return NULL;

    // PyUnicode_AsUTF8 returns NULL with an exception set on encoding
    // failure; std::string(NULL) would be undefined behavior.
    const char *fp = PyUnicode_AsUTF8(self->file_path);
    if (!fp) return NULL;
    std::string file_path = fp;
    std::string index_path;
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
    if (!idx) return NULL;
    if (idx[0] != '\0') {
        index_path = idx;
    }
    std::size_t checkpoint_size = self->checkpoint_size;

    WriteViewChunkResult result;
    if (!run_blocking_r(
            [&] {
                Runtime *rt = get_runtime(self);
                return rt
                    ->submit(write_view_chunk_pipeline(
                                 file_path, index_path, checkpoint_size, view,
                                 checkpoint_idx,
                                 static_cast<std::size_t>(start_byte),
                                 static_cast<std::size_t>(end_byte),
                                 std::string(output_file), compression,
                                 static_cast<std::size_t>(batch_size)),
                             "write_view_chunk")
                    .get();
            },
            result)) {
        return NULL;
    }

    if (!result.error.empty()) {
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
        return NULL;
    }

    PyObject *dict = PyDict_New();
    if (!dict) return NULL;

    dict_set_steal(dict, "output_file",
                   PyUnicode_FromString(result.output_file.c_str()));
    dict_set_steal(dict, "events_matched",
                   PyLong_FromUnsignedLongLong(result.events_matched));
    dict_set_steal(dict, "events_scanned",
                   PyLong_FromUnsignedLongLong(result.events_scanned));
    dict_set_steal(dict, "rows_written",
                   PyLong_FromLongLong(result.rows_written));
    dict_set_steal(dict, "bytes_written",
                   PyLong_FromLongLong(result.bytes_written));

    return dict;
}
2288

2289
/// Python: ``TraceReader.write_view_chunks(chunks, output_dir, view=None,
/// compression="zstd", batch_size=10000)``.
///
/// ``chunks`` must be a list of dicts, each with ``checkpoint_idx``,
/// ``start_byte`` and ``end_byte``. Every chunk is written to
/// ``<output_dir>/chunk-<checkpoint_idx, zero-padded to 5>.arrow``.
/// All chunks are handed to write_view_chunks_pipeline in a single submit
/// on this reader's Runtime, and the call blocks until it completes.
///
/// Returns a dict with ``results`` (one dict per chunk: ``output_file``,
/// ``rows_written``, ``events_matched`` and, when set, ``error``),
/// ``total_rows`` and ``total_events_matched``.
///
/// Raises TypeError/KeyError for malformed ``chunks``. Raises
/// TypeError/OverflowError for non-integer or negative chunk fields. Raises
/// ValueError for a negative ``batch_size``, an unknown compression name
/// (case-insensitive), or "zstd" on builds without ZSTD.
static PyObject *TraceReader_write_view_chunks(TraceReaderObject *self,
                                               PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"chunks",      "output_dir", "view",
                                   "compression", "batch_size", NULL};
    PyObject *chunks_list = NULL;
    const char *output_dir = NULL;
    PyObject *view_obj = Py_None;
    const char *compression_str = "zstd";
    Py_ssize_t batch_size = 10000;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "Os|Osn", (char **)kwlist,
                                     &chunks_list, &output_dir, &view_obj,
                                     &compression_str, &batch_size)) {
        return NULL;
    }

    if (!PyList_Check(chunks_list)) {
        PyErr_SetString(PyExc_TypeError, "chunks must be a list");
        return NULL;
    }

    // "n" accepts negative values, which would wrap when cast to size_t.
    if (batch_size < 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be non-negative");
        return NULL;
    }

    // Same case-insensitive parsing and ZSTD build check as the
    // single-chunk variant ("s" guarantees compression_str is non-NULL).
    IpcCompression compression = IpcCompression::ZSTD;
    std::string comp_lower(compression_str);
    for (auto &c : comp_lower) {
        c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
    }
    if (comp_lower == "none") {
        compression = IpcCompression::NONE;
    } else if (comp_lower == "zstd") {
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
        compression = IpcCompression::ZSTD;
#else
        PyErr_SetString(PyExc_ValueError, "ZSTD compression not available");
        return NULL;
#endif
    } else {
        PyErr_SetString(PyExc_ValueError,
                        "compression must be 'zstd' or 'none'");
        return NULL;
    }

    ViewDefinition view;
    if (!parse_view_spec(view_obj, view, /*strict=*/false)) return NULL;

    std::vector<ChunkDescriptor> chunks;
    Py_ssize_t num_chunks = PyList_Size(chunks_list);
    chunks.reserve(static_cast<std::size_t>(num_chunks));
    // Loop-invariant directory prefix shared by every output path.
    const std::string output_prefix = std::string(output_dir) + "/";

    for (Py_ssize_t i = 0; i < num_chunks; i++) {
        PyObject *chunk_dict = PyList_GetItem(chunks_list, i);
        if (!PyDict_Check(chunk_dict)) {
            PyErr_SetString(PyExc_TypeError, "each chunk must be a dict");
            return NULL;
        }

        ChunkDescriptor desc;

        PyObject *cp_idx = PyDict_GetItemString(chunk_dict, "checkpoint_idx");
        PyObject *start = PyDict_GetItemString(chunk_dict, "start_byte");
        PyObject *end = PyDict_GetItemString(chunk_dict, "end_byte");

        if (!cp_idx || !start || !end) {
            PyErr_SetString(
                PyExc_KeyError,
                "chunk must have checkpoint_idx, start_byte, end_byte");
            return NULL;
        }

        // PyLong_AsUnsignedLongLong returns (unsigned long long)-1 and sets
        // TypeError/OverflowError for non-int or negative input. Propagate
        // the error rather than continuing with a wrapped value and a
        // pending exception.
        const unsigned long long kError = static_cast<unsigned long long>(-1);
        const unsigned long long cp_val = PyLong_AsUnsignedLongLong(cp_idx);
        if (cp_val == kError && PyErr_Occurred()) return NULL;
        const unsigned long long start_val = PyLong_AsUnsignedLongLong(start);
        if (start_val == kError && PyErr_Occurred()) return NULL;
        const unsigned long long end_val = PyLong_AsUnsignedLongLong(end);
        if (end_val == kError && PyErr_Occurred()) return NULL;

        desc.checkpoint_idx = static_cast<std::uint64_t>(cp_val);
        desc.start_byte = static_cast<std::size_t>(start_val);
        desc.end_byte = static_cast<std::size_t>(end_val);

        char filename[64];
        snprintf(filename, sizeof(filename), "chunk-%05llu.arrow",
                 (unsigned long long)desc.checkpoint_idx);
        desc.output_file = output_prefix + filename;

        chunks.push_back(std::move(desc));
    }

    // PyUnicode_AsUTF8 returns NULL with an exception set on encoding
    // failure; std::string(NULL) would be undefined behavior.
    const char *fp = PyUnicode_AsUTF8(self->file_path);
    if (!fp) return NULL;
    std::string file_path = fp;
    std::string index_path;
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
    if (!idx) return NULL;
    if (idx[0] != '\0') {
        index_path = idx;
    }
    std::size_t checkpoint_size = self->checkpoint_size;

    WriteViewChunksResult result;
    if (!run_blocking_r(
            [&] {
                Runtime *rt = get_runtime(self);
                return rt
                    ->submit(write_view_chunks_pipeline(
                                 file_path, index_path, checkpoint_size, view,
                                 std::move(chunks), compression,
                                 static_cast<std::size_t>(batch_size)),
                             "write_view_chunks")
                    .get();
            },
            result)) {
        return NULL;
    }

    PyObject *dict = PyDict_New();
    if (!dict) return NULL;

    PyObject *results_list =
        PyList_New(static_cast<Py_ssize_t>(result.results.size()));
    if (!results_list) {
        Py_DECREF(dict);
        return NULL;
    }

    for (std::size_t i = 0; i < result.results.size(); i++) {
        const auto &r = result.results[i];
        PyObject *item = PyDict_New();
        if (!item) {
            Py_DECREF(results_list);
            Py_DECREF(dict);
            return NULL;
        }
        dict_set_steal(item, "output_file",
                       PyUnicode_FromString(r.output_file.c_str()));
        dict_set_steal(item, "rows_written",
                       PyLong_FromLongLong(r.rows_written));
        dict_set_steal(item, "events_matched",
                       PyLong_FromUnsignedLongLong(r.events_matched));
        if (!r.error.empty()) {
            dict_set_steal(item, "error",
                           PyUnicode_FromString(r.error.c_str()));
        }
        // PyList_SetItem steals the reference to item.
        PyList_SetItem(results_list, static_cast<Py_ssize_t>(i), item);
    }

    PyDict_SetItemString(dict, "results", results_list);
    Py_DECREF(results_list);
    dict_set_steal(dict, "total_rows", PyLong_FromLongLong(result.total_rows));
    dict_set_steal(dict, "total_events_matched",
                   PyLong_FromLongLong(result.total_events_matched));

    return dict;
}
2424

2425
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
2426

2427
/// ``__enter__``: hands back a new reference to the reader itself, so
/// ``with TraceReader(...) as r:`` binds ``r`` to this same object.
static PyObject *TraceReader_enter(TraceReaderObject *self,
                                   PyObject *Py_UNUSED(ignored)) {
    PyObject *as_object = reinterpret_cast<PyObject *>(self);
    Py_INCREF(as_object);
    return as_object;
}
2432

2433
/// ``__exit__``: releases nothing and ignores the exception triple. It
/// always returns None (falsy), so an exception raised inside the ``with``
/// body keeps propagating.
static PyObject *TraceReader_exit(TraceReaderObject *self, PyObject *args) {
    (void)self;
    (void)args;
    Py_INCREF(Py_None);
    return Py_None;
}
2436

2437
/// Getter for ``TraceReader.path``: returns a new reference to the stored
/// ``file_path`` object.
static PyObject *TraceReader_get_file_path(TraceReaderObject *self,
                                           void *closure) {
    PyObject *path_ref = self->file_path;
    Py_INCREF(path_ref);
    return path_ref;
}
2442

2443
/// Getter for ``TraceReader.index_dir``: returns a new reference to the
/// stored ``index_dir`` object.
static PyObject *TraceReader_get_index_dir(TraceReaderObject *self,
                                           void *closure) {
    PyObject *dir_ref = self->index_dir;
    Py_INCREF(dir_ref);
    return dir_ref;
}
2448

2449
/// Getter for ``TraceReader.has_index``: True when ``self->has_index`` is
/// non-zero, otherwise False.
static PyObject *TraceReader_get_has_index(TraceReaderObject *self,
                                           void *closure) {
    if (self->has_index) {
        Py_RETURN_TRUE;
    }
    Py_RETURN_FALSE;
}
2453

2454
/// Getter for ``TraceReader.num_lines``.
///
/// Fast path: builds a native TraceReader from this object's config and
/// asks it for the line count. Any exception from that attempt is
/// deliberately ignored (best effort).
///
/// Slow path: taken when the fast path throws or reports 0. It calls
/// read_lines() with no arguments and returns the length of the resulting
/// list, which materializes every line.
static PyObject *TraceReader_get_num_lines_prop(TraceReaderObject *self,
                                                void *closure) {
    std::size_t known_count = 0;
    try {
        TraceReader native_reader(build_config(self));
        known_count = native_reader.get_num_lines();
    } catch (...) {
        // Best effort only: fall through to the full read below.
        known_count = 0;
    }
    if (known_count != 0) {
        return PyLong_FromSize_t(known_count);
    }

    PyObject *no_args = PyTuple_New(0);
    if (no_args == NULL) return NULL;
    PyObject *all_lines = TraceReader_read_lines(self, no_args, NULL);
    Py_DECREF(no_args);
    if (all_lines == NULL) return NULL;
    const Py_ssize_t line_total = PyList_GET_SIZE(all_lines);
    Py_DECREF(all_lines);
    return PyLong_FromSsize_t(line_total);
}
2472

2473
/// ``TraceReader.get_max_bytes()``: builds a native TraceReader from this
/// object's config and returns ``reader.get_max_bytes()`` as an int.
///
/// A std::exception is translated via set_typed_py_error(). Any other C++
/// exception is converted to RuntimeError, because a C++ exception must
/// never unwind through the CPython C API.
static PyObject *TraceReader_get_max_bytes(TraceReaderObject *self,
                                           PyObject *Py_UNUSED(ignored)) {
    try {
        TraceReaderConfig cfg = build_config(self);
        TraceReader reader(std::move(cfg));
        return PyLong_FromSize_t(reader.get_max_bytes());
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "get_max_bytes: unknown C++ exception");
        return NULL;
    }
}
2484

2485
/// ``TraceReader.get_num_lines()``: builds a native TraceReader from this
/// object's config and returns ``reader.get_num_lines()`` as an int.
///
/// A std::exception is translated via set_typed_py_error(). Any other C++
/// exception is converted to RuntimeError, because a C++ exception must
/// never unwind through the CPython C API.
static PyObject *TraceReader_get_num_lines(TraceReaderObject *self,
                                           PyObject *Py_UNUSED(ignored)) {
    try {
        TraceReaderConfig cfg = build_config(self);
        TraceReader reader(std::move(cfg));
        return PyLong_FromSize_t(reader.get_num_lines());
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "get_num_lines: unknown C++ exception");
        return NULL;
    }
}
2496

2497
// Method table for TraceReader. Arrow-related entries are only compiled in
// when the corresponding build flags are defined.
static PyMethodDef TraceReader_methods[] = {
    {"iter_lines", (PyCFunction)TraceReader_iter_lines,
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over decoded lines.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
    {"iter_raw", (PyCFunction)TraceReader_iter_raw,
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over raw byte chunks.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    line_aligned (bool): Align chunks to line boundaries.\n"
     "    multi_line (bool): Allow multiple lines per chunk.\n"},
    {"read_lines", (PyCFunction)TraceReader_read_lines,
     METH_VARARGS | METH_KEYWORDS,
     "Read all lines and return as list.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
    {"iter_json", (PyCFunction)TraceReader_iter_json,
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over parsed JSON events as Python dicts.\n"
     "\n"
     "Each event is parsed once in C++ (single-pass simdjson ondemand)\n"
     "and yielded as a Python dict. No double-parsing overhead.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    query (str): Optional query filter.\n"
     "    batch_size (int): Events per internal batch (default 1024).\n"},
    {"read_json", (PyCFunction)TraceReader_read_json_py,
     METH_VARARGS | METH_KEYWORDS,
     "Read all events as parsed Python dicts (list).\n"
     "\n"
     "Equivalent to list(iter_json(...)).\n"},
    {"read_raw", (PyCFunction)TraceReader_read_raw,
     METH_VARARGS | METH_KEYWORDS,
     "Read all raw chunks and return as list.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    line_aligned (bool): Align chunks to line boundaries.\n"
     "    multi_line (bool): Allow multiple lines per chunk.\n"},
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    {"iter_arrow", (PyCFunction)TraceReader_iter_arrow,
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over Arrow record batches.\n"
     "\n"
     "Args:\n"
     "    batch_size (int): Maximum rows per Arrow batch.\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
    {"iter_arrow_stream", (PyCFunction)TraceReader_iter_arrow_stream,
     METH_VARARGS | METH_KEYWORDS,
     "Return an _ArrowBatchStream that exposes Arrow record batches\n"
     "via the Arrow C Data Interface stream protocol\n"
     "(__arrow_c_stream__). PyArrow can drain the producer channel\n"
     "with a single call, without per-batch Python iteration.\n"},
    {"read_arrow", (PyCFunction)TraceReader_read_arrow,
     METH_VARARGS | METH_KEYWORDS,
     "Read all events as a materialized ArrowTable.\n"
     "\n"
     "Args:\n"
     "    batch_size (int): Maximum rows per Arrow batch.\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
#endif
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    // NOTE(review): TraceReader_write_arrow's parser handles a 'views' list
    // of strings/dicts, which this docstring does not mention, and the
    // partition_by/num_buckets/normalize entries below could not be
    // confirmed here. Verify this text against write_arrow's kwlist.
    {"write_arrow", (PyCFunction)TraceReader_write_arrow,
     METH_VARARGS | METH_KEYWORDS,
     "Write trace data to partitioned Arrow IPC files.\n"
     "\n"
     "Args:\n"
     "    path (str): Output directory path.\n"
     "    partition_by (list[str] or None): Column names to partition by.\n"
     "    num_buckets (int): Number of hash buckets (0 = no bucketing).\n"
     "    chunk_size_mb (int): Max uncompressed MB per file (default 32).\n"
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
     "    batch_size (int): Rows per internal batch (default 10000).\n"
     "    normalize (bool): Use normalized schema (default False).\n"
     "\n"
     "Returns:\n"
     "    dict: Statistics including partitions, total_rows, total_bytes.\n"},
    // The "Returns" line lists every key TraceReader_get_view_chunks sets,
    // including file_may_match.
    {"get_view_chunks", (PyCFunction)TraceReader_get_view_chunks,
     METH_VARARGS | METH_KEYWORDS,
     "Get candidate chunks for a view after bloom filter pruning.\n"
     "\n"
     "Args:\n"
     "    view (str or dict): View name ('io', 'compute', 'dlio') or\n"
     "                        dict with 'name' and optional 'query'.\n"
     "\n"
     "Returns:\n"
     "    dict: chunks list, total_checkpoints, skipped_checkpoints,\n"
     "          file_may_match.\n"},
    // The "Returns" line lists every key TraceReader_write_view_chunk sets,
    // including events_scanned.
    {"write_view_chunk", (PyCFunction)TraceReader_write_view_chunk,
     METH_VARARGS | METH_KEYWORDS,
     "Write a single chunk to an Arrow IPC file.\n"
     "\n"
     "Args:\n"
     "    output_file (str): Path to output Arrow IPC file.\n"
     "    checkpoint_idx (int): Checkpoint index.\n"
     "    start_byte (int): Start byte offset.\n"
     "    end_byte (int): End byte offset.\n"
     "    view (str or dict): View definition.\n"
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
     "    batch_size (int): Events per batch (default 10000).\n"
     "\n"
     "Returns:\n"
     "    dict: output_file, events_matched, events_scanned, rows_written,\n"
     "          bytes_written.\n"},
    {"write_view_chunks", (PyCFunction)TraceReader_write_view_chunks,
     METH_VARARGS | METH_KEYWORDS,
     "Write multiple chunks to Arrow IPC files in parallel.\n"
     "\n"
     "All chunks are processed concurrently on the Runtime thread pool.\n"
     "\n"
     "Args:\n"
     "    chunks (list): List of dicts with checkpoint_idx, start_byte, "
     "end_byte.\n"
     "    output_dir (str): Directory for output Arrow IPC files.\n"
     "    view (str or dict): View definition.\n"
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
     "    batch_size (int): Events per batch (default 10000).\n"
     "\n"
     "Returns:\n"
     "    dict: results list, total_rows, total_events_matched.\n"},
#endif
    {"get_max_bytes", (PyCFunction)TraceReader_get_max_bytes, METH_NOARGS,
     "Get the maximum byte position (0 if unknown for compressed\n"
     "files without index)."},
    {"get_num_lines", (PyCFunction)TraceReader_get_num_lines, METH_NOARGS,
     "Get the total number of lines (0 if unknown for files without\n"
     "index)."},
    {"__enter__", (PyCFunction)TraceReader_enter, METH_NOARGS,
     "Enter the runtime context for the with statement."},
    {"__exit__", (PyCFunction)TraceReader_exit, METH_VARARGS,
     "Exit the runtime context for the with statement.\n"
     "\n"
     "TraceReader does not own the shared RocksDB instance for an index path;\n"
     "any shared DB lifetime remains manager-owned on the native side."},
    {NULL}};  // sentinel
2664

2665
// Read-only attributes of TraceReader instances. No setter is registered
// for any entry, so assigning to these attributes raises AttributeError.
static PyGetSetDef TraceReader_getsetters[] = {
    {"path", reinterpret_cast<getter>(TraceReader_get_file_path), nullptr,
     "Path to the trace file or directory", nullptr},
    {"index_dir", reinterpret_cast<getter>(TraceReader_get_index_dir), nullptr,
     "Directory for index files", nullptr},
    {"has_index", reinterpret_cast<getter>(TraceReader_get_has_index), nullptr,
     "True if a checkpoint index was found", nullptr},
    {"num_lines", reinterpret_cast<getter>(TraceReader_get_num_lines_prop),
     nullptr, "Total line count (reads all lines if needed)", nullptr},
    {nullptr, nullptr, nullptr, nullptr, nullptr}  // sentinel
};
2675

2676
// Class docstring for TraceReader. The leading "TraceReader(...)\n--\n\n"
// section follows CPython's text-signature convention; the rest is the
// help() text.
static const char TraceReader_type_doc[] =
    "TraceReader(file_path: str, index_dir: str = '',\n"
    "            checkpoint_size: int = 33554432,\n"
    "            auto_build_index: bool = False,\n"
    "            runtime: Runtime | None = None)\n"
    "--\n"
    "\n"
    "Smart trace file reader that auto-selects sequential or indexed\n"
    "reading based on whether a ``.dftindex`` store exists.\n"
    "\n"
    "Args:\n"
    "    file_path (str): Path to the trace file (.pfw.gz or plain "
    "text).\n"
    "    index_dir (str): Directory to search for ``.dftindex`` "
    "stores.\n"
    "        Empty string (default) searches next to the trace file.\n"
    "    checkpoint_size (int): Checkpoint interval in bytes for index\n"
    "        building (default 32 MB).\n"
    "    auto_build_index (bool): If True, automatically build an "
    "index\n"
    "        when none exists.\n"
    "    runtime (Runtime or None): Runtime instance for thread pool "
    "control.\n"
    "        If None, uses the default global Runtime.\n"
    "\n"
    "Raises:\n"
    "    RuntimeError: If *file_path* does not exist or cannot be "
    "opened.\n";

// Static type object for dftracer_utils_ext.TraceReader. Slots are filled
// positionally in PyTypeObject declaration order; the slots after tp_new
// are zero-initialized.
PyTypeObject TraceReaderType = {
    PyVarObject_HEAD_INIT(nullptr, 0) "dftracer_utils_ext.TraceReader",
    sizeof(TraceReaderObject),                         /* tp_basicsize */
    0,                                                 /* tp_itemsize */
    reinterpret_cast<destructor>(TraceReader_dealloc), /* tp_dealloc */
    0,                                       /* tp_vectorcall_offset */
    nullptr,                                 /* tp_getattr */
    nullptr,                                 /* tp_setattr */
    nullptr,                                 /* tp_as_async */
    nullptr,                                 /* tp_repr */
    nullptr,                                 /* tp_as_number */
    nullptr,                                 /* tp_as_sequence */
    nullptr,                                 /* tp_as_mapping */
    nullptr,                                 /* tp_hash */
    nullptr,                                 /* tp_call */
    nullptr,                                 /* tp_str */
    nullptr,                                 /* tp_getattro */
    nullptr,                                 /* tp_setattro */
    nullptr,                                 /* tp_as_buffer */
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
    TraceReader_type_doc,                    /* tp_doc */
    nullptr,                                 /* tp_traverse */
    nullptr,                                 /* tp_clear */
    nullptr,                                 /* tp_richcompare */
    0,                                       /* tp_weaklistoffset */
    nullptr,                                 /* tp_iter */
    nullptr,                                 /* tp_iternext */
    TraceReader_methods,                     /* tp_methods */
    nullptr,                                 /* tp_members */
    TraceReader_getsetters,                  /* tp_getset */
    nullptr,                                 /* tp_base */
    nullptr,                                 /* tp_dict */
    nullptr,                                 /* tp_descr_get */
    nullptr,                                 /* tp_descr_set */
    0,                                       /* tp_dictoffset */
    reinterpret_cast<initproc>(TraceReader_init), /* tp_init */
    nullptr,                                 /* tp_alloc */
    TraceReader_new,                         /* tp_new */
};
2741

2742
/// Registers ``TraceReader`` on module ``m`` via register_type().
/// Returns 0 on success and -1 when registration fails.
int init_trace_reader(PyObject *m) {
    const int status = register_type(m, &TraceReaderType, "TraceReader");
    return (status < 0) ? -1 : 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc