• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28308562767

28 Jun 2026 02:14AM UTC coverage: 52.331% (-0.03%) from 52.356%
28308562767

Pull #79

github

web-flow
Merge e24fb36f6 into 8eb383f39
Pull Request #79: Add Valgrind memory checking (C++, Python, MPI) and fix the bugs it found

37489 of 93043 branches covered (40.29%)

Branch coverage included in aggregate %.

129 of 144 new or added lines in 11 files covered. (89.58%)

17 existing lines in 7 files now uncovered.

33724 of 43039 relevant lines covered (78.36%)

20348.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.51
/src/dftracer/utils/python/trace_reader.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <Python.h>
3
#include <dftracer/utils/core/common/config.h>
4
#include <dftracer/utils/core/common/filesystem.h>
5
#include <dftracer/utils/core/common/memory_budget.h>
6
#include <dftracer/utils/core/coro/channel.h>
7
#include <dftracer/utils/core/coro/task.h>
8
#include <dftracer/utils/core/coro/when_all.h>
9
#include <dftracer/utils/core/tasks/coro_scope.h>
10
#include <dftracer/utils/core/utils/string.h>
11
#include <dftracer/utils/python/arrow_helpers.h>
12
#include <dftracer/utils/python/batch_byte_size.h>
13
#include <dftracer/utils/python/json.h>
14
#include <dftracer/utils/python/py_dict_helpers.h>
15
#include <dftracer/utils/python/runtime.h>
16
#include <dftracer/utils/python/trace_reader.h>
17
#include <dftracer/utils/python/trace_reader_iterator.h>
18
#include <dftracer/utils/utilities/common/query/query.h>
19
#include <dftracer/utils/utilities/composites/dft/indexing/chunk_pruner_utility.h>
20
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
21
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
22
#include <dftracer/utils/utilities/indexer/index_database.h>
23
#include <dftracer/utils/utilities/indexer/internal/helpers.h>
24
#include <dftracer/utils/utilities/reader/trace_reader.h>
25

26
#include <algorithm>
27
#include <cctype>
28
#include <cstddef>
29
#include <cstdio>
30
#include <cstring>
31
#include <exception>
32
#include <memory>
33
#include <string>
34
#include <unordered_map>
35
#include <vector>
36
#ifdef DFTRACER_UTILS_ENABLE_ARROW
37
#include <dftracer/utils/python/arrow_stream_capsule.h>
38
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
39
#include <dftracer/utils/utilities/common/json/parser.h>
40
#endif
41
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
42
#include <dftracer/utils/utilities/common/arrow/ipc_writer.h>
43
#include <dftracer/utils/utilities/common/arrow/partition_writer.h>
44
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
45
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
46
#include <dftracer/utils/utilities/composites/dft/views/view_builder_utility.h>
47
#include <dftracer/utils/utilities/composites/dft/views/view_definition.h>
48
#include <dftracer/utils/utilities/composites/dft/views/view_reader_utility.h>
49
#endif
50

51
namespace {
52

53
using dftracer::utils::CoroScope;
54
using dftracer::utils::Runtime;
55
using dftracer::utils::coro::CoroTask;
56
using dftracer::utils::coro::when_all;
57
using dftracer::utils::utilities::filesystem::PatternDirectoryScannerUtility;
58
using dftracer::utils::utilities::filesystem::
59
    PatternDirectoryScannerUtilityInput;
60
using dftracer::utils::utilities::reader::ReadConfig;
61
using dftracer::utils::utilities::reader::TraceReader;
62
using dftracer::utils::utilities::reader::TraceReaderConfig;
63
#ifdef DFTRACER_UTILS_ENABLE_ARROW
64
using dftracer::utils::utilities::common::arrow::ColumnType;
65
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
66
using dftracer::utils::utilities::common::json::JsonParser;
67
using dftracer::utils::utilities::common::json::JsonValueHelper;
68
#endif
69
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
70
using dftracer::utils::utilities::common::arrow::IpcCompression;
71
using dftracer::utils::utilities::common::arrow::PartitionWriter;
72
using dftracer::utils::utilities::common::arrow::PartitionWriteStats;
73
using dftracer::utils::utilities::composites::dft::MetadataCollectorUtility;
74
using dftracer::utils::utilities::composites::dft::
75
    MetadataCollectorUtilityInput;
76
using dftracer::utils::utilities::composites::dft::views::ViewBuilderInput;
77
using dftracer::utils::utilities::composites::dft::views::ViewBuilderUtility;
78
using dftracer::utils::utilities::composites::dft::views::ViewDefinition;
79
using dftracer::utils::utilities::composites::dft::views::ViewReaderInput;
80
using dftracer::utils::utilities::composites::dft::views::ViewReaderUtility;
81
#endif
82

83
using dftracer::utils::python::MemoryViewBatchData;
84
using dftracer::utils::python::MemoryViewBatchIteratorState;
85

86
/// Reads one trace file line by line and ships the lines to `producer` in
/// packed batches.
///
/// Every line is appended to the batch's contiguous `buffer`; its start offset
/// and byte length are recorded in the parallel `offsets` / `lengths` vectors.
/// A batch is sent once it holds `batch_size` lines, and a trailing partial
/// batch is flushed when the reader is exhausted. Before each send the batch's
/// `byte_size` is added to `state->bytes_in_queue`.
///
/// The coroutine stops early when `state->cancelled` is observed or when the
/// channel refuses a send. Any exception is stored in `state` through
/// `set_error` rather than propagated.
///
/// @param state      Shared iterator state (cancel flag, byte counter, error).
/// @param producer   Channel producer the batches are sent through.
/// @param cfg        Reader configuration; moved into the `TraceReader`.
/// @param rc         Read configuration forwarded to `read_lines`.
/// @param batch_size Number of lines per full batch.
CoroTask<void> produce_lines_batched(
    std::shared_ptr<MemoryViewBatchIteratorState> state,
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer,
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size) {
    auto guard = producer.guard();
    try {
        TraceReader reader(std::move(cfg));
        auto gen = reader.read_lines(rc);
        MemoryViewBatchData batch;
        std::size_t count = 0;

        while (auto opt = co_await gen.next()) {
            if (state->cancelled.load(std::memory_order_acquire)) break;
            auto sv = opt->content;
            Py_ssize_t offset = static_cast<Py_ssize_t>(batch.buffer.size());
            batch.buffer.insert(batch.buffer.end(), sv.begin(), sv.end());
            batch.offsets.push_back(offset);
            batch.lengths.push_back(static_cast<Py_ssize_t>(sv.size()));
            ++count;

            if (count >= batch_size) {
                auto batch_bytes = dftracer::utils::python::byte_size(batch);
                state->bytes_in_queue.fetch_add(batch_bytes,
                                                std::memory_order_acq_rel);
                // A refused send ends the stream. Return right away instead of
                // breaking: `count` is still non-zero here, so falling through
                // to the tail flush would account for and re-send a batch that
                // has already been handed to std::move.
                if (!co_await producer.send(std::move(batch))) co_return;
                batch = MemoryViewBatchData{};
                count = 0;
            }
        }
        // Flush the trailing partial batch unless the consumer cancelled.
        if (count > 0 && !state->cancelled.load(std::memory_order_acquire)) {
            auto batch_bytes = dftracer::utils::python::byte_size(batch);
            state->bytes_in_queue.fetch_add(batch_bytes,
                                            std::memory_order_acq_rel);
            co_await producer.send(std::move(batch));
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
6,481!
125

126
/// Streams the raw chunks of one trace file to `producer`, one chunk per batch.
///
/// Each chunk yielded by `TraceReader::read_raw(rc)` is copied into a fresh
/// `MemoryViewBatchData` holding a single entry (offset 0, length = chunk
/// size). The batch's `byte_size` is added to `state->bytes_in_queue` before it
/// is sent. The loop ends when the reader is exhausted, `state->cancelled` is
/// set, or a send is refused. Exceptions are stored via `state->set_error`.
CoroTask<void> produce_raw_batched(
    std::shared_ptr<MemoryViewBatchIteratorState> state,
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer,
    TraceReaderConfig cfg, ReadConfig rc) {
    auto guard = producer.guard();
    try {
        TraceReader reader(std::move(cfg));
        auto chunks = reader.read_raw(rc);
        for (;;) {
            auto chunk = co_await chunks.next();
            if (!chunk) break;
            if (state->cancelled.load(std::memory_order_acquire)) break;

            // One chunk becomes one single-entry batch.
            MemoryViewBatchData payload;
            payload.buffer.assign(chunk->data(), chunk->data() + chunk->size());
            payload.offsets.push_back(0);
            payload.lengths.push_back(static_cast<Py_ssize_t>(chunk->size()));

            const auto payload_bytes =
                dftracer::utils::python::byte_size(payload);
            state->bytes_in_queue.fetch_add(payload_bytes,
                                            std::memory_order_acq_rel);
            if (!co_await producer.send(std::move(payload))) break;
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
1,274!
149

150
using dftracer::utils::utilities::common::json::JsonParser;
151
using dftracer::utils::utilities::common::json::JsonValueHelper;
152

153
// Size estimates, in bytes, for one unit of each output kind.
// NOTE(review): presumably used for memory budgeting of queued batches; the
// uses are outside this part of the file - confirm.
static constexpr std::size_t ESTIMATED_BYTES_PER_LINE = 256;
static constexpr std::size_t ESTIMATED_BYTES_PER_RAW_CHUNK =
    std::size_t{4} << 20;  // 4 MiB
static constexpr std::size_t ESTIMATED_BYTES_PER_JSON_EVENT = 512;
static constexpr std::size_t ESTIMATED_BYTES_PER_ARROW_ROW = 1024;
157

158
static void insert_simdjson_value(ArgsMap &map, std::string_view key,
4,308✔
159
                                  simdjson::ondemand::value val) {
160
    auto type = val.type();
4,308✔
161
    if (type.error()) return;
4,301!
162
    switch (type.value_unsafe()) {
4,299!
163
        case simdjson::ondemand::json_type::string: {
954✔
164
            auto r = val.get_string();
1,915✔
165
            if (!r.error()) map.insert(key, std::string(r.value_unsafe()));
1,918!
166
            break;
1,918✔
167
        }
168
        case simdjson::ondemand::json_type::number: {
1,190✔
169
            auto ri = val.get_int64();
2,389✔
170
            if (!ri.error()) {
2,391✔
171
                auto v = ri.value_unsafe();
2,391✔
172
                if (v >= 0)
2,391✔
173
                    map.insert(key, static_cast<std::uint64_t>(v));
2,390!
174
                else
175
                    map.insert(key, v);
1!
176
            } else {
1,199✔
177
                auto rd = val.get_double();
×
178
                if (!rd.error()) map.insert(key, rd.value_unsafe());
×
179
            }
180
            break;
2,394✔
181
        }
182
        case simdjson::ondemand::json_type::boolean: {
183
            auto r = val.get_bool();
×
184
            if (!r.error()) map.insert(key, r.value_unsafe());
×
185
            break;
×
186
        }
187
        default:
188
            break;
×
189
    }
190
}
2,158✔
191

192
/// Fills `ev` from the JSON document currently held by `parser`.
///
/// `ev.top` is marked valid and receives every top-level scalar field except
/// "args". When "args" is a JSON object, `ev.args` is marked valid and
/// receives each of its members; members whose key or value cannot be read
/// are skipped.
static void parse_json_to_event(JsonParser &parser, JsonDictEvent &ev) {
    ev.top.set_valid(true);

    // Copies the members of the nested "args" object into ev.args.
    const auto fill_args = [&ev](simdjson::ondemand::value &args_val) {
        auto args_obj = args_val.get_object();
        if (args_obj.error()) return;

        ev.args.set_valid(true);
        for (auto member : args_obj.value_unsafe()) {
            if (member.error()) continue;
            auto member_key = member.unescaped_key();
            if (member_key.error()) continue;
            auto member_val = member.value();
            if (member_val.error()) continue;
            insert_simdjson_value(ev.args, member_key.value_unsafe(),
                                  member_val.value_unsafe());
        }
    };

    parser.for_each_field(
        [&](std::string_view key, simdjson::ondemand::value val) {
            if (key != "args") {
                insert_simdjson_value(ev.top, key, val);
                return;
            }
            fill_args(val);
        });
}
479✔
215

216
/// Parses one trace file into `JsonDictEvent`s and ships them to `producer`
/// in batches of `batch_size` events.
///
/// Each JSON record yielded by `TraceReader::read_json(rc)` is converted with
/// `parse_json_to_event`. A batch is sent once it holds `batch_size` events,
/// and a trailing partial batch is flushed when the reader is exhausted.
/// Before each send the batch's `byte_size` is added to
/// `state->bytes_in_queue`.
///
/// The coroutine stops early when `state->cancelled` is observed or when the
/// channel refuses a send. Any exception is stored in `state` through
/// `set_error` rather than propagated.
///
/// @param state      Shared iterator state (cancel flag, byte counter, error).
/// @param producer   Channel producer the batches are sent through.
/// @param cfg        Reader configuration; moved into the `TraceReader`.
/// @param rc         Read configuration forwarded to `read_json`.
/// @param batch_size Number of events per full batch.
CoroTask<void> produce_json_dicts(
    std::shared_ptr<JsonDictIteratorState> state,
    dftracer::utils::coro::ChannelProducer<JsonDictBatch> producer,
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size) {
    auto guard = producer.guard();
    try {
        TraceReader reader(std::move(cfg));
        auto gen = reader.read_json(rc);
        JsonDictBatch batch;
        batch.events.reserve(batch_size);

        while (auto opt = co_await gen.next()) {
            if (state->cancelled.load(std::memory_order_acquire)) break;

            JsonDictEvent ev;
            parse_json_to_event(*opt->parser, ev);
            batch.events.push_back(std::move(ev));

            if (batch.events.size() >= batch_size) {
                auto batch_bytes = dftracer::utils::python::byte_size(batch);
                state->bytes_in_queue.fetch_add(batch_bytes,
                                                std::memory_order_acq_rel);
                // A refused send ends the stream. Return right away so the
                // tail flush below never inspects (or re-sends) a batch that
                // has already been handed to std::move.
                if (!co_await producer.send(std::move(batch))) co_return;
                batch = JsonDictBatch{};
                batch.events.reserve(batch_size);
            }
        }
        // Flush the trailing partial batch unless the consumer cancelled.
        if (!batch.events.empty() &&
            !state->cancelled.load(std::memory_order_acquire)) {
            auto batch_bytes = dftracer::utils::python::byte_size(batch);
            state->bytes_in_queue.fetch_add(batch_bytes,
                                            std::memory_order_acq_rel);
            co_await producer.send(std::move(batch));
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
195!
254

255
/// Feeds every path in `*files` into `file_chan`, then closes the channel.
///
/// Feeding stops early when `*cancelled` is set or when the channel refuses a
/// send. The channel is closed in every case so receivers see end-of-stream.
static CoroTask<void> send_files_to_channel(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    const std::vector<std::string> *files, std::atomic<bool> *cancelled) {
    auto cursor = files->begin();
    const auto last = files->end();
    while (cursor != last && !cancelled->load(std::memory_order_acquire)) {
        if (!co_await file_chan->send(*cursor)) break;
        ++cursor;
    }
    file_chan->close();
    co_return;
}
112!
265

266
/// Worker coroutine: pulls file paths from `file_chan`, parses each file into
/// `JsonDictEvent`s and pushes them to `out_chan` in batches of `batch_size`.
///
/// A fresh `TraceReader` is built per file from `index_dir`,
/// `checkpoint_size` and `auto_build_index`. The worker returns as soon as
/// `*cancelled` is set or `out_chan` refuses a batch; otherwise it runs until
/// `file_chan` is drained. Exceptions are not caught here.
///
/// NOTE(review): unlike `produce_json_dicts`, this path does not update a
/// `bytes_in_queue` counter - confirm the consumer does not expect it.
static CoroTask<void> json_dict_file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<JsonDictBatch> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<JsonDictBatch> producer(out_chan);
    auto guard = producer.guard();

    for (;;) {
        auto next_file = co_await file_chan->receive();
        if (!next_file) break;
        if (cancelled->load(std::memory_order_acquire)) co_return;

        TraceReaderConfig reader_cfg;
        reader_cfg.file_path = std::move(*next_file);
        reader_cfg.index_dir = index_dir;
        reader_cfg.checkpoint_size = checkpoint_size;
        reader_cfg.auto_build_index = auto_build_index;

        TraceReader reader(std::move(reader_cfg));
        auto records = reader.read_json(rc);
        JsonDictBatch pending;
        pending.events.reserve(batch_size);

        for (;;) {
            auto record = co_await records.next();
            if (!record) break;
            if (cancelled->load(std::memory_order_acquire)) co_return;

            JsonDictEvent event;
            parse_json_to_event(*record->parser, event);
            pending.events.push_back(std::move(event));
            if (pending.events.size() < batch_size) continue;

            // Batch is full: hand it over and start a new one.
            if (!co_await producer.send(std::move(pending))) co_return;
            pending = JsonDictBatch{};
            pending.events.reserve(batch_size);
        }

        // Flush whatever is left from this file.
        if (!pending.events.empty()) {
            if (!co_await producer.send(std::move(pending))) co_return;
        }
    }
    co_return;
}
1,473!
304

305
/// Spawns the JSON-dict worker pool plus the file feeder into `child`.
///
/// Creates a file-path channel sized to the worker count, spawns that many
/// `json_dict_file_worker` coroutines reading from it, then spawns
/// `send_files_to_channel` to feed `*files` into it. The coroutine itself
/// returns immediately after spawning.
///
/// The worker count is `min(files->size(), max_workers)`, but never less than
/// one: with zero workers the feeder would still be spawned and nothing would
/// ever receive from the file channel.
///
/// @param child            Scope the workers and feeder are spawned into.
/// @param out_chan         Output channel shared by all workers.
/// @param files            Paths to process; read by the feeder, so it must
///                         stay alive until `child` has finished.
/// @param index_dir        Copied into each worker.
/// @param rc               Copied into each worker.
/// @param cancelled_ptr    Cancellation flag polled by workers and feeder.
/// @param max_workers      Upper bound on the number of workers.
static CoroTask<void> spawn_json_dict_producers(
    CoroScope &child, dftracer::utils::coro::Channel<JsonDictBatch> *out_chan,
    const std::vector<std::string> *files, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::size_t batch_size, std::atomic<bool> *cancelled_ptr,
    std::size_t max_workers) {
    // Clamp to >= 1 so that max_workers == 0 cannot leave the feeder without
    // any receiver.
    const std::size_t num_workers =
        std::max<std::size_t>(1, std::min(files->size(), max_workers));
    auto file_chan =
        dftracer::utils::coro::make_channel<std::string>(num_workers);

    for (std::size_t i = 0; i < num_workers; ++i) {
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc, batch_size,
                     cancelled_ptr](CoroScope &) {
            return json_dict_file_worker(fc, out_chan, idx, checkpoint_size,
                                         auto_build_index, r, batch_size,
                                         cancelled_ptr);
        });
    }

    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
        return send_files_to_channel(fc, files, cancelled_ptr);
    });
    co_return;
}
15!
330

331
/// Directory entry point for parallel JSON-dict production.
///
/// Scans `dir_path` for ".pfw" / ".pfw.gz" files, sorts the resulting paths,
/// and runs a child scope in which `spawn_json_dict_producers` starts the
/// worker pool writing into `sp->channel`. If no file is found the channel is
/// closed right away. Any exception is stored through `sp->set_error`.
static CoroTask<void> produce_json_dicts_parallel(
    CoroScope &scope, JsonDictIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility scanner;
        auto scan_input = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto found = co_await scope.spawn(scanner, scan_input);

        // Sorted list of paths to process.
        std::vector<std::string> files;
        files.reserve(found.size());
        for (auto &entry : found) files.push_back(entry.path.string());
        std::sort(files.begin(), files.end());

        if (files.empty()) {
            // Nothing to read: signal end-of-stream to the consumer.
            sp->channel->close();
            co_return;
        }

        auto *out_channel = sp->channel.get();
        auto *cancel_flag = &sp->cancelled;

        // `files`, `index_dir` and `rc` are captured by reference; they belong
        // to this coroutine, which is suspended on the child scope below.
        co_await scope.scope(
            [out_channel, &files, &index_dir, checkpoint_size,
             auto_build_index, &rc, batch_size, cancel_flag,
             max_workers](CoroScope &child) -> CoroTask<void> {
                co_await spawn_json_dict_producers(
                    child, out_channel, &files, &index_dir, checkpoint_size,
                    auto_build_index, &rc, batch_size, cancel_flag,
                    max_workers);
            });
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
78!
365

366
/// Worker coroutine: pulls file paths from `file_chan`, reads each file line
/// by line and pushes packed line batches to `out_chan`.
///
/// Lines are appended to one contiguous buffer per batch, with their start
/// offsets and lengths kept in parallel vectors. A batch is sent every
/// `batch_size` lines and once more at the end of each file if lines remain.
/// The worker returns as soon as `*cancelled` is set or `out_chan` refuses a
/// batch. Exceptions are not caught here.
///
/// NOTE(review): unlike `produce_lines_batched`, this path does not update a
/// `bytes_in_queue` counter - confirm the consumer does not expect it.
static CoroTask<void> lines_file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer(
        out_chan);
    auto guard = producer.guard();

    for (;;) {
        auto next_file = co_await file_chan->receive();
        if (!next_file) break;
        if (cancelled->load(std::memory_order_acquire)) co_return;

        TraceReaderConfig reader_cfg;
        reader_cfg.file_path = std::move(*next_file);
        reader_cfg.index_dir = index_dir;
        reader_cfg.checkpoint_size = checkpoint_size;
        reader_cfg.auto_build_index = auto_build_index;

        TraceReader reader(std::move(reader_cfg));
        auto lines = reader.read_lines(rc);
        MemoryViewBatchData pending;
        std::size_t pending_lines = 0;

        for (;;) {
            auto line = co_await lines.next();
            if (!line) break;
            if (cancelled->load(std::memory_order_acquire)) co_return;

            // Append the line and remember where it sits in the buffer.
            auto text = line->content;
            const auto start = static_cast<Py_ssize_t>(pending.buffer.size());
            pending.buffer.insert(pending.buffer.end(), text.begin(),
                                  text.end());
            pending.offsets.push_back(start);
            pending.lengths.push_back(static_cast<Py_ssize_t>(text.size()));
            ++pending_lines;
            if (pending_lines < batch_size) continue;

            // Batch is full: hand it over and start a new one.
            if (!co_await producer.send(std::move(pending))) co_return;
            pending = MemoryViewBatchData{};
            pending_lines = 0;
        }

        // Flush whatever is left from this file.
        if (pending_lines > 0) {
            if (!co_await producer.send(std::move(pending))) co_return;
        }
    }
    co_return;
}
828!
408

409
/// Spawns the line-batch worker pool plus the file feeder into `child`.
///
/// Creates a file-path channel sized to the worker count, spawns that many
/// `lines_file_worker` coroutines reading from it, then spawns
/// `send_files_to_channel` to feed `*files` into it. The coroutine itself
/// returns immediately after spawning.
///
/// The worker count is `min(files->size(), max_workers)`, but never less than
/// one: with zero workers the feeder would still be spawned and nothing would
/// ever receive from the file channel.
///
/// @param child            Scope the workers and feeder are spawned into.
/// @param out_chan         Output channel shared by all workers.
/// @param files            Paths to process; read by the feeder, so it must
///                         stay alive until `child` has finished.
/// @param index_dir        Copied into each worker.
/// @param rc               Copied into each worker.
/// @param cancelled_ptr    Cancellation flag polled by workers and feeder.
/// @param max_workers      Upper bound on the number of workers.
static CoroTask<void> spawn_lines_producers(
    CoroScope &child,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    const std::vector<std::string> *files, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::size_t batch_size, std::atomic<bool> *cancelled_ptr,
    std::size_t max_workers) {
    // Clamp to >= 1 so that max_workers == 0 cannot leave the feeder without
    // any receiver.
    const std::size_t num_workers =
        std::max<std::size_t>(1, std::min(files->size(), max_workers));
    auto file_chan =
        dftracer::utils::coro::make_channel<std::string>(num_workers);

    for (std::size_t i = 0; i < num_workers; ++i) {
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc, batch_size,
                     cancelled_ptr](CoroScope &) {
            return lines_file_worker(fc, out_chan, idx, checkpoint_size,
                                     auto_build_index, r, batch_size,
                                     cancelled_ptr);
        });
    }

    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
        return send_files_to_channel(fc, files, cancelled_ptr);
    });
    co_return;
}
6!
435

436
/// Directory entry point for parallel line-batch production.
///
/// Scans `dir_path` for ".pfw" / ".pfw.gz" files, sorts the resulting paths,
/// and runs a child scope in which `spawn_lines_producers` starts the worker
/// pool writing into `sp->channel`. If no file is found the channel is closed
/// right away. Any exception is stored through `sp->set_error`.
static CoroTask<void> produce_lines_parallel(
    CoroScope &scope, MemoryViewBatchIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility scanner;
        auto scan_input = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto found = co_await scope.spawn(scanner, scan_input);

        // Sorted list of paths to process.
        std::vector<std::string> files;
        files.reserve(found.size());
        for (auto &entry : found) files.push_back(entry.path.string());
        std::sort(files.begin(), files.end());

        if (files.empty()) {
            // Nothing to read: signal end-of-stream to the consumer.
            sp->channel->close();
            co_return;
        }

        auto *out_channel = sp->channel.get();
        auto *cancel_flag = &sp->cancelled;

        // `files`, `index_dir` and `rc` are captured by reference; they belong
        // to this coroutine, which is suspended on the child scope below.
        co_await scope.scope(
            [out_channel, &files, &index_dir, checkpoint_size,
             auto_build_index, &rc, batch_size, cancel_flag,
             max_workers](CoroScope &child) -> CoroTask<void> {
                co_await spawn_lines_producers(
                    child, out_channel, &files, &index_dir, checkpoint_size,
                    auto_build_index, &rc, batch_size, cancel_flag,
                    max_workers);
            });
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
36!
470

471
/// Worker coroutine: pulls file paths from `file_chan`, reads each file as raw
/// chunks and pushes every chunk to `out_chan` as a single-entry batch
/// (offset 0, length = chunk size).
///
/// The worker returns as soon as `*cancelled` is set or `out_chan` refuses a
/// batch; otherwise it runs until `file_chan` is drained. Exceptions are not
/// caught here.
///
/// NOTE(review): unlike `produce_raw_batched`, this path does not update a
/// `bytes_in_queue` counter - confirm the consumer does not expect it.
static CoroTask<void> raw_file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer(
        out_chan);
    auto guard = producer.guard();

    for (;;) {
        auto next_file = co_await file_chan->receive();
        if (!next_file) break;
        if (cancelled->load(std::memory_order_acquire)) co_return;

        TraceReaderConfig reader_cfg;
        reader_cfg.file_path = std::move(*next_file);
        reader_cfg.index_dir = index_dir;
        reader_cfg.checkpoint_size = checkpoint_size;
        reader_cfg.auto_build_index = auto_build_index;

        TraceReader reader(std::move(reader_cfg));
        auto chunks = reader.read_raw(rc);
        for (;;) {
            auto chunk = co_await chunks.next();
            if (!chunk) break;
            if (cancelled->load(std::memory_order_acquire)) co_return;

            // One chunk becomes one single-entry batch.
            MemoryViewBatchData payload;
            payload.buffer.assign(chunk->data(), chunk->data() + chunk->size());
            payload.offsets.push_back(0);
            payload.lengths.push_back(static_cast<Py_ssize_t>(chunk->size()));
            if (!co_await producer.send(std::move(payload))) co_return;
        }
    }
    co_return;
}
508!
501

502
/// Spawns the raw-chunk worker pool plus the file feeder into `child`.
///
/// Creates a file-path channel sized to the worker count, spawns that many
/// `raw_file_worker` coroutines reading from it, then spawns
/// `send_files_to_channel` to feed `*files` into it. The coroutine itself
/// returns immediately after spawning.
///
/// The worker count is `min(files->size(), max_workers)`, but never less than
/// one: with zero workers the feeder would still be spawned and nothing would
/// ever receive from the file channel.
///
/// @param child            Scope the workers and feeder are spawned into.
/// @param out_chan         Output channel shared by all workers.
/// @param files            Paths to process; read by the feeder, so it must
///                         stay alive until `child` has finished.
/// @param index_dir        Copied into each worker.
/// @param rc               Copied into each worker.
/// @param cancelled_ptr    Cancellation flag polled by workers and feeder.
/// @param max_workers      Upper bound on the number of workers.
static CoroTask<void> spawn_raw_producers(
    CoroScope &child,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    const std::vector<std::string> *files, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::atomic<bool> *cancelled_ptr, std::size_t max_workers) {
    // Clamp to >= 1 so that max_workers == 0 cannot leave the feeder without
    // any receiver.
    const std::size_t num_workers =
        std::max<std::size_t>(1, std::min(files->size(), max_workers));
    auto file_chan =
        dftracer::utils::coro::make_channel<std::string>(num_workers);

    for (std::size_t i = 0; i < num_workers; ++i) {
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc,
                     cancelled_ptr](CoroScope &) {
            return raw_file_worker(fc, out_chan, idx, checkpoint_size,
                                   auto_build_index, r, cancelled_ptr);
        });
    }

    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
        return send_files_to_channel(fc, files, cancelled_ptr);
    });
    co_return;
}
3!
526

527
/// Directory entry point for parallel raw-chunk production.
///
/// Scans `dir_path` for ".pfw" / ".pfw.gz" files, sorts the resulting paths,
/// and runs a child scope in which `spawn_raw_producers` starts the worker
/// pool writing into `sp->channel`. If no file is found the channel is closed
/// right away. Any exception is stored through `sp->set_error`.
static CoroTask<void> produce_raw_parallel(
    CoroScope &scope, MemoryViewBatchIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility scanner;
        auto scan_input = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto found = co_await scope.spawn(scanner, scan_input);

        // Sorted list of paths to process.
        std::vector<std::string> files;
        files.reserve(found.size());
        for (auto &entry : found) files.push_back(entry.path.string());
        std::sort(files.begin(), files.end());

        if (files.empty()) {
            // Nothing to read: signal end-of-stream to the consumer.
            sp->channel->close();
            co_return;
        }

        auto *out_channel = sp->channel.get();
        auto *cancel_flag = &sp->cancelled;

        // `files`, `index_dir` and `rc` are captured by reference; they belong
        // to this coroutine, which is suspended on the child scope below.
        co_await scope.scope(
            [out_channel, &files, &index_dir, checkpoint_size,
             auto_build_index, &rc, cancel_flag,
             max_workers](CoroScope &child) -> CoroTask<void> {
                co_await spawn_raw_producers(
                    child, out_channel, &files, &index_dir, checkpoint_size,
                    auto_build_index, &rc, cancel_flag, max_workers);
            });
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
22!
561

562
#ifdef DFTRACER_UTILS_ENABLE_ARROW
563

564
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
565
using dftracer::utils::utilities::common::arrow::ColumnType;
566
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
567

568
// Bump arena for string_views that must survive until builder.finish().
//
// Bytes are copied into fixed-size blocks. A view returned by push() points
// into one of those blocks and is invalidated only by clear() (or by
// destroying the arena): blocks are never resized once created, and growing
// the outer vector moves the inner vectors without relocating their storage.
struct StringArena {
    // Capacity of a regular block; larger strings get a block of their own.
    static constexpr std::size_t BLOCK_SIZE = 64 * 1024;
    std::vector<std::vector<char>> blocks;
    // Write offset inside blocks.back().
    std::size_t pos = 0;

    StringArena() { blocks.emplace_back(BLOCK_SIZE); }

    // Copies `len` bytes from `data` into the arena and returns a view of the
    // copy. When the current block cannot hold `len` more bytes, a new block
    // of max(BLOCK_SIZE, len) bytes is started and the rest of the old block
    // is left unused. `data` may be null when `len` is 0.
    std::string_view push(const char *data, std::size_t len) {
        if (pos + len > blocks.back().size()) {
            blocks.emplace_back(std::max(BLOCK_SIZE, len));
            pos = 0;
        }
        char *dst = blocks.back().data() + pos;
        // memcpy requires valid pointers even for a zero-length copy, and an
        // empty std::string_view may legitimately carry a null data().
        if (len != 0) std::memcpy(dst, data, len);
        pos += len;
        return {dst, len};
    }

    // Drops every block except the first and rewinds the write offset. All
    // views handed out earlier become invalid.
    void clear() {
        if (blocks.size() > 1) blocks.resize(1);
        pos = 0;
    }
};
592

593
// --- Row type constants (must match Python TYPE_* constants) ---
// The numeric values are shared with the Python side, so every enumerator
// keeps an explicit value; do not renumber.
enum RowType : int8_t {
    ROW_EVENT         = 0,
    ROW_FILE_HASH     = 1,
    ROW_HOST_HASH     = 2,
    ROW_STRING_HASH   = 3,
    ROW_METADATA      = 4,
    ROW_PROC_METADATA = 5,
    ROW_PROFILE       = 6,
    ROW_SYSTEM        = 7,
};
604

605
// --- IO category constants (must match Python IOCategory values) ---
// The numeric values are shared with the Python side, so every enumerator
// keeps an explicit value; do not renumber. Note that numbering starts at 1.
enum IOCat : int8_t {
    IO_READ     = 1,
    IO_WRITE    = 2,
    IO_METADATA = 3,
    IO_PCTL     = 4,
    IO_IPC      = 5,
    IO_OTHER    = 6,
    IO_SYNC     = 7,
};
615

616
static int8_t get_io_cat(std::string_view func) {
×
617
    using namespace dftracer::utils::utilities::composites::dft::internal;
618
    for (auto op : posix_ops::READ)
×
619
        if (op == func) return IO_READ;
×
620
    for (auto op : posix_ops::WRITE)
×
621
        if (op == func) return IO_WRITE;
×
622
    for (auto op : posix_ops::SYNC)
×
623
        if (op == func) return IO_SYNC;
×
624
    for (auto op : posix_ops::PCTL)
×
625
        if (op == func) return IO_PCTL;
×
626
    for (auto op : posix_ops::IPC)
×
627
        if (op == func) return IO_IPC;
×
628
    for (auto op : posix_ops::METADATA)
×
629
        if (op == func) return IO_METADATA;
×
630
    return IO_OTHER;
×
631
}
632

633
// Returns true when `a`, lower-cased, equals `b`.
// Only `a` is folded to lower case: `b` is compared as-is, so callers must
// pass an already lower-case string for a case-insensitive match.
static bool str_iequal(std::string_view a, const char *b) {
    const std::string_view expected{b};
    if (a.size() != expected.size()) return false;
    return std::equal(a.begin(), a.end(), expected.begin(),
                      [](char lhs, char rhs) {
                          return std::tolower(static_cast<unsigned char>(
                                     lhs)) == static_cast<unsigned char>(rhs);
                      });
}
643

644
// Returns true when `needle` occurs in `s` after lower-casing `s`.
// Only `s` is folded to lower case: `needle` is compared as-is, so callers
// must pass an already lower-case needle. An empty needle always matches.
static bool str_contains_lower(std::string_view s, const char *needle) {
    const std::string_view pattern{needle};
    if (pattern.empty()) return true;

    const auto hit = std::search(
        s.begin(), s.end(), pattern.begin(), pattern.end(),
        [](char hay, char pat) {
            return std::tolower(static_cast<unsigned char>(hay)) ==
                   static_cast<unsigned char>(pat);
        });
    return hit != s.end();
}
660

661
// Normalize a raw JSON row (parsed with simdjson) into the semantic
662
// output schema.  Appends one row to `builder` with the full set of output
663
// columns.  Returns false if the row should be skipped (no valid name).
664
static bool normalize_row(RecordBatchBuilder &builder, StringArena &arena,
×
665
                          JsonParser &parser) {
666
    using SVH = JsonValueHelper;
667

668
    // --- Extract top-level fields ---
669
    auto ph = parser.get_string("ph").value_or(std::string_view{});
×
670
    auto name_sv = parser.get_string("name").value_or(std::string_view{});
×
671
    auto cat_sv = parser.get_string("cat").value_or(std::string_view{});
×
672
    auto pid_opt = parser.get_int64("pid");
×
673
    auto tid_opt = parser.get_int64("tid");
×
674
    auto ts_opt = parser.get_int64("ts");
×
675
    auto dur_opt = parser.get_int64("dur");
×
676

677
    // Helper lambdas to access args fields (need to rewind after each access)
678
    // We'll do a single pass over args instead
679
    std::optional<std::string_view> args_name, args_value, args_hhash,
×
680
        args_fhash;
×
681
    std::optional<int64_t> args_epoch, args_step, args_size_sum, args_ret;
×
682
    std::optional<int64_t> args_offset, args_image_idx, args_image_size;
×
683
    std::unordered_map<std::string, int64_t> args_int_map;
×
684
    std::unordered_map<std::string, double> args_float_map;
×
685

686
    parser.rewind();
×
687
    parser.for_each_field(
×
688
        "args", [&](std::string_view key, simdjson::ondemand::value val) {
×
689
            if (key == "name") {
×
690
                if (auto s = SVH::get_string(val)) args_name = s;
×
691
            } else if (key == "value") {
×
692
                if (auto s = SVH::get_string(val)) args_value = s;
×
693
            } else if (key == "hhash") {
×
694
                if (auto s = SVH::get_string(val)) args_hhash = s;
×
695
            } else if (key == "fhash") {
×
696
                if (auto s = SVH::get_string(val)) args_fhash = s;
×
697
            } else if (key == "epoch") {
×
698
                if (auto i = SVH::get_int64(val)) args_epoch = i;
×
699
            } else if (key == "step") {
×
700
                if (auto i = SVH::get_int64(val)) args_step = i;
×
701
            } else if (key == "size_sum") {
×
702
                if (auto i = SVH::get_int64(val)) args_size_sum = i;
×
703
            } else if (key == "ret") {
×
704
                if (auto i = SVH::get_int64(val)) args_ret = i;
×
705
            } else if (key == "offset") {
×
706
                if (auto i = SVH::get_int64(val)) args_offset = i;
×
707
            } else if (key == "image_idx") {
×
708
                if (auto i = SVH::get_int64(val)) args_image_idx = i;
×
709
            } else if (key == "image_size") {
×
710
                if (auto i = SVH::get_int64(val)) args_image_size = i;
×
711
            } else {
712
                // Store other int/float args for profile/sys columns
713
                if (auto i = SVH::get_int64(val)) {
×
714
                    args_int_map[std::string(key)] = *i;
×
715
                } else if (auto d = SVH::get_double(val)) {
×
716
                    args_float_map[std::string(key)] = *d;
×
717
                }
718
            }
719
        });
×
720

721
    // --- Type classification ---
722
    bool is_M = (ph == "M");
×
723
    bool is_C = (ph == "C");
×
724
    bool is_event = !is_M && !is_C;
×
725

726
    int8_t row_type = ROW_EVENT;
×
727
    if (is_M) {
×
728
        if (name_sv == "FH")
×
729
            row_type = ROW_FILE_HASH;
×
730
        else if (name_sv == "HH")
×
731
            row_type = ROW_HOST_HASH;
×
732
        else if (name_sv == "SH")
×
733
            row_type = ROW_STRING_HASH;
×
734
        else if (name_sv == "PR")
×
735
            row_type = ROW_PROC_METADATA;
×
736
        else
737
            row_type = ROW_METADATA;
×
738
    } else if (is_C) {
×
739
        row_type = str_iequal(cat_sv, "sys") ? ROW_SYSTEM : ROW_PROFILE;
×
740
    }
741
    bool is_hash = (row_type >= ROW_FILE_HASH && row_type <= ROW_STRING_HASH) ||
×
742
                   row_type == ROW_PROC_METADATA;
743
    bool is_profile = (row_type == ROW_PROFILE);
×
744
    bool is_sys = (row_type == ROW_SYSTEM);
×
745

746
    // Name: metadata rows use args.name if available
747
    std::string_view out_name = name_sv;
×
748
    if (is_M && args_name && !args_name->empty()) {
×
749
        out_name = *args_name;
×
750
    }
751
    if (out_name.empty()) return false;  // skip rows without name
×
752

753
    // --- Declare all output columns ---
754
    auto ci_type = builder.add_or_get_column("type", ColumnType::INT64);
×
755
    auto ci_cat = builder.add_or_get_column("cat", ColumnType::STRING);
×
756
    auto ci_name = builder.add_or_get_column("name", ColumnType::STRING);
×
757
    auto ci_pid = builder.add_or_get_column("pid", ColumnType::INT64);
×
758
    auto ci_tid = builder.add_or_get_column("tid", ColumnType::INT64);
×
759
    auto ci_hash = builder.add_or_get_column("hash", ColumnType::STRING);
×
760
    auto ci_value = builder.add_or_get_column("value", ColumnType::STRING);
×
761
    auto ci_host_hash =
762
        builder.add_or_get_column("host_hash", ColumnType::STRING);
×
763
    auto ci_file_hash =
764
        builder.add_or_get_column("file_hash", ColumnType::STRING);
×
765
    auto ci_epoch = builder.add_or_get_column("epoch", ColumnType::INT64);
×
766
    auto ci_step = builder.add_or_get_column("step", ColumnType::INT64);
×
767
    auto ci_ts = builder.add_or_get_column("ts", ColumnType::INT64);
×
768
    auto ci_dur = builder.add_or_get_column("dur", ColumnType::INT64);
×
769
    auto ci_te = builder.add_or_get_column("te", ColumnType::INT64);
×
770
    [[maybe_unused]] auto ci_trange =
771
        builder.add_or_get_column("trange", ColumnType::INT64);
×
772
    auto ci_io_cat = builder.add_or_get_column("io_cat", ColumnType::INT64);
×
773
    auto ci_size = builder.add_or_get_column("size", ColumnType::INT64);
×
774
    auto ci_offset = builder.add_or_get_column("offset", ColumnType::INT64);
×
775
    auto ci_image_id = builder.add_or_get_column("image_id", ColumnType::INT64);
×
776

777
    // --- Populate core columns ---
778
    builder.append_int64(ci_type, row_type);
×
779

780
    // cat (lowercased) - write into arena
781
    if (!cat_sv.empty()) {
×
782
        char lbuf[256];
783
        std::size_t clen = std::min(cat_sv.size(), sizeof(lbuf));
×
784
        for (std::size_t i = 0; i < clen; ++i)
×
785
            lbuf[i] = static_cast<char>(
×
786
                std::tolower(static_cast<unsigned char>(cat_sv[i])));
×
787
        builder.append_string(ci_cat, arena.push(lbuf, clen));
×
788
    } else {
789
        builder.append_null(ci_cat);
×
790
    }
791

792
    builder.append_string(ci_name, out_name);
×
793

794
    if (pid_opt) builder.append_int64(ci_pid, *pid_opt);
×
795
    if (tid_opt) builder.append_int64(ci_tid, *tid_opt);
×
796

797
    // hash / value
798
    if (is_hash && args_value && !args_value->empty())
×
799
        builder.append_string(ci_hash, *args_value);
×
800
    if (row_type == ROW_METADATA && args_value && !args_value->empty())
×
801
        builder.append_string(ci_value, *args_value);
×
802

803
    // host_hash / file_hash
804
    if (args_hhash && !args_hhash->empty())
×
805
        builder.append_string(ci_host_hash, *args_hhash);
×
806
    if (args_fhash && !args_fhash->empty())
×
807
        builder.append_string(ci_file_hash, *args_fhash);
×
808

809
    // epoch / step
810
    if (args_epoch && *args_epoch >= 0)
×
811
        builder.append_int64(ci_epoch, *args_epoch);
×
812
    if (args_step && *args_step >= 0) builder.append_int64(ci_step, *args_step);
×
813

814
    // --- Temporal ---
815
    bool has_ts = (is_event || is_C) && ts_opt.has_value();
×
816
    bool has_dur = dur_opt.has_value();
×
817
    int64_t ts_val = 0, dur_val = 0;
×
818
    if (has_ts) {
×
819
        ts_val = *ts_opt;
×
820
        builder.append_int64(ci_ts, ts_val);
×
821
    }
822
    if (is_event && has_ts && has_dur) {
×
823
        dur_val = *dur_opt;
×
824
        builder.append_int64(ci_dur, dur_val);
×
825
        builder.append_int64(ci_te, ts_val + dur_val);
×
826
    }
827

828
    // --- IO columns (events only) ---
829
    if (is_event) {
×
830
        bool is_posix_stdio =
831
            str_iequal(cat_sv, "posix") || str_iequal(cat_sv, "stdio");
×
832
        int8_t io_cat = IO_OTHER;
×
833

834
        // size priority: size_sum > POSIX ret > image_size
835
        if (args_size_sum) {
×
836
            builder.append_int64(ci_size, *args_size_sum);
×
837
            if (is_posix_stdio) io_cat = get_io_cat(out_name);
×
838
        } else if (is_posix_stdio) {
×
839
            io_cat = get_io_cat(out_name);
×
840
            if (args_ret && *args_ret > 0 &&
×
841
                (io_cat == IO_READ || io_cat == IO_WRITE))
×
842
                builder.append_int64(ci_size, *args_ret);
×
843
            if (args_offset && *args_offset >= 0)
×
844
                builder.append_int64(ci_offset, *args_offset);
×
845
        } else {
846
            if (args_image_idx && *args_image_idx > 0)
×
847
                builder.append_int64(ci_image_id, *args_image_idx);
×
848
            if (args_image_size && *args_image_size > 0 &&
×
849
                !str_contains_lower(out_name, "open"))
×
850
                builder.append_int64(ci_size, *args_image_size);
×
851
        }
852
        builder.append_int64(ci_io_cat, io_cat);
×
853
    }
854

855
    // --- Profile columns ---
856
    if (is_profile) {
×
857
        bool is_posix_stdio =
858
            str_iequal(cat_sv, "posix") || str_iequal(cat_sv, "stdio");
×
859
        int8_t io_cat = is_posix_stdio ? get_io_cat(out_name) : IO_OTHER;
×
860
        builder.append_int64(ci_io_cat, io_cat);
×
861

862
        static const char *profile_keys[] = {
863
            "count",      "count_max",  "count_min",  "count_sum",
864
            "dft_cnt",    "dur",        "dur_max",    "dur_min",
865
            "dur_sum",    "epoch",      "flags",      "offset",
866
            "offset_max", "offset_min", "offset_sum", "ret",
867
            "ret_max",    "ret_min",    "ret_sum",    "whence",
868
            "whence_max", "whence_min", "whence_sum", nullptr};
869
        for (const char **pk = profile_keys; *pk; ++pk) {
×
870
            auto it = args_int_map.find(*pk);
×
871
            if (it != args_int_map.end()) {
×
872
                auto idx = builder.add_or_get_column(*pk, ColumnType::INT64);
×
873
                builder.append_int64(idx, it->second);
×
874
            }
875
        }
876
    }
877

878
    // --- System columns ---
879
    if (is_sys) {
×
880
        static const char *sys_keys[] = {
881
            "user_pct", "system_pct",  "iowait_pct",   "idle_pct",
882
            "irq_pct",  "softirq_pct", "MemAvailable", "MemFree",
883
            "Cached",   "Dirty",       "Active",       nullptr};
884
        for (const char **sk = sys_keys; *sk; ++sk) {
×
885
            auto it = args_float_map.find(*sk);
×
886
            if (it != args_float_map.end()) {
×
887
                auto idx = builder.add_or_get_column(*sk, ColumnType::DOUBLE);
×
888
                builder.append_double(idx, it->second);
×
889
            }
890
        }
891
    }
892

893
    builder.end_row();
×
894
    return true;
×
895
}
×
896

897
// Flatten a simdjson object into "prefix.key" columns using native types.
898
// On type mismatch (same key, different type across rows), appends null.
899
static void flatten_object_into(RecordBatchBuilder &builder, StringArena &arena,
900
                                std::string_view prefix,
901
                                simdjson::ondemand::object obj) {
902
    using SVH = JsonValueHelper;
903
    char key_buf[512];
904

905
    for (auto field : obj) {
×
906
        if (field.error()) continue;
×
907

908
        auto key_result = field.unescaped_key();
909
        if (key_result.error()) continue;
×
910
        std::string_view sk = key_result.value_unsafe();
911

912
        auto val_result = field.value();
913
        if (val_result.error()) continue;
×
914
        auto sub_val = val_result.value_unsafe();
915

916
        std::size_t needed = prefix.size() + 1 + sk.size();
917
        if (needed >= sizeof(key_buf)) continue;
×
918
        std::memcpy(key_buf, prefix.data(), prefix.size());
919
        key_buf[prefix.size()] = '.';
920
        std::memcpy(key_buf + prefix.size() + 1, sk.data(), sk.size());
921
        std::string_view full_key(key_buf, needed);
922

923
        auto type_result = sub_val.type();
924
        if (type_result.error()) continue;
×
925
        auto json_type = type_result.value_unsafe();
926

927
        switch (json_type) {
×
928
            case simdjson::ondemand::json_type::number: {
929
                auto num_result = sub_val.get_number();
930
                if (num_result.error()) break;
×
931
                auto num = num_result.value_unsafe();
932
                if (num.is_int64()) {
×
933
                    auto idx =
934
                        builder.add_or_get_column(full_key, ColumnType::INT64);
×
935
                    if (builder.column_type(idx) == ColumnType::INT64)
×
936
                        builder.append_int64(idx, num.get_int64());
×
937
                    else
938
                        builder.append_null(idx);
×
939
                } else if (num.is_uint64()) {
×
940
                    auto idx =
941
                        builder.add_or_get_column(full_key, ColumnType::UINT64);
×
942
                    if (builder.column_type(idx) == ColumnType::UINT64)
×
943
                        builder.append_uint64(idx, num.get_uint64());
×
944
                    else
945
                        builder.append_null(idx);
×
946
                } else {
947
                    auto idx =
948
                        builder.add_or_get_column(full_key, ColumnType::DOUBLE);
×
949
                    if (builder.column_type(idx) == ColumnType::DOUBLE)
×
950
                        builder.append_double(idx, num.get_double());
×
951
                    else
952
                        builder.append_null(idx);
×
953
                }
954
                break;
955
            }
956
            case simdjson::ondemand::json_type::string: {
957
                auto str_result = sub_val.get_string();
958
                if (str_result.error()) break;
×
959
                auto str = str_result.value_unsafe();
960
                auto idx =
961
                    builder.add_or_get_column(full_key, ColumnType::STRING);
×
962
                if (builder.column_type(idx) == ColumnType::STRING)
×
963
                    builder.append_string(idx, str);
×
964
                else
965
                    builder.append_null(idx);
×
966
                break;
967
            }
968
            case simdjson::ondemand::json_type::boolean: {
969
                auto bool_result = sub_val.get_bool();
970
                if (bool_result.error()) break;
×
971
                auto b = bool_result.value_unsafe();
972
                auto idx =
973
                    builder.add_or_get_column(full_key, ColumnType::BOOL);
×
974
                if (builder.column_type(idx) == ColumnType::BOOL)
×
975
                    builder.append_bool(idx, b);
×
976
                else
977
                    builder.append_null(idx);
×
978
                break;
979
            }
980
            case simdjson::ondemand::json_type::null: {
981
                auto existing = builder.find_column(full_key);
×
982
                if (existing) builder.append_null(*existing);
×
983
                break;
984
            }
985
            case simdjson::ondemand::json_type::object:
986
            case simdjson::ondemand::json_type::array: {
987
                // Serialize nested object/array to JSON string
988
                auto json_str = SVH::to_json_string(sub_val);
×
989
                auto idx =
990
                    builder.add_or_get_column(full_key, ColumnType::STRING);
×
991
                if (json_str) {
×
992
                    builder.append_string(
×
993
                        idx, arena.push(json_str->data(), json_str->size()));
994
                } else {
995
                    builder.append_null(idx);
×
996
                }
997
                break;
998
            }
999
            default:
1000
                break;
1001
        }
1002
    }
1003
}
1004

1005
static bool build_arrow_row(RecordBatchBuilder &builder, JsonParser &parser,
×
1006
                            StringArena &arena, bool normalize) {
1007
    if (normalize) return normalize_row(builder, arena, parser);
×
1008

1009
    using SVH = JsonValueHelper;
1010
    parser.for_each_field([&](std::string_view key_sv,
×
1011
                              simdjson::ondemand::value val) {
1012
        auto type_result = val.type();
×
1013
        if (type_result.error()) return;
×
1014
        auto json_type = type_result.value_unsafe();
×
1015
        switch (json_type) {
×
1016
            case simdjson::ondemand::json_type::number: {
1017
                auto num_result = val.get_number();
×
1018
                if (num_result.error()) break;
×
1019
                auto num = num_result.value_unsafe();
×
1020
                if (num.is_int64()) {
×
1021
                    std::size_t idx =
1022
                        builder.add_or_get_column(key_sv, ColumnType::INT64);
×
1023
                    builder.append_int64(idx, num.get_int64());
×
1024
                } else if (num.is_uint64()) {
×
1025
                    std::size_t idx =
1026
                        builder.add_or_get_column(key_sv, ColumnType::UINT64);
×
1027
                    builder.append_uint64(idx, num.get_uint64());
×
1028
                } else {
1029
                    std::size_t idx =
1030
                        builder.add_or_get_column(key_sv, ColumnType::DOUBLE);
×
1031
                    builder.append_double(idx, num.get_double());
×
1032
                }
1033
                break;
×
1034
            }
1035
            case simdjson::ondemand::json_type::string: {
1036
                auto str_result = val.get_string();
×
1037
                if (str_result.error()) break;
×
1038
                auto str = str_result.value_unsafe();
×
1039
                std::size_t idx =
1040
                    builder.add_or_get_column(key_sv, ColumnType::STRING);
×
1041
                builder.append_string(idx, str);
×
1042
                break;
×
1043
            }
1044
            case simdjson::ondemand::json_type::boolean: {
1045
                auto bool_result = val.get_bool();
×
1046
                if (bool_result.error()) break;
×
1047
                auto b = bool_result.value_unsafe();
×
1048
                std::size_t idx =
1049
                    builder.add_or_get_column(key_sv, ColumnType::BOOL);
×
1050
                builder.append_bool(idx, b);
×
1051
                break;
×
1052
            }
1053
            case simdjson::ondemand::json_type::null: {
1054
                auto existing = builder.find_column(key_sv);
×
1055
                if (existing) builder.append_null(*existing);
×
1056
                break;
×
1057
            }
1058
            case simdjson::ondemand::json_type::object:
1059
            case simdjson::ondemand::json_type::array: {
1060
                auto json_str = SVH::to_json_string(val);
×
1061
                std::size_t idx =
1062
                    builder.add_or_get_column(key_sv, ColumnType::STRING);
×
1063
                if (json_str) {
×
1064
                    builder.append_string(
×
1065
                        idx, arena.push(json_str->data(), json_str->size()));
×
1066
                } else {
1067
                    builder.append_null(idx);
×
1068
                }
1069
                break;
1070
            }
×
1071
            default:
1072
                break;
×
1073
        }
1074
    });
1075
    builder.end_row();
×
1076
    return true;
×
1077
}
1078

1079
static bool process_json_line(RecordBatchBuilder &builder, JsonParser &parser,
1080
                              StringArena &arena, std::string_view content,
1081
                              bool normalize) {
1082
    const char *trimmed;
1083
    std::size_t trimmed_length;
1084
    if (!dftracer::utils::json_trim_and_validate_with_comma(
×
1085
            content.data(), content.size(), trimmed, trimmed_length))
1086
        return false;
1087
    if (!parser.parse(std::string_view(trimmed, trimmed_length))) return false;
×
1088
    return build_arrow_row(builder, parser, arena, normalize);
×
1089
}
1090

1091
// Coroutine that reads a single trace file and streams Arrow batches into
// `chan`.
//
// Without `normalize`, batches come straight from TraceReader::read_arrow.
// With `normalize`, rows are read as JSON, converted through
// build_arrow_row(), and flushed every `batch_size` rows; a final partial
// batch is sent once the file is exhausted.  The coroutine returns early
// when `*cancelled` becomes true or when a send is refused.
static CoroTask<void> produce_arrow_for_file(
    dftracer::utils::coro::Channel<ArrowExportResult> *chan,
    std::string file_path, std::string index_dir, std::size_t checkpoint_size,
    bool auto_build_index, ReadConfig rc, std::size_t batch_size,
    bool normalize, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer(chan);
    auto guard = producer.guard();

    const auto stop_requested = [cancelled] {
        return cancelled->load(std::memory_order_acquire);
    };

    TraceReaderConfig cfg;
    cfg.file_path = std::move(file_path);
    cfg.index_dir = std::move(index_dir);
    cfg.checkpoint_size = checkpoint_size;
    cfg.auto_build_index = auto_build_index;

    TraceReader reader(std::move(cfg));

    // Fast path: non-normalized Arrow build happens inside TraceReader.
    // Normalize still goes through read_json + build_arrow_row for the
    // richer schema derivation.
    if (!normalize) {
        auto batch_gen = reader.read_arrow(rc, batch_size);
        for (;;) {
            auto ready_batch = co_await batch_gen.next();
            if (!ready_batch) break;
            if (stop_requested()) co_return;
            if (!co_await producer.send(std::move(*ready_batch))) co_return;
        }
        co_return;
    }

    auto gen = reader.read_json(rc);
    RecordBatchBuilder builder;
    builder.reserve(batch_size);
    StringArena arena;

    for (;;) {
        auto row = co_await gen.next();
        if (!row) break;
        if (stop_requested()) co_return;
        if (!build_arrow_row(builder, *row->parser, arena, normalize)) continue;
        if (builder.num_rows() < batch_size) continue;

        // Batch is full: emit it, then recycle the builder for the next one.
        auto result = builder.finish();
        arena.clear();
        if (!co_await producer.send(std::move(result))) co_return;
        if (!builder.is_schema_locked()) builder.lock_schema();
        builder.reset(true);
        builder.reserve(batch_size);
    }

    // Flush whatever is left after the last full batch.
    if (builder.num_rows() > 0) {
        co_await producer.send(builder.finish());
    }
    co_return;
}
×
1141

1142
// Worker coroutine: repeatedly takes a file path from `file_chan`, reads
// that file, and streams the resulting Arrow batches into `out_chan`.
//
// Per file the behaviour mirrors the single-file producer: without
// `normalize` the batches come from TraceReader::read_arrow; with
// `normalize` rows go through build_arrow_row() and are flushed every
// `batch_size` rows plus one trailing partial batch.  The worker stops when
// `file_chan` yields nothing more, when `*cancelled` is set, or when a send
// is refused.
static CoroTask<void> file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer(
        out_chan);
    auto guard = producer.guard();

    const auto stop_requested = [cancelled] {
        return cancelled->load(std::memory_order_acquire);
    };

    while (auto file_path = co_await file_chan->receive()) {
        if (stop_requested()) co_return;

        TraceReaderConfig cfg;
        cfg.file_path = std::move(*file_path);
        cfg.index_dir = index_dir;
        cfg.checkpoint_size = checkpoint_size;
        cfg.auto_build_index = auto_build_index;

        TraceReader reader(std::move(cfg));

        if (!normalize) {
            // Batches are built inside the reader; just forward them.
            auto batch_gen = reader.read_arrow(rc, batch_size);
            for (;;) {
                auto ready_batch = co_await batch_gen.next();
                if (!ready_batch) break;
                if (stop_requested()) co_return;
                if (!co_await producer.send(std::move(*ready_batch)))
                    co_return;
            }
            continue;
        }

        auto gen = reader.read_json(rc);
        RecordBatchBuilder builder;
        builder.reserve(batch_size);
        StringArena arena;

        for (;;) {
            auto row = co_await gen.next();
            if (!row) break;
            if (stop_requested()) co_return;
            if (!build_arrow_row(builder, *row->parser, arena, normalize))
                continue;
            if (builder.num_rows() < batch_size) continue;

            // Batch is full: emit it, then recycle the builder.
            auto result = builder.finish();
            arena.clear();
            if (!co_await producer.send(std::move(result))) co_return;
            if (!builder.is_schema_locked()) builder.lock_schema();
            builder.reset(true);
            builder.reserve(batch_size);
        }

        // Flush the trailing partial batch for this file.
        if (builder.num_rows() > 0) {
            if (!co_await producer.send(builder.finish())) co_return;
        }
    }
    co_return;
}
×
1195

1196
// Extract AND-of-EQ leaves from a Query AST. Returns nullopt if the predicate
1197
// shape is anything else (NE, range ops, IN, NOT, OR), in which case the
1198
// uniform-match shortcut does not apply.
1199
static std::optional<std::vector<std::pair<std::string, std::string>>>
1200
extract_eq_leaves(
×
1201
    const dftracer::utils::utilities::common::query::QueryNode &node) {
1202
    namespace q_ns = dftracer::utils::utilities::common::query;
1203
    using LeafVec = std::vector<std::pair<std::string, std::string>>;
1204

1205
    auto literal_to_string = [](const q_ns::LiteralNode &lit) -> std::string {
×
1206
        return std::visit(
1207
            [](auto &&v) -> std::string {
×
1208
                using T = std::decay_t<decltype(v)>;
1209
                if constexpr (std::is_same_v<T, std::string>)
1210
                    return v;
×
1211
                else if constexpr (std::is_same_v<T, bool>)
1212
                    return v ? "true" : "false";
×
1213
                else if constexpr (std::is_same_v<T, int64_t>)
1214
                    return std::to_string(v);
×
1215
                else if constexpr (std::is_same_v<T, uint64_t>)
1216
                    return std::to_string(v);
×
1217
                else if constexpr (std::is_same_v<T, double>)
1218
                    return std::to_string(v);
×
1219
                else
1220
                    return {};
1221
            },
1222
            lit.value);
×
1223
    };
1224

1225
    return std::visit(
1226
        [&](const auto &n) -> std::optional<LeafVec> {
×
1227
            using T = std::decay_t<decltype(n)>;
1228
            if constexpr (std::is_same_v<T, q_ns::CompareNode>) {
1229
                if (n.op != q_ns::CompareOp::EQ) return std::nullopt;
×
1230
                return LeafVec{{n.field.path, literal_to_string(n.value)}};
×
1231
            } else if constexpr (std::is_same_v<T, q_ns::AndNode>) {
1232
                auto l = extract_eq_leaves(*n.left);
×
1233
                if (!l) return std::nullopt;
×
1234
                auto r = extract_eq_leaves(*n.right);
×
1235
                if (!r) return std::nullopt;
×
1236
                l->insert(l->end(), r->begin(), r->end());
×
1237
                return l;
×
1238
            } else {
×
1239
                return std::nullopt;
×
1240
            }
1241
        },
1242
        node.data);
×
1243
}
1244

1245
// True iff every checkpoint in `chunk_idxs` has dim_stats min == max == literal
1246
// for every leaf. Empty leaves -> false (no shortcut). Missing dim_stats for
1247
// any (chunk, leaf) -> false (we don't know, play safe).
1248
static bool all_chunks_uniform_match(
×
1249
    const dftracer::utils::utilities::indexer::IndexDatabase &db, int fid,
1250
    const std::vector<std::pair<std::string, std::string>> &leaves,
1251
    const std::vector<std::uint64_t> &chunk_idxs) {
1252
    if (leaves.empty() || chunk_idxs.empty()) return false;
×
1253
    namespace indexing = dftracer::utils::utilities::composites::dft::indexing;
1254

1255
    for (const auto &[dim, val] : leaves) {
×
1256
        auto rows = db.query_chunk_dimension_stats_for_dimension(fid, dim);
×
1257
        if (rows.empty()) return false;
×
1258
        std::unordered_map<std::uint64_t,
1259
                           const indexing::ChunkDimensionStatsResult *>
1260
            by_ckpt;
×
1261
        by_ckpt.reserve(rows.size());
×
1262
        for (const auto &r : rows) by_ckpt.emplace(r.checkpoint_idx, &r);
×
1263
        for (auto cidx : chunk_idxs) {
×
1264
            auto it = by_ckpt.find(cidx);
×
1265
            if (it == by_ckpt.end()) return false;
×
1266
            const auto &ds = *it->second;
×
1267
            if (ds.min_value != val || ds.max_value != val) return false;
×
1268
        }
1269
    }
×
1270
    return true;
×
1271
}
1272

1273
// One unit of work for checkpoint-level parallelism: a byte range covering
// one or more consecutive checkpoints of a single file.  A gzip stream can
// only be decoded sequentially, so cutting a file at checkpoint-aligned
// byte offsets is what allows several workers to share the decoding of one
// file.
struct ArrowWorkItem {
    // File this work item reads from.
    std::string file_path;
    // Byte range of the work item.
    std::size_t start_byte{0};
    std::size_t end_byte{0};
    // Whether the respective end of the range sits on a checkpoint.
    bool start_at_checkpoint{false};
    bool end_at_checkpoint{false};
    // Set when every kept chunk in this byte range is uniform-matching
    // (dim_stats min == max == predicate literal for every AND-of-EQ leaf),
    // which makes per-event predicate evaluation skippable.
    bool chunk_prune_only{false};
    // Line-range constraint; takes precedence over the byte range.  The
    // worker forwards it as LINE_RANGE on the read and the gzip stream maps
    // it to byte offsets through the checkpoint index.  0 means "no line
    // constraint".
    std::size_t start_line{0};
    std::size_t end_line{0};
};
1294

1295
static std::vector<ArrowWorkItem> enumerate_work_items(
72✔
1296
    const std::vector<std::string> &files, const std::string &index_dir,
1297
    const std::string &query_str, std::size_t max_workers,
1298
    std::size_t clip_start_byte = 0, std::size_t clip_end_byte = 0,
1299
    std::size_t clip_start_line = 0, std::size_t clip_end_line = 0) {
1300
    namespace dft_internal =
1301
        dftracer::utils::utilities::composites::dft::internal;
1302
    namespace indexer_ns = dftracer::utils::utilities::indexer;
1303
    namespace indexing = dftracer::utils::utilities::composites::dft::indexing;
1304

1305
    std::vector<ArrowWorkItem> items;
72✔
1306
    items.reserve(files.size() * 4);
72!
1307

1308
    const bool has_line_clip = (clip_start_line > 0 || clip_end_line > 0);
72!
1309
    auto push_unsplit = [&](const std::string &fp) {
150✔
1310
        ArrowWorkItem item;
114✔
1311
        item.file_path = fp;
114!
1312
        item.start_line = clip_start_line;
114✔
1313
        item.end_line = clip_end_line;
114✔
1314
        items.push_back(std::move(item));
114!
1315
    };
114✔
1316

1317
    // Parse the query once. Pruner input copies a Query, so we keep the
1318
    // parsed form around to feed each ChunkPrunerInput without re-parsing.
1319
    std::optional<dftracer::utils::utilities::common::query::Query> parsed;
72✔
1320
    if (!query_str.empty()) {
72✔
1321
        auto r = dftracer::utils::utilities::common::query::Query::from_string(
3!
1322
            query_str);
6!
1323
        if (r) parsed = std::move(*r);
6!
1324
    }
6✔
1325

1326
    // All files in a directory-mode scan share the same `.dftindex` root.
1327
    // Group files by their resolved index path so we can open the RocksDB
1328
    // once per index and reuse it to prune every file against that handle.
1329
    std::unordered_map<std::string, std::vector<std::size_t>> by_index;
72✔
1330
    for (std::size_t i = 0; i < files.size(); ++i) {
194✔
1331
        std::string index_path =
1332
            dft_internal::determine_index_path(files[i], index_dir);
122!
1333
        by_index[index_path].push_back(i);
122!
1334
    }
122✔
1335

1336
    for (auto &entry : by_index) {
150!
1337
        const auto &index_path = entry.first;
78✔
1338
        const auto &file_idxs = entry.second;
78✔
1339
        if (!fs::exists(index_path)) {
78!
1340
            for (auto i : file_idxs) push_unsplit(files[i]);
156✔
1341
            continue;
62✔
1342
        }
31✔
1343
        std::unique_ptr<indexer_ns::IndexDatabase> idx_db;
16✔
1344
        try {
1345
            idx_db = std::make_unique<indexer_ns::IndexDatabase>(
16!
1346
                index_path,
8✔
1347
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
16!
1348
        } catch (...) {
8✔
1349
            for (auto i : file_idxs) push_unsplit(files[i]);
×
1350
            continue;
1351
        }
×
1352

1353
        // Resolve fid + checkpoints per file (cheap queries).
1354
        struct FileCtx {
1355
            std::size_t file_idx;
1356
            int fid;
1357
            std::vector<indexer_ns::IndexerCheckpoint> ckpts;
1358
        };
1359
        std::vector<FileCtx> file_ctxs;
16✔
1360
        file_ctxs.reserve(file_idxs.size());
16!
1361
        for (auto i : file_idxs) {
44✔
1362
            FileCtx fc;
28✔
1363
            fc.file_idx = i;
28✔
1364
            fc.fid = idx_db->get_file_info_id(
42✔
1365
                indexer_ns::internal::get_logical_path(files[i]));
42!
1366
            if (fc.fid < 0) {
28!
1367
                push_unsplit(files[i]);
×
1368
                continue;
×
1369
            }
1370
            fc.ckpts = idx_db->query_checkpoints(fc.fid);
28!
1371
            if (fc.ckpts.empty()) {
28✔
1372
                push_unsplit(files[i]);
20!
1373
                continue;
20✔
1374
            }
1375
            std::sort(fc.ckpts.begin(), fc.ckpts.end(),
8!
1376
                      [](const auto &a, const auto &b) {
×
1377
                          return a.first_line_num < b.first_line_num;
×
1378
                      });
1379
            file_ctxs.push_back(std::move(fc));
8!
1380
        }
28✔
1381

1382
        // Batch-prune all files against the shared index: dim_stats and
1383
        // chunk_statistics are loaded in one RocksDB scan each instead of
1384
        // one scan per file.
1385
        std::vector<indexing::ChunkPrunerOutput> pruner_outs(file_ctxs.size());
16!
1386
        if (parsed && !file_ctxs.empty()) {
16!
1387
            indexing::ChunkPrunerBatchInput batch_in;
×
1388
            batch_in.index_path = index_path;
×
1389
            batch_in.external_db = idx_db.get();
×
1390
            batch_in.items.reserve(file_ctxs.size());
×
1391
            for (auto &fc : file_ctxs) {
×
1392
                batch_in.items.push_back({files[fc.file_idx], *parsed});
×
1393
            }
1394
            indexing::ChunkPrunerUtility pruner;
×
1395
            auto batch_out = pruner.process_batch(batch_in);
×
1396
            if (batch_out.success) {
×
1397
                pruner_outs = std::move(batch_out.outputs);
×
1398
            }
1399
        }
×
1400

1401
        // For AND-of-EQ predicates, precompute uniform-match leaves once.
1402
        // Per-file pure_match is checked inline below and lets workers skip
1403
        // per-event predicate eval on chunks where dim_stats min == max ==
1404
        // literal for every leaf.
1405
        std::optional<std::vector<std::pair<std::string, std::string>>>
1406
            eq_leaves;
16✔
1407
        if (parsed) eq_leaves = extract_eq_leaves(parsed->root());
16!
1408

1409
        for (std::size_t fc_idx = 0; fc_idx < file_ctxs.size(); ++fc_idx) {
24✔
1410
            auto &fc = file_ctxs[fc_idx];
8✔
1411
            const auto &fp = files[fc.file_idx];
8✔
1412

1413
            // Pruner chunk_idx semantics: 0-indexed over uncompressed
1414
            // slices. fc.ckpts holds gzip recovery points; recovery point
1415
            // fc.ckpts[k] sits at the START of pruner chunk (k+1). Pruner
1416
            // chunk 0 has no recovery point at its start (decoded from
1417
            // gzip stream start). Total pruner chunks = fc.ckpts.size()+1.
1418
            const std::size_t total_chunks = fc.ckpts.size() + 1;
8✔
1419
            auto chunk_start_byte = [&](std::uint64_t cidx) -> std::size_t {
16✔
1420
                if (cidx == 0) return 0;
12✔
1421
                return fc.ckpts[cidx - 1].uc_offset;
8✔
1422
            };
10✔
1423
            auto chunk_end_byte = [&](std::uint64_t cidx) -> std::size_t {
16✔
1424
                if (cidx == 0)
12✔
1425
                    return fc.ckpts.empty() ? 0 : fc.ckpts[0].uc_offset;
4!
1426
                std::size_t k = cidx - 1;
8✔
1427
                return fc.ckpts[k].uc_offset + fc.ckpts[k].uc_size;
8✔
1428
            };
10✔
1429
            // Line ranges for a chunk. Chunk 0 covers everything before the
1430
            // first recovery point; chunk k>=1 spans recovery point (k-1).
1431
            auto chunk_first_line = [&](std::uint64_t cidx) -> std::size_t {
16✔
1432
                if (cidx == 0) return 1;
12✔
1433
                return fc.ckpts[cidx - 1].first_line_num;
8✔
1434
            };
10✔
1435
            auto chunk_last_line = [&](std::uint64_t cidx) -> std::size_t {
16✔
1436
                if (cidx == 0) {
12✔
1437
                    if (fc.ckpts.empty()) return SIZE_MAX;
4!
1438
                    return fc.ckpts[0].first_line_num > 0
4!
1439
                               ? fc.ckpts[0].first_line_num - 1
4!
1440
                               : 0;
2✔
1441
                }
1442
                return fc.ckpts[cidx - 1].last_line_num;
8✔
1443
            };
10✔
1444

1445
            std::vector<std::uint64_t> keep_chunks;
8✔
1446
            keep_chunks.reserve(total_chunks);
8!
1447
            if (parsed) {
8!
1448
                const auto &pr = pruner_outs[fc_idx];
×
1449
                if (pr.success && !pr.file_may_match) {
×
1450
                    continue;  // whole file pruned
×
1451
                }
1452
                if (pr.success && !pr.candidate_checkpoints.empty() &&
×
1453
                    pr.candidate_checkpoints.size() < pr.total_checkpoints) {
×
1454
                    for (auto cidx : pr.candidate_checkpoints) {
×
1455
                        if (cidx < total_chunks) keep_chunks.push_back(cidx);
×
1456
                    }
1457
                    std::sort(keep_chunks.begin(), keep_chunks.end());
×
1458
                    keep_chunks.erase(
×
1459
                        std::unique(keep_chunks.begin(), keep_chunks.end()),
×
1460
                        keep_chunks.end());
×
1461
                } else {
1462
                    for (std::uint64_t c = 0; c < total_chunks; ++c)
×
1463
                        keep_chunks.push_back(c);
×
1464
                }
1465
            } else {
1466
                for (std::uint64_t c = 0; c < total_chunks; ++c)
24✔
1467
                    keep_chunks.push_back(c);
16!
1468
            }
1469

1470
            // Intersect with the user's line range so workers only touch
1471
            // chunks that actually overlap it. Each work item carries the
1472
            // sub-line-range; LINE_RANGE on the read maps it back to bytes
1473
            // via the same checkpoint table the gzip stream uses.
1474
            if (has_line_clip) {
8✔
1475
                std::size_t lo = clip_start_line > 0 ? clip_start_line : 1;
4!
1476
                std::size_t hi = clip_end_line > 0 ? clip_end_line : SIZE_MAX;
4!
1477
                std::vector<std::uint64_t> filtered;
4✔
1478
                filtered.reserve(keep_chunks.size());
4!
1479
                for (auto c : keep_chunks) {
12✔
1480
                    std::size_t cf = chunk_first_line(c);
8!
1481
                    std::size_t cl = chunk_last_line(c);
8!
1482
                    if (cl < lo || cf > hi) continue;
8✔
1483
                    filtered.push_back(c);
4!
1484
                }
1485
                keep_chunks = std::move(filtered);
4✔
1486
            }
4✔
1487

1488
            if (keep_chunks.empty()) continue;
8✔
1489

1490
            // All-or-nothing per file: if every kept chunk is uniform-matching
1491
            // for every leaf, every work item from this file gets the
1492
            // chunk_prune_only fast path. Mixed files fall back to per-event
1493
            // eval to stay safe.
1494
            bool file_pure_match = false;
8✔
1495
            if (eq_leaves && !eq_leaves->empty() && idx_db) {
8!
1496
                file_pure_match = all_chunks_uniform_match(
×
1497
                    *idx_db, fc.fid, *eq_leaves, keep_chunks);
×
1498
            }
1499

1500
            std::size_t target_ranges = std::max<std::size_t>(1, max_workers);
8!
1501
            std::size_t per_range = std::max<std::size_t>(
8!
1502
                1, (keep_chunks.size() + target_ranges - 1) / target_ranges);
8✔
1503

1504
            std::size_t group_start = 0;
8✔
1505
            while (group_start < keep_chunks.size()) {
20✔
1506
                std::size_t group_end = group_start;
12✔
1507
                std::size_t emitted = 0;
12✔
1508
                while (group_end < keep_chunks.size() && emitted < per_range) {
24✔
1509
                    if (group_end > group_start &&
12!
1510
                        keep_chunks[group_end] !=
×
1511
                            keep_chunks[group_end - 1] + 1) {
×
1512
                        break;
×
1513
                    }
1514
                    ++group_end;
12✔
1515
                    ++emitted;
12✔
1516
                }
1517
                std::uint64_t scidx = keep_chunks[group_start];
12✔
1518
                std::uint64_t ecidx = keep_chunks[group_end - 1];
12✔
1519
                std::size_t start_byte = chunk_start_byte(scidx);
12!
1520
                std::size_t end_byte = chunk_end_byte(ecidx);
12!
1521
                // start_at_checkpoint: a gzip recovery point sits at
1522
                // start_byte (true for any cidx>=1; false for the implicit
1523
                // chunk 0 which decodes from stream start).
1524
                bool start_at_checkpoint = (scidx >= 1);
12✔
1525
                bool end_at_checkpoint = (group_end < keep_chunks.size());
12✔
1526
                if (has_line_clip) {
12✔
1527
                    std::size_t lo = clip_start_line > 0 ? clip_start_line : 1;
4!
1528
                    std::size_t hi =
4✔
1529
                        clip_end_line > 0 ? clip_end_line : SIZE_MAX;
4!
1530
                    std::size_t cluster_first = chunk_first_line(scidx);
4!
1531
                    std::size_t cluster_last = chunk_last_line(ecidx);
4!
1532
                    std::size_t item_start =
2✔
1533
                        std::max<std::size_t>(lo, cluster_first);
4!
1534
                    std::size_t item_end =
2✔
1535
                        std::min<std::size_t>(hi, cluster_last);
4!
1536
                    if (item_start > item_end) {
4!
1537
                        group_start = group_end;
×
1538
                        continue;
×
1539
                    }
1540
                    ArrowWorkItem item;
4✔
1541
                    item.file_path = fp;
4!
1542
                    item.chunk_prune_only = file_pure_match;
4✔
1543
                    item.start_line = item_start;
4✔
1544
                    item.end_line = item_end;
4✔
1545
                    items.push_back(std::move(item));
4!
1546
                    group_start = group_end;
4✔
1547
                    continue;
2✔
1548
                }
4✔
1549
                if (clip_end_byte > clip_start_byte) {
8✔
1550
                    if (start_byte < clip_start_byte) {
4✔
1551
                        start_byte = clip_start_byte;
×
1552
                        start_at_checkpoint = false;
×
1553
                    }
1554
                    if (end_byte > clip_end_byte) {
4✔
1555
                        end_byte = clip_end_byte;
4✔
1556
                        end_at_checkpoint = false;
4✔
1557
                    }
2✔
1558
                    if (start_byte >= end_byte) {
4✔
1559
                        group_start = group_end;
2✔
1560
                        continue;
2✔
1561
                    }
1562
                }
1✔
1563
                items.push_back({fp, start_byte, end_byte, start_at_checkpoint,
6!
1564
                                 end_at_checkpoint, file_pure_match});
6✔
1565
                group_start = group_end;
6✔
1566
            }
1567
        }
8✔
1568
    }
16✔
1569
    return items;
108✔
1570
}
72!
1571

1572
// Feeder coroutine: pushes every work item into `chan` in order, then
// closes the channel. Feeding stops early when `cancelled` is observed set
// or when a send reports failure; the channel is closed in every case.
static CoroTask<void> send_work_items_to_channel(
    std::shared_ptr<dftracer::utils::coro::Channel<ArrowWorkItem>> chan,
    const std::vector<ArrowWorkItem> *items, std::atomic<bool> *cancelled) {
    const std::vector<ArrowWorkItem> &pending = *items;
    for (std::size_t pos = 0; pos < pending.size(); ++pos) {
        if (cancelled->load(std::memory_order_acquire)) {
            break;
        }
        if (!co_await chan->send(pending[pos])) {
            break;
        }
    }
    chan->close();
    co_return;
}
320!
1582

1583
// Worker coroutine: receives work items from `work_chan` until it yields no
// more, reads each one through a TraceReader, and sends the resulting Arrow
// batches to `out_chan`. Returns immediately when `cancelled` is observed
// set or when a send to the output channel reports failure.
//
// With `normalize` false the reader's own Arrow generator is forwarded
// batch by batch; with `normalize` true events are read as JSON and
// assembled into batches of up to `batch_size` rows here.
static CoroTask<void> checkpoint_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<ArrowWorkItem>> work_chan,
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer(
        out_chan);
    // Held for the lifetime of the coroutine frame.
    // NOTE(review): presumably signals producer completion on destruction -
    // confirm against ChannelProducer.
    auto guard = producer.guard();

    // One reader per file path, reused when later items hit the same file
    // so the file is not probed again.
    std::unordered_map<std::string, std::shared_ptr<TraceReader>> reader_cache;

    for (;;) {
        auto item = co_await work_chan->receive();
        if (!item) break;
        if (cancelled->load(std::memory_order_acquire)) co_return;

        auto &reader_slot = reader_cache[item->file_path];
        if (!reader_slot) {
            TraceReaderConfig reader_cfg;
            reader_cfg.file_path = item->file_path;
            reader_cfg.index_dir = index_dir;
            reader_cfg.checkpoint_size = checkpoint_size;
            reader_cfg.auto_build_index = auto_build_index;
            reader_slot =
                std::make_shared<TraceReader>(std::move(reader_cfg));
        }

        // Start from the caller's read config and overlay this item's range.
        ReadConfig item_rc = rc;
        if (item->start_line == 0 && item->end_line == 0) {
            // Byte-range item: take the window and checkpoint flags as-is.
            item_rc.start_byte = item->start_byte;
            item_rc.end_byte = item->end_byte;
            item_rc.start_at_checkpoint = item->start_at_checkpoint;
            item_rc.end_at_checkpoint = item->end_at_checkpoint;
        } else {
            // Line-range item: the read is driven by LINE_RANGE and the
            // gzip stream resolves it to byte offsets via checkpoints, so
            // every byte-related field is cleared.
            item_rc.start_line = item->start_line;
            item_rc.end_line = item->end_line;
            item_rc.start_byte = 0;
            item_rc.end_byte = 0;
            item_rc.start_at_checkpoint = false;
            item_rc.end_at_checkpoint = false;
        }
        // Pruning was done during enumeration. Repeating it here would
        // open RocksDB once per work item, which at directory scale
        // (256 files * N ranges) would dwarf the actual read cost.
        item_rc.skip_pruning = true;
        // Items pre-classified as uniform-matching skip per-event eval.
        if (item->chunk_prune_only) {
            item_rc.chunk_prune_only = true;
        }

        if (normalize) {
            auto events = reader_slot->read_json(item_rc);
            RecordBatchBuilder builder;
            builder.reserve(batch_size);
            StringArena arena;

            for (;;) {
                auto event = co_await events.next();
                if (!event) break;
                if (cancelled->load(std::memory_order_acquire)) co_return;
                if (!build_arrow_row(builder, *event->parser, arena,
                                     normalize)) {
                    continue;
                }
                if (builder.num_rows() < batch_size) {
                    continue;
                }
                // Batch is full: ship it and recycle the builder.
                auto full_batch = builder.finish();
                arena.clear();
                if (!co_await producer.send(std::move(full_batch))) co_return;
                if (!builder.is_schema_locked()) builder.lock_schema();
                builder.reset(true);
                builder.reserve(batch_size);
            }
            // Flush the trailing partial batch for this item.
            if (builder.num_rows() > 0) {
                if (!co_await producer.send(builder.finish())) co_return;
            }
        } else {
            auto batches = reader_slot->read_arrow(item_rc, batch_size);
            for (;;) {
                auto next_batch = co_await batches.next();
                if (!next_batch) break;
                if (cancelled->load(std::memory_order_acquire)) co_return;
                if (!co_await producer.send(std::move(*next_batch))) {
                    co_return;
                }
            }
        }
    }
    co_return;
}
1,288!
1666

1667
// Spawns the producer side of the pipeline on `child`: a pool of
// checkpoint_worker coroutines plus one feeder that pushes `work_items`
// into the channel the workers share.
//
// The pool size is min(work_items->size(), max_workers), but never less
// than one; the work channel is created with that same number as its
// make_channel argument. Each worker receives its own copy of `*index_dir`
// and `*rc`, whereas `work_items`, `out_chan` and `cancelled_ptr` are passed
// on as raw pointers and must outlive the spawned tasks.
static CoroTask<void> spawn_arrow_producers(
    CoroScope &child,
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
    const std::vector<ArrowWorkItem> *work_items, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::size_t batch_size, bool normalize, std::atomic<bool> *cancelled_ptr,
    std::size_t max_workers) {
    std::size_t pool_size = std::min(work_items->size(), max_workers);
    pool_size = std::max<std::size_t>(1, pool_size);

    auto work_chan =
        dftracer::utils::coro::make_channel<ArrowWorkItem>(pool_size);

    std::size_t left_to_spawn = pool_size;
    while (left_to_spawn > 0) {
        child.spawn([out_chan, queue = work_chan, dir_copy = *index_dir,
                     checkpoint_size, auto_build_index, rc_copy = *rc,
                     batch_size, normalize, cancelled_ptr](CoroScope &) {
            return checkpoint_worker(queue, out_chan, dir_copy,
                                     checkpoint_size, auto_build_index,
                                     rc_copy, batch_size, normalize,
                                     cancelled_ptr);
        });
        --left_to_spawn;
    }

    // The feeder is spawned last, after all workers are registered.
    child.spawn([queue = work_chan, work_items, cancelled_ptr](CoroScope &) {
        return send_work_items_to_channel(queue, work_items, cancelled_ptr);
    });
    co_return;
}
108!
1694

1695
// Produces Arrow batches for an explicit list of files into `sp->channel`.
//
// The files are expanded into work items (using the query and the byte/line
// clips carried by `rc`), then a nested scope runs the worker pool and
// feeder over them. When there are no files or no work items the channel is
// closed right away. Any exception is recorded on `sp` via set_error()
// instead of propagating.
static CoroTask<void> produce_arrow_batches_for_files(
    CoroScope &scope, ArrowIteratorState *sp, std::vector<std::string> files,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::size_t max_workers) {
    try {
        if (files.empty()) {
            sp->channel->close();
            co_return;
        }

        auto work_items = enumerate_work_items(
            files, index_dir, rc.query, max_workers, rc.start_byte, rc.end_byte,
            rc.start_line, rc.end_line);
        if (work_items.empty()) {
            sp->channel->close();
            co_return;
        }

        // Everything below is handed to the nested scope by pointer. The
        // pointees live in this coroutine's frame, which stays alive while
        // the nested scope is awaited.
        auto *out_chan = sp->channel.get();
        auto *cancel_flag = &sp->cancelled;
        const std::vector<ArrowWorkItem> *items_ptr = &work_items;
        const std::string *index_dir_ptr = &index_dir;
        const ReadConfig *rc_ptr = &rc;

        co_await scope.scope([out_chan, items_ptr, index_dir_ptr,
                              checkpoint_size, auto_build_index, rc_ptr,
                              batch_size, normalize, cancel_flag,
                              max_workers](CoroScope &child) -> CoroTask<void> {
            co_await spawn_arrow_producers(
                child, out_chan, items_ptr, index_dir_ptr, checkpoint_size,
                auto_build_index, rc_ptr, batch_size, normalize, cancel_flag,
                max_workers);
        });
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
146!
1730

1731
// Directory entry point: scans `dir_path` for ".pfw" / ".pfw.gz" files,
// sorts the resulting paths, and delegates to
// produce_arrow_batches_for_files(). Any exception is recorded on `sp` via
// set_error() instead of propagating.
static CoroTask<void> produce_arrow_batches_parallel(
    CoroScope &scope, ArrowIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility scanner;
        // NOTE(review): the meaning of the two trailing flags is defined by
        // PatternDirectoryScannerUtilityInput and is not visible here.
        PatternDirectoryScannerUtilityInput scan_input(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto found = co_await scope.spawn(scanner, scan_input);

        std::vector<std::string> paths;
        paths.reserve(found.size());
        for (auto &hit : found) {
            paths.emplace_back(hit.path.string());
        }
        // Sort the paths so the file list has a stable order.
        std::sort(paths.begin(), paths.end());

        co_await produce_arrow_batches_for_files(
            scope, sp, std::move(paths), std::move(index_dir), checkpoint_size,
            auto_build_index, std::move(rc), batch_size, normalize,
            max_workers);
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
210!
1755

1756
// Single-file producer: reads one trace through a TraceReader and sends
// Arrow batches through `producer`.
//
// The byte size of every batch is added to `state->bytes_in_queue` before
// the batch is sent. Production stops when `state->cancelled` is observed
// set or when a send reports failure. Any exception is recorded on `state`
// via set_error() instead of propagating.
//
// `flatten_objects` is accepted but not used. With `normalize` false the
// reader's own Arrow generator is forwarded; with `normalize` true events
// are read as JSON and assembled into batches of up to `batch_size` rows.
CoroTask<void> produce_arrow_batches(
    std::shared_ptr<ArrowIteratorState> state,
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer,
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size,
    bool flatten_objects = false, bool normalize = false) {
    (void)flatten_objects;

    // Held for the lifetime of the coroutine frame.
    // NOTE(review): presumably signals producer completion on destruction -
    // confirm against ChannelProducer.
    auto guard = producer.guard();
    try {
        TraceReader reader(std::move(cfg));

        if (normalize) {
            auto events = reader.read_json(rc);
            RecordBatchBuilder builder;
            builder.reserve(batch_size);

            StringArena arena;

            for (;;) {
                auto event = co_await events.next();
                if (!event) break;
                if (state->cancelled.load(std::memory_order_acquire)) break;
                if (!build_arrow_row(builder, *event->parser, arena,
                                     normalize)) {
                    continue;
                }
                if (builder.num_rows() < batch_size) {
                    continue;
                }

                // Batch is full: account for it, ship it, recycle builder.
                auto full_batch = builder.finish();
                arena.clear();
                const auto full_bytes =
                    dftracer::utils::python::byte_size(full_batch);
                state->bytes_in_queue.fetch_add(full_bytes,
                                                std::memory_order_acq_rel);
                if (!co_await producer.send(std::move(full_batch))) break;
                if (!builder.is_schema_locked()) {
                    builder.lock_schema();
                }
                builder.reset(true);
                builder.reserve(batch_size);
            }

            // Flush the trailing partial batch unless cancelled.
            if (builder.num_rows() > 0 &&
                !state->cancelled.load(std::memory_order_acquire)) {
                auto tail_batch = builder.finish();
                const auto tail_bytes =
                    dftracer::utils::python::byte_size(tail_batch);
                state->bytes_in_queue.fetch_add(tail_bytes,
                                                std::memory_order_acq_rel);
                co_await producer.send(std::move(tail_batch));
            }
        } else {
            auto batches = reader.read_arrow(rc, batch_size);
            for (;;) {
                auto next_batch = co_await batches.next();
                if (!next_batch) break;
                if (state->cancelled.load(std::memory_order_acquire)) break;
                const auto queued_bytes =
                    dftracer::utils::python::byte_size(*next_batch);
                state->bytes_in_queue.fetch_add(queued_bytes,
                                                std::memory_order_acq_rel);
                if (!co_await producer.send(std::move(*next_batch))) break;
            }
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
375!
1818

1819
#endif  // DFTRACER_UTILS_ENABLE_ARROW
1820

1821
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
1822

1823
struct WriteArrowStats {
39✔
1824
    std::unordered_map<std::string, PartitionWriteStats> partitions;
1825
    int64_t total_rows = 0;
39✔
1826
    int64_t total_uncompressed_bytes = 0;
39✔
1827
};
1828

1829
struct WriteArrowResult {
52✔
1830
    WriteArrowStats stats;
1831
    std::string error;
1832
    std::uint64_t chunks_scanned = 0;
39✔
1833
    std::uint64_t chunks_skipped = 0;
39✔
1834
};
1835

1836
// Writes one trace file out as Arrow partitions, one partition per view.
//
// Steps: collect file metadata once, then for each view open a
// PartitionWriter, let ViewBuilderUtility pick candidate chunks, read each
// candidate through ViewReaderUtility, and write the resulting batches.
// With no views supplied, a single view named "all" is used. That lone
// "all" view writes directly to `output_path`; any other view writes to
// `output_path + "/" + view.name`.
//
// Failures are reported through `result.error`: the first failing step
// returns immediately with whatever was accumulated so far, and a
// std::exception thrown along the way is converted to its what() text.
CoroTask<WriteArrowResult> write_arrow_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    std::vector<ViewDefinition> views, std::string output_path,
    int64_t chunk_size_bytes, IpcCompression compression,
    std::size_t event_batch_size) {
    namespace dft_internal =
        dftracer::utils::utilities::composites::dft::internal;
    WriteArrowResult result;

    try {
        if (views.empty()) {
            views.push_back(ViewDefinition().with_name("all"));
        }

        // An explicit index path wins; otherwise derive one from the file.
        std::string resolved_index;
        if (index_path.empty()) {
            resolved_index = dft_internal::determine_index_path(file_path, "");
        } else {
            resolved_index = index_path;
        }

        auto meta_input = MetadataCollectorUtilityInput::from_file(file_path)
                              .with_checkpoint_size(checkpoint_size)
                              .with_index(resolved_index);
        auto metadata = co_await MetadataCollectorUtility{}.process(meta_input);
        if (!metadata.success) {
            result.error =
                "Failed to collect metadata: " + metadata.error_message;
            co_return result;
        }

        for (const auto &view : views) {
            const bool lone_default_view =
                views.size() <= 1 && view.name == "all";
            std::string view_output = output_path;
            if (!lone_default_view) {
                view_output = output_path + "/" + view.name;
            }

            PartitionWriter writer;
            int open_status = co_await writer.open(
                view_output, chunk_size_bytes, compression);
            if (open_status != 0) {
                result.error =
                    "Failed to open partition writer for view: " + view.name;
                co_return result;
            }

            ViewBuilderInput builder_input;
            builder_input.with_view(view)
                .with_file_path(file_path)
                .with_index_path(resolved_index)
                .with_uncompressed_size(metadata.uncompressed_size)
                .with_num_checkpoints(metadata.num_checkpoints);

            auto plan = co_await ViewBuilderUtility{}.process(builder_input);
            if (!plan.success) {
                result.error = "ViewBuilder failed for view: " + view.name;
                co_return result;
            }

            result.chunks_skipped += plan.skipped_checkpoints;

            if (!plan.file_may_match) {
                // Nothing can match: close the writer without writing any
                // batch and record its stats; aggregate totals are left
                // untouched.
                auto empty_stats = co_await writer.close();
                result.stats.partitions[view.name] = std::move(empty_stats);
                continue;
            }

            // One builder is shared by all candidates of this view.
            RecordBatchBuilder builder;
            bool schema_locked = false;

            for (const auto &candidate : plan.candidates) {
                ViewReaderInput reader_input;
                reader_input.with_file_path(file_path)
                    .with_index_path(resolved_index)
                    .with_checkpoint_size(checkpoint_size)
                    .with_byte_range(candidate.start_byte, candidate.end_byte)
                    .with_checkpoint_idx(candidate.checkpoint_idx)
                    .with_event_batch_size(event_batch_size)
                    .with_view(view);
                reader_input.query = view.query;

                ViewReaderUtility reader;
                auto view_batches = reader.process(reader_input);
                for (;;) {
                    auto view_batch = co_await view_batches.next();
                    if (!view_batch) break;

                    auto arrow_batch = view_batch->to_arrow(builder);
                    int write_status =
                        co_await writer.write_batch(arrow_batch);
                    if (write_status != 0) {
                        result.error =
                            "Failed to write batch for view: " + view.name;
                        co_return result;
                    }
                    // Lock the schema once, after the first batch of the
                    // view has been written.
                    if (!schema_locked) {
                        builder.lock_schema();
                        schema_locked = true;
                    }
                    builder.reset(true);
                }
                ++result.chunks_scanned;
            }

            auto closed_stats = co_await writer.close();
            auto &partition = result.stats.partitions[view.name];
            partition = std::move(closed_stats);
            result.stats.total_rows += partition.total_rows;
            result.stats.total_uncompressed_bytes +=
                partition.total_uncompressed_bytes;
        }
    } catch (const std::exception &e) {
        result.error = e.what();
    }
    co_return result;
}
405!
1947

1948
// Location of one candidate chunk of a trace file: the checkpoint it belongs
// to and the byte range it spans. Members are zero-initialized so that a
// default-constructed instance never carries indeterminate values; aggregate
// initialization ({idx, start, end}) keeps working unchanged.
struct ViewChunkInfo {
    std::uint64_t checkpoint_idx = 0;  // index of the checkpoint
    std::size_t start_byte = 0;        // first byte of the chunk's range
    std::size_t end_byte = 0;          // end byte of the chunk's range
};
1953

1954
struct GetViewChunksResult {
12✔
1955
    std::vector<ViewChunkInfo> chunks;
1956
    std::uint64_t total_checkpoints = 0;
9✔
1957
    std::uint64_t skipped_checkpoints = 0;
9✔
1958
    bool file_may_match = false;
9✔
1959
    std::string error;
1960
};
1961

1962
// Coroutine that lists the candidate chunks of `file_path` for `view`.
//
// Steps: resolve the index path (derived from the file path when `index_path`
// is empty), collect file metadata, run the view builder, and copy each
// candidate's checkpoint index and byte range into the result.
//
// Failures never propagate as exceptions derived from std::exception: they are
// reported through GetViewChunksResult::error instead.
CoroTask<GetViewChunksResult> get_view_chunks_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    ViewDefinition view) {
    namespace dft_internal =
        dftracer::utils::utilities::composites::dft::internal;
    GetViewChunksResult out;

    try {
        const std::string effective_index =
            index_path.empty()
                ? dft_internal::determine_index_path(file_path, "")
                : index_path;

        // Stage 1: metadata (uncompressed size, number of checkpoints).
        auto metadata_request =
            MetadataCollectorUtilityInput::from_file(file_path)
                .with_checkpoint_size(checkpoint_size)
                .with_index(effective_index);
        auto file_meta =
            co_await MetadataCollectorUtility{}.process(metadata_request);
        if (!file_meta.success) {
            out.error =
                "Failed to collect metadata: " + file_meta.error_message;
            co_return out;
        }

        // Stage 2: let the view builder pick the candidate chunks.
        ViewBuilderInput build_request;
        build_request.with_view(view)
            .with_file_path(file_path)
            .with_index_path(effective_index)
            .with_uncompressed_size(file_meta.uncompressed_size)
            .with_num_checkpoints(file_meta.num_checkpoints);

        auto built = co_await ViewBuilderUtility{}.process(build_request);
        if (!built.success) {
            out.error = "ViewBuilder failed";
            co_return out;
        }

        // Stage 3: copy the builder output into the plain result struct.
        out.file_may_match = built.file_may_match;
        out.total_checkpoints = built.total_checkpoints;
        out.skipped_checkpoints = built.skipped_checkpoints;

        for (const auto &cand : built.candidates) {
            out.chunks.push_back(
                {cand.checkpoint_idx, cand.start_byte, cand.end_byte});
        }
    } catch (const std::exception &ex) {
        out.error = ex.what();
    }
    co_return out;
}
39!
2012

2013
// Per-chunk outcome of write_view_chunk_pipeline.
//   output_file    - path the chunk was written to
//   events_matched - sum of events_matched over all batches read
//   events_scanned - sum of events_scanned over all batches read
//   rows_written   - number of rows handed to the writer
//   bytes_written  - byte counter (starts at 0)
//   error          - non-empty when the chunk failed
struct WriteViewChunkResult {
    std::string output_file{};
    std::uint64_t events_matched{0};
    std::uint64_t events_scanned{0};
    int64_t rows_written{0};
    int64_t bytes_written{0};
    std::string error{};
};
2021

2022
// Coroutine that reads one chunk of `file_path` through `view` and writes the
// resulting Arrow batches to `output_file`.
//
// Parameters:
//   file_path, index_path - trace file and its index; an empty index_path is
//                           replaced by the path derived from file_path
//   checkpoint_size       - forwarded to the view reader
//   view                  - view definition; its query is used for reading
//   checkpoint_idx,
//   start_byte, end_byte  - the chunk to read
//   output_file           - IPC file to create
//   compression           - IPC compression passed to the writer
//   event_batch_size      - forwarded to the view reader
//
// Returns a WriteViewChunkResult. Errors (open/write/close failures and any
// std::exception) are reported through its `error` field.
CoroTask<WriteViewChunkResult> write_view_chunk_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    ViewDefinition view, std::uint64_t checkpoint_idx, std::size_t start_byte,
    std::size_t end_byte, std::string output_file, IpcCompression compression,
    std::size_t event_batch_size) {
    namespace dft_internal =
        dftracer::utils::utilities::composites::dft::internal;
    WriteViewChunkResult result;
    result.output_file = output_file;

    try {
        std::string resolved_index =
            index_path.empty()
                ? dft_internal::determine_index_path(file_path, "")
                : index_path;

        dftracer::utils::utilities::common::arrow::IpcWriter writer;
        int rc_open = co_await writer.open(output_file, compression);
        if (rc_open != 0) {
            result.error = "Failed to open output file";
            co_return result;
        }

        ViewReaderInput reader_input;
        reader_input.with_file_path(file_path)
            .with_index_path(resolved_index)
            .with_checkpoint_size(checkpoint_size)
            .with_byte_range(start_byte, end_byte)
            .with_checkpoint_idx(checkpoint_idx)
            .with_event_batch_size(event_batch_size)
            .with_view(view);
        reader_input.query = view.query;

        RecordBatchBuilder builder;
        bool schema_locked = false;

        ViewReaderUtility reader;
        auto gen = reader.process(reader_input);
        while (auto opt = co_await gen.next()) {
            result.events_matched += opt->events_matched;
            result.events_scanned += opt->events_scanned;
            auto batch = opt->to_arrow(builder);
            if (batch.valid()) {
                // Read the row count before handing the batch to the writer,
                // but only account for it once the write has succeeded so a
                // failed write is never reported as written rows.
                const auto batch_rows = batch.num_rows();
                int rc = co_await writer.write_batch(batch);
                if (rc != 0) {
                    result.error = "Failed to write batch";
                    co_return result;
                }
                result.rows_written += batch_rows;
                // The schema is locked after the first successfully written
                // batch and stays locked for the rest of the chunk.
                if (!schema_locked) {
                    builder.lock_schema();
                    schema_locked = true;
                }
                builder.reset(true);
            }
        }

        int rc = co_await writer.close();
        if (rc != 0) {
            result.error = "Failed to close output file";
        }
    } catch (const std::exception &e) {
        result.error = e.what();
    }
    co_return result;
}
116!
2088

2089
// Describes one chunk to be written by write_view_chunks_pipeline: which
// checkpoint / byte range to read and which file to write it to. Scalar
// members are zero-initialized so a default-constructed descriptor never
// holds indeterminate values; aggregate initialization is unaffected.
struct ChunkDescriptor {
    std::uint64_t checkpoint_idx = 0;  // index of the checkpoint
    std::size_t start_byte = 0;        // first byte of the chunk's range
    std::size_t end_byte = 0;          // end byte of the chunk's range
    std::string output_file;           // destination file for this chunk
};
2095

2096
struct WriteViewChunksResult {
3✔
2097
    std::vector<WriteViewChunkResult> results;
2098
    int64_t total_rows = 0;
3✔
2099
    int64_t total_events_matched = 0;
3✔
2100
};
2101

2102
// Coroutine that writes every chunk in `chunks` by creating one
// write_view_chunk_pipeline task per chunk and awaiting them all together
// with when_all. The per-chunk results are stored in the returned struct and
// their row / matched-event counts are summed into the totals.
// An empty `chunks` vector yields an empty result without creating any task.
CoroTask<WriteViewChunksResult> write_view_chunks_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    ViewDefinition view, std::vector<ChunkDescriptor> chunks,
    IpcCompression compression, std::size_t event_batch_size) {
    WriteViewChunksResult summary;

    if (chunks.empty()) {
        co_return summary;
    }

    std::vector<CoroTask<WriteViewChunkResult>> pending;
    pending.reserve(chunks.size());

    for (const auto &desc : chunks) {
        pending.push_back(write_view_chunk_pipeline(
            file_path, index_path, checkpoint_size, view, desc.checkpoint_idx,
            desc.start_byte, desc.end_byte, desc.output_file, compression,
            event_batch_size));
    }

    summary.results = co_await when_all(std::move(pending));

    // Fold the per-chunk counters into the totals.
    for (const auto &chunk_result : summary.results) {
        summary.total_rows += chunk_result.rows_written;
        summary.total_events_matched += chunk_result.events_matched;
    }

    co_return summary;
}
7!
2131

2132
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
2133

2134
// Builds a TraceReaderConfig from the fields stored on the Python object.
//
// Throws std::runtime_error when the object has no path (e.g. it was created
// without a successful __init__) or when a stored string cannot be converted
// to UTF-8. Callers translate std::exception into a Python RuntimeError, which
// also replaces any exception left pending by PyUnicode_AsUTF8.
TraceReaderConfig build_config(TraceReaderObject *self) {
    TraceReaderConfig cfg;

    // Guard before PyUnicode_AsUTF8: it must not be handed a NULL object, and
    // its NULL result must never be assigned to a std::string.
    if (!self->file_path) {
        throw std::runtime_error("TraceReader is not initialized");
    }
    const char *path = PyUnicode_AsUTF8(self->file_path);
    if (!path) {
        throw std::runtime_error("TraceReader path is not valid UTF-8");
    }
    cfg.file_path = path;

    // index_dir is optional: leave the config default when it was never set.
    if (self->index_dir) {
        const char *idx = PyUnicode_AsUTF8(self->index_dir);
        if (!idx) {
            throw std::runtime_error(
                "TraceReader index_dir is not valid UTF-8");
        }
        cfg.index_dir = idx;
    }

    cfg.checkpoint_size = self->checkpoint_size;
    cfg.auto_build_index = self->auto_build_index != 0;
    return cfg;
}
151!
2143

2144
// Returns the Runtime attached to this reader, or the default runtime when
// the reader has no runtime object of its own.
static Runtime *get_runtime(TraceReaderObject *self) {
    if (!self->runtime_obj) {
        return get_default_runtime();
    }
    auto *wrapper = reinterpret_cast<RuntimeObject *>(self->runtime_obj);
    return wrapper->runtime.get();
}
150✔
2150

2151
// Allocates a TraceReaderIteratorObject in MEMORYVIEW mode that takes
// ownership of `state`. The shared_ptr members are constructed in place with
// placement new on the storage returned by tp_alloc; the ones not used by this
// mode are left empty. Returns nullptr when allocation fails.
static TraceReaderIteratorObject *make_memoryview_iterator(
    std::shared_ptr<MemoryViewBatchIteratorState> state) {
    auto *iter_obj = reinterpret_cast<TraceReaderIteratorObject *>(
        TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0));
    if (iter_obj == nullptr) {
        return nullptr;
    }

    // Active state for this mode.
    new (&iter_obj->batch_state)
        std::shared_ptr<MemoryViewBatchIteratorState>(std::move(state));
    iter_obj->current_batch = NULL;
    iter_obj->batch_index = 0;

    // States belonging to the other modes stay empty.
    new (&iter_obj->json_dict_state) std::shared_ptr<JsonDictIteratorState>();
    new (&iter_obj->json_dict_current_batch) std::shared_ptr<JsonDictBatch>();
    iter_obj->json_dict_index = 0;
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    new (&iter_obj->arrow_state) std::shared_ptr<ArrowIteratorState>();
#endif

    iter_obj->mode = IteratorMode::MEMORYVIEW;
    return iter_obj;
}
71✔
2170

2171
// Allocates a TraceReaderIteratorObject in JSON_DICT mode that takes
// ownership of `state`. The shared_ptr members are constructed in place with
// placement new on the storage returned by tp_alloc; the ones not used by this
// mode are left empty. Returns nullptr when allocation fails.
static TraceReaderIteratorObject *make_json_dict_iterator(
    std::shared_ptr<JsonDictIteratorState> state) {
    auto *iter_obj = reinterpret_cast<TraceReaderIteratorObject *>(
        TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0));
    if (iter_obj == nullptr) {
        return nullptr;
    }

    // Memoryview state is unused in this mode and stays empty.
    new (&iter_obj->batch_state)
        std::shared_ptr<MemoryViewBatchIteratorState>();
    iter_obj->current_batch = NULL;
    iter_obj->batch_index = 0;

    // Active state for this mode.
    new (&iter_obj->json_dict_state)
        std::shared_ptr<JsonDictIteratorState>(std::move(state));
    new (&iter_obj->json_dict_current_batch) std::shared_ptr<JsonDictBatch>();
    iter_obj->json_dict_index = 0;
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    new (&iter_obj->arrow_state) std::shared_ptr<ArrowIteratorState>();
#endif

    iter_obj->mode = IteratorMode::JSON_DICT;
    return iter_obj;
}
7✔
2190

2191
#ifdef DFTRACER_UTILS_ENABLE_ARROW
2192
// Allocates a TraceReaderIteratorObject in ARROW mode that takes ownership of
// `state`. The shared_ptr members are constructed in place with placement new
// on the storage returned by tp_alloc; the ones not used by this mode are left
// empty. Returns nullptr when allocation fails.
static TraceReaderIteratorObject *make_arrow_iterator(
    std::shared_ptr<ArrowIteratorState> state) {
    auto *iter_obj = reinterpret_cast<TraceReaderIteratorObject *>(
        TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0));
    if (iter_obj == nullptr) {
        return nullptr;
    }

    // Memoryview and JSON-dict states are unused in this mode.
    new (&iter_obj->batch_state)
        std::shared_ptr<MemoryViewBatchIteratorState>();
    iter_obj->current_batch = NULL;
    iter_obj->batch_index = 0;
    new (&iter_obj->json_dict_state) std::shared_ptr<JsonDictIteratorState>();
    new (&iter_obj->json_dict_current_batch) std::shared_ptr<JsonDictBatch>();
    iter_obj->json_dict_index = 0;

    // Active state for this mode.
    new (&iter_obj->arrow_state)
        std::shared_ptr<ArrowIteratorState>(std::move(state));

    iter_obj->mode = IteratorMode::ARROW;
    return iter_obj;
}
27✔
2209
#endif
2210

2211
}  // namespace
2212

2213
// tp_dealloc: drops the references held by the reader (path, index directory,
// runtime object; each may be NULL) and then frees the object itself.
static void TraceReader_dealloc(TraceReaderObject *self) {
    PyObject *const held_refs[] = {self->file_path, self->index_dir,
                                   self->runtime_obj};
    for (PyObject *ref : held_refs) {
        Py_XDECREF(ref);
    }
    Py_TYPE(self)->tp_free(reinterpret_cast<PyObject *>(self));
}
316✔
2219

2220
// tp_new: allocates the reader and puts every field into a known state
// (no path, no index directory, no runtime, 32 MiB checkpoint size, flags
// cleared). `args` and `kwds` are not used here. Returns NULL when tp_alloc
// fails.
static PyObject *TraceReader_new(PyTypeObject *type, PyObject *args,
                                 PyObject *kwds) {
    auto *reader = reinterpret_cast<TraceReaderObject *>(type->tp_alloc(type, 0));
    if (reader == nullptr) {
        return nullptr;
    }

    reader->file_path = NULL;
    reader->index_dir = NULL;
    reader->runtime_obj = NULL;
    reader->checkpoint_size = 32 * 1024 * 1024;
    reader->auto_build_index = 0;
    reader->has_index = 0;
    return reinterpret_cast<PyObject *>(reader);
}
2233

2234
static int TraceReader_init(TraceReaderObject *self, PyObject *args,
316✔
2235
                            PyObject *kwds) {
2236
    static const char *kwlist[] = {
2237
        "path",    "index_dir", "checkpoint_size", "auto_build_index",
2238
        "runtime", NULL};
2239

2240
    const char *file_path;
2241
    const char *index_dir = "";
316✔
2242
    std::size_t checkpoint_size = 32 * 1024 * 1024;
316✔
2243
    int auto_build_index = 0;
316✔
2244
    PyObject *runtime_arg = NULL;
316✔
2245

2246
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|snpO", (char **)kwlist,
316!
2247
                                     &file_path, &index_dir, &checkpoint_size,
2248
                                     &auto_build_index, &runtime_arg)) {
2249
        return -1;
×
2250
    }
2251

2252
    if (runtime_arg && runtime_arg != Py_None) {
316✔
2253
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
64!
2254
            // Direct C++ Runtime object
2255
            Py_INCREF(runtime_arg);
×
2256
            self->runtime_obj = runtime_arg;
×
2257
        } else {
2258
            // Python wrapper, extract _native attribute
2259
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
64!
2260
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
64!
2261
                self->runtime_obj = native;  // already incref'd by GetAttr
64✔
2262
            } else {
32✔
2263
                Py_XDECREF(native);
×
2264
                PyErr_SetString(PyExc_TypeError,
×
2265
                                "runtime must be a Runtime instance or None");
2266
                return -1;
×
2267
            }
2268
        }
2269
    }
32✔
2270

2271
    self->file_path = PyUnicode_FromString(file_path);
316!
2272
    if (!self->file_path) return -1;
316✔
2273

2274
    self->index_dir = PyUnicode_FromString(index_dir);
316!
2275
    if (!self->index_dir) {
316✔
2276
        Py_DECREF(self->file_path);
×
2277
        self->file_path = NULL;
×
2278
        return -1;
×
2279
    }
2280

2281
    self->checkpoint_size = checkpoint_size;
316✔
2282
    self->auto_build_index = auto_build_index;
316✔
2283

2284
    try {
2285
        TraceReaderConfig cfg;
316✔
2286
        cfg.file_path = file_path;
316!
2287
        cfg.index_dir = index_dir;
316!
2288
        cfg.checkpoint_size = checkpoint_size;
316✔
2289
        cfg.auto_build_index = auto_build_index != 0;
316✔
2290
        TraceReader probe(std::move(cfg));
316!
2291
        self->has_index = probe.has_index() ? 1 : 0;
316!
2292
    } catch (const std::exception &e) {
316!
2293
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2294
        Py_DECREF(self->file_path);
×
2295
        Py_DECREF(self->index_dir);
×
2296
        self->file_path = NULL;
×
2297
        self->index_dir = NULL;
×
2298
        return -1;
×
2299
    }
×
2300

2301
    return 0;
316✔
2302
}
158✔
2303

2304
// Python method iter_lines(start_line=0, end_line=0, start_byte=0, end_byte=0,
//                          buffer_size=4 MiB, query=None, memory_budget=0)
//
// Starts a producer task on the reader's runtime and returns a memoryview-mode
// iterator over what it produces. A directory path is handled by
// produce_lines_parallel inside a runtime scope; anything else is handled by
// produce_lines_batched submitted as a single task.
//
// Raises ValueError for negative ranges or a non-positive buffer_size, and
// RuntimeError when building the config or starting the producer throws.
static PyObject *TraceReader_iter_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {"start_line",    "end_line",    "start_byte",
                                   "end_byte",      "buffer_size", "query",
                                   "memory_budget", nullptr};
    Py_ssize_t start_line = 0;
    Py_ssize_t end_line = 0;
    Py_ssize_t start_byte = 0;
    Py_ssize_t end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = nullptr;
    Py_ssize_t memory_budget = 0;

    const int parsed = PyArg_ParseTupleAndKeywords(
        args, kwds, "|nnnnnzn", const_cast<char **>(kwlist), &start_line,
        &end_line, &start_byte, &end_byte, &buffer_size, &query_str,
        &memory_budget);
    if (!parsed) {
        return nullptr;
    }

    const bool args_valid = start_line >= 0 && end_line >= 0 &&
                            start_byte >= 0 && end_byte >= 0 &&
                            buffer_size > 0;
    if (!args_valid) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return nullptr;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &ex) {
        PyErr_SetString(PyExc_RuntimeError, ex.what());
        return nullptr;
    }

    ReadConfig read_cfg;
    read_cfg.start_line = static_cast<std::size_t>(start_line);
    read_cfg.end_line = static_cast<std::size_t>(end_line);
    read_cfg.start_byte = static_cast<std::size_t>(start_byte);
    read_cfg.end_byte = static_cast<std::size_t>(end_byte);
    read_cfg.buffer_size = static_cast<std::size_t>(buffer_size);
    if (query_str) read_cfg.query = query_str;

    auto state = std::make_shared<MemoryViewBatchIteratorState>();
    state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
        static_cast<std::size_t>(memory_budget));

    // Size the channel from the memory budget and the estimated batch size.
    Runtime *runtime = get_runtime(self);
    std::size_t max_workers = runtime->threads();
    constexpr std::size_t LINE_BATCH_SIZE = 1024;
    std::size_t channel_capacity = dftracer::utils::compute_channel_capacity(
        state->memory_budget_bytes, LINE_BATCH_SIZE * ESTIMATED_BYTES_PER_LINE,
        max_workers);
    state->channel = dftracer::utils::coro::make_channel<MemoryViewBatchData>(
        channel_capacity);
    auto *state_ptr = state.get();

    try {
        if (fs::is_directory(cfg.file_path)) {
            // Directory: fan out over its files inside a runtime scope.
            auto handle = runtime->scope(
                "iter_lines_parallel",
                [state_ptr, dir_path = cfg.file_path,
                 index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, read_cfg,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_lines_parallel(
                        scope, state_ptr, dir_path, index_dir, checkpoint_size,
                        auto_build_index, read_cfg, LINE_BATCH_SIZE,
                        max_workers);
                });
            state->task_future = handle.future;
        } else {
            // Single path: one batched producer task.
            auto handle = runtime->submit(
                produce_lines_batched(state, state->channel->producer(), cfg,
                                      read_cfg, LINE_BATCH_SIZE),
                "iter_lines");
            state->task_future = handle.future;
        }
    } catch (const std::exception &ex) {
        PyErr_SetString(PyExc_RuntimeError, ex.what());
        return nullptr;
    }

    TraceReaderIteratorObject *iterator =
        make_memoryview_iterator(std::move(state));
    return reinterpret_cast<PyObject *>(iterator);
}
121✔
2388

2389
// Python method iter_raw(start_line=0, end_line=0, start_byte=0, end_byte=0,
//                        buffer_size=4 MiB, line_aligned=True, multi_line=True,
//                        query=None, memory_budget=0)
//
// Starts a producer task on the reader's runtime and returns a memoryview-mode
// iterator over what it produces. A directory path is handled by
// produce_raw_parallel inside a runtime scope; anything else is handled by
// produce_raw_batched submitted as a single task.
//
// Raises ValueError for negative ranges or a non-positive buffer_size, and
// RuntimeError when building the config or starting the producer throws.
static PyObject *TraceReader_iter_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "line_aligned",
                                   "multi_line", "query",       "memory_budget",
                                   nullptr};
    Py_ssize_t start_line = 0;
    Py_ssize_t end_line = 0;
    Py_ssize_t start_byte = 0;
    Py_ssize_t end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    int line_aligned = 1;
    int multi_line = 1;
    const char *query_str = nullptr;
    Py_ssize_t memory_budget = 0;

    const int parsed = PyArg_ParseTupleAndKeywords(
        args, kwds, "|nnnnnppzn", const_cast<char **>(kwlist), &start_line,
        &end_line, &start_byte, &end_byte, &buffer_size, &line_aligned,
        &multi_line, &query_str, &memory_budget);
    if (!parsed) {
        return nullptr;
    }

    const bool args_valid = start_line >= 0 && end_line >= 0 &&
                            start_byte >= 0 && end_byte >= 0 &&
                            buffer_size > 0;
    if (!args_valid) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return nullptr;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &ex) {
        PyErr_SetString(PyExc_RuntimeError, ex.what());
        return nullptr;
    }

    ReadConfig read_cfg;
    read_cfg.start_line = static_cast<std::size_t>(start_line);
    read_cfg.end_line = static_cast<std::size_t>(end_line);
    read_cfg.start_byte = static_cast<std::size_t>(start_byte);
    read_cfg.end_byte = static_cast<std::size_t>(end_byte);
    read_cfg.buffer_size = static_cast<std::size_t>(buffer_size);
    read_cfg.line_aligned = line_aligned != 0;
    read_cfg.multi_line = multi_line != 0;
    if (query_str) read_cfg.query = query_str;

    auto state = std::make_shared<MemoryViewBatchIteratorState>();
    state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
        static_cast<std::size_t>(memory_budget));

    // Size the channel from the memory budget and the estimated chunk size.
    Runtime *runtime = get_runtime(self);
    std::size_t max_workers = runtime->threads();
    std::size_t channel_capacity = dftracer::utils::compute_channel_capacity(
        state->memory_budget_bytes, ESTIMATED_BYTES_PER_RAW_CHUNK, max_workers);
    state->channel = dftracer::utils::coro::make_channel<MemoryViewBatchData>(
        channel_capacity);
    auto *state_ptr = state.get();

    try {
        if (fs::is_directory(cfg.file_path)) {
            // Directory: fan out over its files inside a runtime scope.
            auto handle = runtime->scope(
                "iter_raw_parallel",
                [state_ptr, dir_path = cfg.file_path,
                 index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, read_cfg,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_raw_parallel(
                        scope, state_ptr, dir_path, index_dir, checkpoint_size,
                        auto_build_index, read_cfg, max_workers);
                });
            state->task_future = handle.future;
        } else {
            // Single path: one batched producer task.
            auto handle = runtime->submit(
                produce_raw_batched(state, state->channel->producer(), cfg,
                                    read_cfg),
                "iter_raw");
            state->task_future = handle.future;
        }
    } catch (const std::exception &ex) {
        PyErr_SetString(PyExc_RuntimeError, ex.what());
        return nullptr;
    }

    TraceReaderIteratorObject *iterator =
        make_memoryview_iterator(std::move(state));
    return reinterpret_cast<PyObject *>(iterator);
}
25✔
2476

2477
// Python method read_lines(...): same arguments as iter_lines, but drains the
// iterator into a list. Returns NULL (exception set) when creating the
// iterator or building the list fails.
static PyObject *TraceReader_read_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    PyObject *line_iter = TraceReader_iter_lines(self, args, kwds);
    if (line_iter == nullptr) {
        return nullptr;
    }
    PyObject *collected = PySequence_List(line_iter);
    Py_DECREF(line_iter);  // the list (if any) no longer needs the iterator
    return collected;
}
46✔
2485

2486
// Python method iter_json(start_line=0, end_line=0, start_byte=0, end_byte=0,
//                         buffer_size=4 MiB, query=None, batch_size=1024,
//                         memory_budget=0)
//
// Starts a producer task on the reader's runtime and returns a JSON-dict-mode
// iterator over what it produces. A directory path is handled by
// produce_json_dicts_parallel inside a runtime scope; anything else is handled
// by produce_json_dicts submitted as a single task.
//
// Raises ValueError for negative ranges or a non-positive buffer_size /
// batch_size, and RuntimeError when building the config or starting the
// producer throws.
static PyObject *TraceReader_iter_json(TraceReaderObject *self, PyObject *args,
                                       PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",      "start_byte",
                                   "end_byte",   "buffer_size",   "query",
                                   "batch_size", "memory_budget", nullptr};
    Py_ssize_t start_line = 0;
    Py_ssize_t end_line = 0;
    Py_ssize_t start_byte = 0;
    Py_ssize_t end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = nullptr;
    Py_ssize_t batch_size = 1024;
    Py_ssize_t memory_budget = 0;

    const int parsed = PyArg_ParseTupleAndKeywords(
        args, kwds, "|nnnnnznn", const_cast<char **>(kwlist), &start_line,
        &end_line, &start_byte, &end_byte, &buffer_size, &query_str,
        &batch_size, &memory_budget);
    if (!parsed) {
        return nullptr;
    }

    const bool args_valid = start_line >= 0 && end_line >= 0 &&
                            start_byte >= 0 && end_byte >= 0 &&
                            buffer_size > 0 && batch_size > 0;
    if (!args_valid) {
        PyErr_SetString(PyExc_ValueError,
                        "range arguments must be >= 0; buffer_size and "
                        "batch_size must be > 0");
        return nullptr;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &ex) {
        PyErr_SetString(PyExc_RuntimeError, ex.what());
        return nullptr;
    }

    ReadConfig read_cfg;
    read_cfg.start_line = static_cast<std::size_t>(start_line);
    read_cfg.end_line = static_cast<std::size_t>(end_line);
    read_cfg.start_byte = static_cast<std::size_t>(start_byte);
    read_cfg.end_byte = static_cast<std::size_t>(end_byte);
    read_cfg.buffer_size = static_cast<std::size_t>(buffer_size);
    if (query_str) read_cfg.query = query_str;

    auto state = std::make_shared<JsonDictIteratorState>();
    state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
        static_cast<std::size_t>(memory_budget));

    // Size the channel from the memory budget and the estimated batch size.
    Runtime *runtime = get_runtime(self);
    std::size_t max_workers = runtime->threads();
    auto events_per_batch = static_cast<std::size_t>(batch_size);
    std::size_t channel_capacity = dftracer::utils::compute_channel_capacity(
        state->memory_budget_bytes,
        events_per_batch * ESTIMATED_BYTES_PER_JSON_EVENT, max_workers);
    state->channel =
        dftracer::utils::coro::make_channel<JsonDictBatch>(channel_capacity);
    auto *state_ptr = state.get();

    try {
        if (fs::is_directory(cfg.file_path)) {
            // Directory: fan out over its files inside a runtime scope.
            auto handle = runtime->scope(
                "iter_json_parallel",
                [state_ptr, dir_path = cfg.file_path,
                 index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, read_cfg,
                 events_per_batch,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_json_dicts_parallel(
                        scope, state_ptr, dir_path, index_dir, checkpoint_size,
                        auto_build_index, read_cfg, events_per_batch,
                        max_workers);
                });
            state->task_future = handle.future;
        } else {
            // Single path: one producer task.
            auto handle = runtime->submit(
                produce_json_dicts(state, state->channel->producer(), cfg,
                                   read_cfg, events_per_batch),
                "iter_json");
            state->task_future = handle.future;
        }
    } catch (const std::exception &ex) {
        PyErr_SetString(PyExc_RuntimeError, ex.what());
        return nullptr;
    }

    TraceReaderIteratorObject *iterator =
        make_json_dict_iterator(std::move(state));
    return reinterpret_cast<PyObject *>(iterator);
}
14✔
2572

2573
// Python method backed by iter_json: same arguments, but drains the iterator
// into a list. Returns NULL (exception set) when creating the iterator or
// building the list fails.
static PyObject *TraceReader_read_json_py(TraceReaderObject *self,
                                          PyObject *args, PyObject *kwds) {
    PyObject *json_iter = TraceReader_iter_json(self, args, kwds);
    if (json_iter == nullptr) {
        return nullptr;
    }
    PyObject *collected = PySequence_List(json_iter);
    Py_DECREF(json_iter);  // the list (if any) no longer needs the iterator
    return collected;
}
1✔
2581

2582
// Python method read_raw(...): same arguments as iter_raw, but drains the
// iterator into a list. Returns NULL (exception set) when creating the
// iterator or building the list fails.
static PyObject *TraceReader_read_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    PyObject *raw_iter = TraceReader_iter_raw(self, args, kwds);
    if (raw_iter == nullptr) {
        return nullptr;
    }
    PyObject *collected = PySequence_List(raw_iter);
    Py_DECREF(raw_iter);  // the list (if any) no longer needs the iterator
    return collected;
}
4✔
2590

2591
#ifdef DFTRACER_UTILS_ENABLE_ARROW
2592

2593
// Python method iter_arrow(batch_size=10000, start_line=0, end_line=0,
//                          start_byte=0, end_byte=0, buffer_size=4 MiB,
//                          query=None, flatten_objects=True, normalize=False,
//                          memory_budget=0)
//
// Starts a producer task on the reader's runtime and returns an Arrow-mode
// iterator over what it produces. Three producers are used:
//   * directory path            -> produce_arrow_batches_parallel (scope)
//   * single path, normalize    -> produce_arrow_batches (submitted task)
//   * single path, no normalize -> produce_arrow_batches_for_files with a
//                                  one-element file list (scope)
//
// Raises ValueError for a non-positive batch_size / buffer_size or negative
// ranges, and RuntimeError when building the config or starting the producer
// throws.
static PyObject *TraceReader_iter_arrow(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {
        "batch_size", "start_line",    "end_line", "start_byte",
        "end_byte",   "buffer_size",   "query",    "flatten_objects",
        "normalize",  "memory_budget", nullptr};
    Py_ssize_t batch_size = 10000;
    Py_ssize_t start_line = 0;
    Py_ssize_t end_line = 0;
    Py_ssize_t start_byte = 0;
    Py_ssize_t end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = nullptr;
    int flatten_objects = 1;  // default: expand top-level objects
    int normalize = 0;
    Py_ssize_t memory_budget = 0;

    const int parsed = PyArg_ParseTupleAndKeywords(
        args, kwds, "|nnnnnnzppn", const_cast<char **>(kwlist), &batch_size,
        &start_line, &end_line, &start_byte, &end_byte, &buffer_size,
        &query_str, &flatten_objects, &normalize, &memory_budget);
    if (!parsed) {
        return nullptr;
    }

    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
        return nullptr;
    }
    const bool ranges_valid = start_line >= 0 && end_line >= 0 &&
                              start_byte >= 0 && end_byte >= 0 &&
                              buffer_size > 0;
    if (!ranges_valid) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return nullptr;
    }

    TraceReaderConfig cfg;
    try {
        cfg = build_config(self);
    } catch (const std::exception &ex) {
        PyErr_SetString(PyExc_RuntimeError, ex.what());
        return nullptr;
    }

    const bool flatten = flatten_objects != 0;
    const bool norm = normalize != 0;

    ReadConfig read_cfg;
    read_cfg.start_line = static_cast<std::size_t>(start_line);
    read_cfg.end_line = static_cast<std::size_t>(end_line);
    read_cfg.start_byte = static_cast<std::size_t>(start_byte);
    read_cfg.end_byte = static_cast<std::size_t>(end_byte);
    read_cfg.buffer_size = static_cast<std::size_t>(buffer_size);
    read_cfg.flatten_objects = flatten;
    if (query_str) read_cfg.query = query_str;

    auto state = std::make_shared<ArrowIteratorState>();
    state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
        static_cast<std::size_t>(memory_budget));

    // Size the channel from the memory budget and the estimated batch size.
    Runtime *runtime = get_runtime(self);
    std::size_t max_workers = runtime->threads();
    auto rows_per_batch = static_cast<std::size_t>(batch_size);
    std::size_t channel_capacity = dftracer::utils::compute_channel_capacity(
        state->memory_budget_bytes,
        rows_per_batch * ESTIMATED_BYTES_PER_ARROW_ROW, max_workers);
    state->channel =
        dftracer::utils::coro::make_channel<ArrowIteratorState::BatchType>(
            channel_capacity);
    auto *state_ptr = state.get();

    try {
        if (fs::is_directory(cfg.file_path)) {
            // Directory: fan out over its files inside a runtime scope.
            auto handle = runtime->scope(
                "iter_arrow_parallel",
                [state_ptr, dir_path = cfg.file_path,
                 index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, read_cfg,
                 rows_per_batch, norm,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_arrow_batches_parallel(
                        scope, state_ptr, dir_path, index_dir, checkpoint_size,
                        auto_build_index, read_cfg, rows_per_batch, norm,
                        max_workers);
                });
            state->task_future = handle.future;
        } else if (norm) {
            // Single path with normalization: one producer task.
            auto handle = runtime->submit(
                produce_arrow_batches(state, state->channel->producer(), cfg,
                                      read_cfg, rows_per_batch, flatten, norm),
                "iter_arrow");
            state->task_future = handle.future;
        } else {
            // Single path without normalization: reuse the multi-file producer
            // with a one-element file list.
            std::vector<std::string> single_file{cfg.file_path};
            auto handle = runtime->scope(
                "iter_arrow_parallel",
                [state_ptr, files = std::move(single_file),
                 index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, read_cfg,
                 rows_per_batch, norm,
                 max_workers](CoroScope &scope) mutable -> CoroTask<void> {
                    co_await produce_arrow_batches_for_files(
                        scope, state_ptr, std::move(files), index_dir,
                        checkpoint_size, auto_build_index, read_cfg,
                        rows_per_batch, norm, max_workers);
                });
            state->task_future = handle.future;
        }
    } catch (const std::exception &ex) {
        PyErr_SetString(PyExc_RuntimeError, ex.what());
        return nullptr;
    }

    TraceReaderIteratorObject *iterator = make_arrow_iterator(std::move(state));
    return reinterpret_cast<PyObject *>(iterator);
}
54✔
2704

2705
// Build ArrowIteratorState + spawn the producer task. Same plumbing as
2706
// TraceReader_iter_arrow but returns the state so callers can wrap it as
2707
// either a per-batch iterator or an ArrowArrayStream.
2708
static std::shared_ptr<ArrowIteratorState> spawn_arrow_producer(
54✔
2709
    TraceReaderObject *self, PyObject *args, PyObject *kwds) {
2710
    static const char *kwlist[] = {
2711
        "batch_size", "start_line",    "end_line", "start_byte",
2712
        "end_byte",   "buffer_size",   "query",    "flatten_objects",
2713
        "normalize",  "memory_budget", NULL};
2714
    Py_ssize_t batch_size = 10000;
54✔
2715
    Py_ssize_t start_line = 0, end_line = 0;
54✔
2716
    Py_ssize_t start_byte = 0, end_byte = 0;
54✔
2717
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
54✔
2718
    const char *query_str = NULL;
54✔
2719
    int flatten_objects = 1;  // default: expand top-level objects
54✔
2720
    int normalize = 0;
54✔
2721
    Py_ssize_t memory_budget = 0;
54✔
2722

2723
    if (!PyArg_ParseTupleAndKeywords(
54!
2724
            args, kwds, "|nnnnnnzppn", (char **)kwlist, &batch_size,
27✔
2725
            &start_line, &end_line, &start_byte, &end_byte, &buffer_size,
2726
            &query_str, &flatten_objects, &normalize, &memory_budget)) {
2727
        return nullptr;
×
2728
    }
2729

2730
    if (batch_size <= 0) {
54!
2731
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
×
2732
        return nullptr;
×
2733
    }
2734
    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
54!
2735
        buffer_size <= 0) {
54!
2736
        PyErr_SetString(
×
2737
            PyExc_ValueError,
2738
            "range arguments must be >= 0; buffer_size must be > 0");
2739
        return nullptr;
×
2740
    }
2741

2742
    TraceReaderConfig cfg;
54✔
2743
    try {
2744
        cfg = build_config(self);
54!
2745
    } catch (const std::exception &e) {
27!
2746
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2747
        return nullptr;
×
2748
    }
×
2749

2750
    ReadConfig rc;
54✔
2751
    rc.start_line = static_cast<std::size_t>(start_line);
54✔
2752
    rc.end_line = static_cast<std::size_t>(end_line);
54✔
2753
    rc.start_byte = static_cast<std::size_t>(start_byte);
54✔
2754
    rc.end_byte = static_cast<std::size_t>(end_byte);
54✔
2755
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
54✔
2756
    rc.flatten_objects = flatten_objects != 0;
54✔
2757
    if (query_str) rc.query = query_str;
54!
2758

2759
    auto state = std::make_shared<ArrowIteratorState>();
54!
2760
    state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
54!
2761
        static_cast<std::size_t>(memory_budget));
27✔
2762

2763
    Runtime *rt = get_runtime(self);
54!
2764
    std::size_t max_workers = rt->threads();
54!
2765
    auto bs = static_cast<std::size_t>(batch_size);
54✔
2766
    std::size_t capacity = dftracer::utils::compute_channel_capacity(
54!
2767
        state->memory_budget_bytes, bs * ESTIMATED_BYTES_PER_ARROW_ROW,
54✔
2768
        max_workers);
27✔
2769
    state->channel =
54✔
2770
        dftracer::utils::coro::make_channel<ArrowIteratorState::BatchType>(
81!
2771
            capacity);
54✔
2772
    auto *sp = state.get();
54✔
2773

2774
    try {
2775
        bool is_dir = fs::is_directory(cfg.file_path);
54!
2776
        if (is_dir) {
54✔
2777
            auto handle = rt->scope(
20!
2778
                "iter_arrow_parallel",
10!
2779
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
90!
2780
                 checkpoint_size = cfg.checkpoint_size,
20✔
2781
                 auto_build_index = cfg.auto_build_index, rc, bs,
30!
2782
                 norm = normalize != 0,
20✔
2783
                 max_workers](CoroScope &scope) -> CoroTask<void> {
20!
2784
                    co_await produce_arrow_batches_parallel(
80!
2785
                        scope, sp, dir_path, index_dir, checkpoint_size,
30!
2786
                        auto_build_index, rc, bs, norm, max_workers);
30!
2787
                });
60!
2788
            state->task_future = handle.future;
20!
2789
        } else {
20✔
2790
            auto handle = rt->submit(
34!
2791
                produce_arrow_batches(state, state->channel->producer(), cfg,
68!
2792
                                      rc, static_cast<std::size_t>(batch_size),
17!
2793
                                      flatten_objects != 0, normalize != 0),
17✔
2794
                "iter_arrow");
68!
2795
            state->task_future = handle.future;
34!
2796
        }
34✔
2797
    } catch (const std::exception &e) {
27!
2798
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2799
        return nullptr;
×
2800
    }
×
2801

2802
    return state;
54✔
2803
}
54✔
2804

2805
// Python method: spawn the Arrow producer for the given arguments and wrap
// its state in an Arrow batch stream object. Returns NULL when the producer
// could not be created (spawn_arrow_producer has already set the exception).
static PyObject *TraceReader_iter_arrow_stream(TraceReaderObject *self,
                                               PyObject *args, PyObject *kwds) {
    std::shared_ptr<ArrowIteratorState> producer =
        spawn_arrow_producer(self, args, kwds);
    if (producer == nullptr) {
        return NULL;
    }
    return make_arrow_batch_stream(std::move(producer));
}
28✔
2811

2812
// Python method: spawn the Arrow producer, wrap it in a batch stream and
// pass that stream to wrap_arrow_stream_table. Returns NULL if either the
// producer or the stream could not be created.
static PyObject *TraceReader_read_arrow(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    std::shared_ptr<ArrowIteratorState> producer =
        spawn_arrow_producer(self, args, kwds);
    if (producer == nullptr) {
        return NULL;
    }

    PyObject *batch_stream = make_arrow_batch_stream(std::move(producer));
    if (batch_stream == NULL) {
        return NULL;
    }
    return dftracer::utils::python::wrap_arrow_stream_table(batch_stream);
}
26✔
2820

2821
#endif  // DFTRACER_UTILS_ENABLE_ARROW
2822

2823
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
2824

2825
static int parse_str_list_trace(PyObject *obj, std::vector<std::string> &out,
2826
                                const char *param_name) {
2827
    if (!obj || obj == Py_None) return 0;
×
2828
    if (!PyList_Check(obj)) {
×
2829
        PyErr_Format(PyExc_TypeError, "%s must be a list of str", param_name);
2830
        return -1;
2831
    }
2832
    Py_ssize_t n = PyList_Size(obj);
2833
    for (Py_ssize_t i = 0; i < n; i++) {
×
2834
        const char *s = PyUnicode_AsUTF8(PyList_GetItem(obj, i));
×
2835
        if (!s) return -1;
×
2836
        out.emplace_back(s);
×
2837
    }
2838
    return 0;
2839
}
2840

2841
// Python method:
//   write_arrow(path, views=None, chunk_size_mb=32, compression="zstd",
//               batch_size=10000) -> dict
//
// Runs write_arrow_pipeline on the runtime with the GIL released and
// converts its result into a dict with the keys:
//   "partitions"     : {partition name or "_default":
//                        {"files": [str], "rows": int, "bytes": int}}
//   "total_rows", "total_bytes", "chunks_scanned", "chunks_skipped"
//
// `views` is a list whose items are either a view name (str; "io",
// "compute" and "dlio" select the predefined views) or a dict with a
// mandatory "name" and optional "query" / "include_metadata" entries.
//
// Returns NULL with an exception set on invalid arguments (TypeError /
// ValueError) or when the pipeline fails (RuntimeError).
static PyObject *TraceReader_write_arrow(TraceReaderObject *self,
                                         PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"path",        "views",      "chunk_size_mb",
                                   "compression", "batch_size", NULL};
    const char *path = NULL;
    PyObject *views_obj = Py_None;
    int chunk_size_mb = 32;
    const char *compression_str = "zstd";
    Py_ssize_t batch_size = 10000;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|Oisn", (char **)kwlist,
                                     &path, &views_obj, &chunk_size_mb,
                                     &compression_str, &batch_size)) {
        return NULL;
    }

    if (chunk_size_mb < 0) {
        PyErr_SetString(PyExc_ValueError, "chunk_size_mb must be >= 0");
        return NULL;
    }
    // A negative value would wrap to a huge std::size_t when handed to the
    // pipeline below.
    if (batch_size < 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be >= 0");
        return NULL;
    }

    std::vector<ViewDefinition> views;
    if (views_obj && views_obj != Py_None) {
        if (!PyList_Check(views_obj)) {
            PyErr_SetString(PyExc_TypeError, "views must be a list or None");
            return NULL;
        }
        Py_ssize_t n = PyList_Size(views_obj);
        for (Py_ssize_t i = 0; i < n; i++) {
            // Borrowed reference. It can be NULL if the list shrank while
            // PyObject_IsTrue below ran arbitrary Python code.
            PyObject *item = PyList_GetItem(views_obj, i);
            if (!item) return NULL;
            ViewDefinition vd;

            if (PyUnicode_Check(item)) {
                const char *name = PyUnicode_AsUTF8(item);
                if (!name) return NULL;
                std::string name_str(name);
                if (name_str == "io") {
                    vd = ViewDefinition::io_view();
                } else if (name_str == "compute") {
                    vd = ViewDefinition::compute_view();
                } else if (name_str == "dlio") {
                    vd = ViewDefinition::dlio_view();
                } else {
                    vd.with_name(name_str);
                }
            } else if (PyDict_Check(item)) {
                PyObject *name_obj = PyDict_GetItemString(item, "name");
                if (!name_obj || !PyUnicode_Check(name_obj)) {
                    PyErr_SetString(PyExc_ValueError,
                                    "view dict must have 'name' string");
                    return NULL;
                }
                // PyUnicode_AsUTF8 returns NULL (exception set) for strings
                // that cannot be encoded, e.g. lone surrogates.
                const char *view_name = PyUnicode_AsUTF8(name_obj);
                if (!view_name) return NULL;
                vd.with_name(view_name);

                PyObject *query_obj = PyDict_GetItemString(item, "query");
                if (query_obj && query_obj != Py_None) {
                    if (!PyUnicode_Check(query_obj)) {
                        PyErr_SetString(PyExc_ValueError,
                                        "view 'query' must be a string");
                        return NULL;
                    }
                    const char *view_query = PyUnicode_AsUTF8(query_obj);
                    if (!view_query) return NULL;
                    vd.with_query(view_query);
                }

                PyObject *meta_obj =
                    PyDict_GetItemString(item, "include_metadata");
                if (meta_obj && meta_obj != Py_None) {
                    // -1 signals that truth testing raised; propagate it
                    // rather than treating it as "true".
                    const int include_meta = PyObject_IsTrue(meta_obj);
                    if (include_meta < 0) return NULL;
                    vd.with_include_metadata(include_meta != 0);
                }
            } else {
                PyErr_SetString(PyExc_TypeError,
                                "views list must contain strings or dicts");
                return NULL;
            }
            views.push_back(std::move(vd));
        }
    }

    IpcCompression compression = IpcCompression::ZSTD;
    if (compression_str) {
        std::string comp_lower(compression_str);
        // std::tolower requires a value representable as unsigned char.
        for (auto &c : comp_lower) {
            c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
        }
        if (comp_lower == "none") {
            compression = IpcCompression::NONE;
        } else if (comp_lower == "zstd") {
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
            compression = IpcCompression::ZSTD;
#else
            PyErr_SetString(
                PyExc_ValueError,
                "ZSTD compression not available (built without ZSTD)");
            return NULL;
#endif
        } else {
            PyErr_Format(PyExc_ValueError,
                         "Unknown compression: %s (use 'none' or 'zstd')",
                         compression_str);
            return NULL;
        }
    }

    int64_t chunk_size_bytes =
        static_cast<int64_t>(chunk_size_mb) * 1024 * 1024;

    // Copy everything needed out of Python objects while the GIL is held.
    const char *file_path_utf8 = PyUnicode_AsUTF8(self->file_path);
    if (!file_path_utf8) return NULL;
    std::string file_path(file_path_utf8);
    std::string index_path;
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
    if (!idx) return NULL;
    if (idx[0] != '\0') {
        index_path = idx;
    }
    std::size_t checkpoint_size = self->checkpoint_size;

    std::string output_path(path);
    WriteArrowResult result;
    std::string error_msg;

    // Exceptions must not escape while the GIL is released; the message is
    // stashed and turned into a Python error once the GIL is re-acquired.
    Py_BEGIN_ALLOW_THREADS
    try {
        Runtime *rt = get_runtime(self);
        result =
            rt->submit(write_arrow_pipeline(
                           file_path, index_path, checkpoint_size,
                           std::move(views), output_path, chunk_size_bytes,
                           compression, static_cast<std::size_t>(batch_size)),
                       "write_arrow")
                .get();
    } catch (const std::exception &e) {
        error_msg = e.what();
    }
    Py_END_ALLOW_THREADS

    if (!error_msg.empty()) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

    if (!result.error.empty()) {
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
        return NULL;
    }

    // Build result dict
    PyObject *dict = PyDict_New();
    if (!dict) return NULL;

    // Build files list per partition
    PyObject *partitions_dict = PyDict_New();
    if (!partitions_dict) {
        Py_DECREF(dict);
        return NULL;
    }

    // Releases the given temporaries plus both result dicts on failure.
    auto fail = [&](PyObject *a = NULL, PyObject *b = NULL,
                    PyObject *c = NULL) -> PyObject * {
        Py_XDECREF(a);
        Py_XDECREF(b);
        Py_XDECREF(c);
        Py_DECREF(partitions_dict);
        Py_DECREF(dict);
        return NULL;
    };

    for (const auto &[partition_name, partition_stats] :
         result.stats.partitions) {
        PyObject *partition_dict = PyDict_New();
        if (!partition_dict) return fail();

        PyObject *files_list = PyList_New(0);
        if (!files_list) return fail(partition_dict);

        for (const auto &f : partition_stats.files) {
            PyObject *file_str = PyUnicode_FromString(f.c_str());
            if (!file_str || PyList_Append(files_list, file_str) < 0) {
                return fail(file_str, files_list, partition_dict);
            }
            Py_DECREF(file_str);
        }

        if (PyDict_SetItemString(partition_dict, "files", files_list) < 0) {
            return fail(files_list, partition_dict);
        }
        Py_DECREF(files_list);
        dict_set_steal(partition_dict, "rows",
                       PyLong_FromLongLong(partition_stats.total_rows));
        dict_set_steal(
            partition_dict, "bytes",
            PyLong_FromLongLong(partition_stats.total_uncompressed_bytes));

        // The unnamed partition is exposed under "_default".
        PyObject *key = partition_name.empty()
                            ? PyUnicode_FromString("_default")
                            : PyUnicode_FromString(partition_name.c_str());
        if (!key) return fail(partition_dict);
        if (PyDict_SetItem(partitions_dict, key, partition_dict) < 0) {
            return fail(key, partition_dict);
        }
        Py_DECREF(key);
        Py_DECREF(partition_dict);
    }

    if (PyDict_SetItemString(dict, "partitions", partitions_dict) < 0) {
        return fail();
    }
    Py_DECREF(partitions_dict);
    dict_set_steal(dict, "total_rows",
                   PyLong_FromLongLong(result.stats.total_rows));
    dict_set_steal(dict, "total_bytes",
                   PyLong_FromLongLong(result.stats.total_uncompressed_bytes));
    dict_set_steal(dict, "chunks_scanned",
                   PyLong_FromUnsignedLongLong(result.chunks_scanned));
    dict_set_steal(dict, "chunks_skipped",
                   PyLong_FromUnsignedLongLong(result.chunks_skipped));

    return dict;
}
26✔
3051

3052
// Python method: get_view_chunks(view=None) -> dict
//
// Runs get_view_chunks_pipeline on the runtime with the GIL released and
// returns a dict with the keys:
//   "chunks"              : list of {"checkpoint_idx", "start_byte",
//                                    "end_byte"} dicts
//   "total_checkpoints", "skipped_checkpoints", "file_may_match"
//
// `view` is either a view name (str; "io", "compute" and "dlio" select the
// predefined views) or a dict whose string "name" / "query" entries are
// applied; other value types in the dict are ignored.
//
// Returns NULL with an exception set on an unsupported `view` type
// (TypeError), an un-encodable string, or a pipeline failure (RuntimeError).
static PyObject *TraceReader_get_view_chunks(TraceReaderObject *self,
                                             PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"view", NULL};
    PyObject *view_obj = Py_None;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
                                     &view_obj)) {
        return NULL;
    }

    ViewDefinition view;
    if (view_obj && view_obj != Py_None) {
        if (PyUnicode_Check(view_obj)) {
            const char *name = PyUnicode_AsUTF8(view_obj);
            if (!name) return NULL;
            std::string name_str(name);
            if (name_str == "io") {
                view = ViewDefinition::io_view();
            } else if (name_str == "compute") {
                view = ViewDefinition::compute_view();
            } else if (name_str == "dlio") {
                view = ViewDefinition::dlio_view();
            } else {
                view.with_name(name_str);
            }
        } else if (PyDict_Check(view_obj)) {
            PyObject *name_obj = PyDict_GetItemString(view_obj, "name");
            if (name_obj && PyUnicode_Check(name_obj)) {
                // PyUnicode_AsUTF8 returns NULL (exception set) for strings
                // that cannot be encoded, e.g. lone surrogates.
                const char *view_name = PyUnicode_AsUTF8(name_obj);
                if (!view_name) return NULL;
                view.with_name(view_name);
            }
            PyObject *query_obj = PyDict_GetItemString(view_obj, "query");
            if (query_obj && query_obj != Py_None &&
                PyUnicode_Check(query_obj)) {
                const char *view_query = PyUnicode_AsUTF8(query_obj);
                if (!view_query) return NULL;
                view.with_query(view_query);
            }
        } else {
            PyErr_SetString(PyExc_TypeError, "view must be a string or dict");
            return NULL;
        }
    }

    // Copy everything needed out of Python objects while the GIL is held.
    const char *file_path_utf8 = PyUnicode_AsUTF8(self->file_path);
    if (!file_path_utf8) return NULL;
    std::string file_path(file_path_utf8);
    std::string index_path;
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
    if (!idx) return NULL;
    if (idx[0] != '\0') {
        index_path = idx;
    }
    std::size_t checkpoint_size = self->checkpoint_size;

    GetViewChunksResult result;
    std::string error_msg;

    // Exceptions must not escape while the GIL is released; the message is
    // stashed and turned into a Python error once the GIL is re-acquired.
    Py_BEGIN_ALLOW_THREADS
    try {
        Runtime *rt = get_runtime(self);
        result = rt->submit(get_view_chunks_pipeline(file_path, index_path,
                                                     checkpoint_size, view),
                            "get_view_chunks")
                     .get();
    } catch (const std::exception &e) {
        error_msg = e.what();
    }
    Py_END_ALLOW_THREADS

    if (!error_msg.empty()) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

    if (!result.error.empty()) {
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
        return NULL;
    }

    PyObject *dict = PyDict_New();
    if (!dict) return NULL;

    PyObject *chunks_list =
        PyList_New(static_cast<Py_ssize_t>(result.chunks.size()));
    if (!chunks_list) {
        Py_DECREF(dict);
        return NULL;
    }

    for (std::size_t i = 0; i < result.chunks.size(); ++i) {
        const auto &chunk = result.chunks[i];
        PyObject *chunk_dict = PyDict_New();
        if (!chunk_dict) {
            Py_DECREF(chunks_list);
            Py_DECREF(dict);
            return NULL;
        }
        dict_set_steal(chunk_dict, "checkpoint_idx",
                       PyLong_FromUnsignedLongLong(chunk.checkpoint_idx));
        dict_set_steal(chunk_dict, "start_byte",
                       PyLong_FromSize_t(chunk.start_byte));
        dict_set_steal(chunk_dict, "end_byte",
                       PyLong_FromSize_t(chunk.end_byte));
        // PyList_SetItem takes over the reference to chunk_dict.
        PyList_SetItem(chunks_list, static_cast<Py_ssize_t>(i), chunk_dict);
    }

    if (PyDict_SetItemString(dict, "chunks", chunks_list) < 0) {
        Py_DECREF(chunks_list);
        Py_DECREF(dict);
        return NULL;
    }
    Py_DECREF(chunks_list);
    dict_set_steal(dict, "total_checkpoints",
                   PyLong_FromUnsignedLongLong(result.total_checkpoints));
    dict_set_steal(dict, "skipped_checkpoints",
                   PyLong_FromUnsignedLongLong(result.skipped_checkpoints));
    dict_set_steal(dict, "file_may_match",
                   PyBool_FromLong(result.file_may_match ? 1 : 0));

    return dict;
}
6✔
3162

3163
// Python method:
//   write_view_chunk(output_file, checkpoint_idx, start_byte, end_byte,
//                    view=None, compression="zstd", batch_size=10000) -> dict
//
// Runs write_view_chunk_pipeline on the runtime with the GIL released and
// returns a dict with the keys "output_file", "events_matched",
// "events_scanned", "rows_written" and "bytes_written".
//
// `view` is either a view name (str; "io", "compute" and "dlio" select the
// predefined views) or a dict whose string "name" / "query" entries are
// applied.
//
// Returns NULL with an exception set on invalid arguments (TypeError /
// ValueError) or when the pipeline fails (RuntimeError).
static PyObject *TraceReader_write_view_chunk(TraceReaderObject *self,
                                              PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {
        "output_file", "checkpoint_idx", "start_byte", "end_byte",
        "view",        "compression",    "batch_size", NULL};
    const char *output_file = NULL;
    unsigned long long checkpoint_idx = 0;
    Py_ssize_t start_byte = 0;
    Py_ssize_t end_byte = 0;
    PyObject *view_obj = Py_None;
    const char *compression_str = "zstd";
    Py_ssize_t batch_size = 10000;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "sKnn|Osn", (char **)kwlist,
                                     &output_file, &checkpoint_idx, &start_byte,
                                     &end_byte, &view_obj, &compression_str,
                                     &batch_size)) {
        return NULL;
    }

    // Negative values would wrap to huge std::size_t values in the casts
    // performed when the pipeline is launched.
    if (start_byte < 0 || end_byte < 0) {
        PyErr_SetString(PyExc_ValueError,
                        "start_byte and end_byte must be >= 0");
        return NULL;
    }
    if (batch_size < 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be >= 0");
        return NULL;
    }

    IpcCompression compression = IpcCompression::ZSTD;
    if (compression_str) {
        std::string comp_lower(compression_str);
        // std::tolower requires a value representable as unsigned char.
        for (auto &c : comp_lower) {
            c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
        }
        if (comp_lower == "none") {
            compression = IpcCompression::NONE;
        } else if (comp_lower == "zstd") {
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
            compression = IpcCompression::ZSTD;
#else
            PyErr_SetString(PyExc_ValueError, "ZSTD compression not available");
            return NULL;
#endif
        } else {
            // Reject unknown names instead of silently falling back to ZSTD,
            // matching write_arrow and write_view_chunks.
            PyErr_Format(PyExc_ValueError,
                         "Unknown compression: %s (use 'none' or 'zstd')",
                         compression_str);
            return NULL;
        }
    }

    ViewDefinition view;
    if (view_obj && view_obj != Py_None) {
        if (PyUnicode_Check(view_obj)) {
            const char *name = PyUnicode_AsUTF8(view_obj);
            if (!name) return NULL;
            std::string name_str(name);
            if (name_str == "io") {
                view = ViewDefinition::io_view();
            } else if (name_str == "compute") {
                view = ViewDefinition::compute_view();
            } else if (name_str == "dlio") {
                view = ViewDefinition::dlio_view();
            } else {
                view.with_name(name_str);
            }
        } else if (PyDict_Check(view_obj)) {
            PyObject *name_obj = PyDict_GetItemString(view_obj, "name");
            if (name_obj && PyUnicode_Check(name_obj)) {
                // PyUnicode_AsUTF8 returns NULL (exception set) for strings
                // that cannot be encoded, e.g. lone surrogates.
                const char *view_name = PyUnicode_AsUTF8(name_obj);
                if (!view_name) return NULL;
                view.with_name(view_name);
            }
            PyObject *query_obj = PyDict_GetItemString(view_obj, "query");
            if (query_obj && query_obj != Py_None &&
                PyUnicode_Check(query_obj)) {
                const char *view_query = PyUnicode_AsUTF8(query_obj);
                if (!view_query) return NULL;
                view.with_query(view_query);
            }
        } else {
            // Same contract as get_view_chunks: do not silently ignore an
            // argument of the wrong type.
            PyErr_SetString(PyExc_TypeError, "view must be a string or dict");
            return NULL;
        }
    }

    // Copy everything needed out of Python objects while the GIL is held.
    const char *file_path_utf8 = PyUnicode_AsUTF8(self->file_path);
    if (!file_path_utf8) return NULL;
    std::string file_path(file_path_utf8);
    std::string index_path;
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
    if (!idx) return NULL;
    if (idx[0] != '\0') {
        index_path = idx;
    }
    std::size_t checkpoint_size = self->checkpoint_size;

    WriteViewChunkResult result;
    std::string error_msg;

    // Exceptions must not escape while the GIL is released; the message is
    // stashed and turned into a Python error once the GIL is re-acquired.
    Py_BEGIN_ALLOW_THREADS
    try {
        Runtime *rt = get_runtime(self);
        result =
            rt->submit(write_view_chunk_pipeline(
                           file_path, index_path, checkpoint_size, view,
                           checkpoint_idx, static_cast<std::size_t>(start_byte),
                           static_cast<std::size_t>(end_byte),
                           std::string(output_file), compression,
                           static_cast<std::size_t>(batch_size)),
                       "write_view_chunk")
                .get();
    } catch (const std::exception &e) {
        error_msg = e.what();
    }
    Py_END_ALLOW_THREADS

    if (!error_msg.empty()) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

    if (!result.error.empty()) {
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
        return NULL;
    }

    PyObject *dict = PyDict_New();
    if (!dict) return NULL;

    dict_set_steal(dict, "output_file",
                   PyUnicode_FromString(result.output_file.c_str()));
    dict_set_steal(dict, "events_matched",
                   PyLong_FromUnsignedLongLong(result.events_matched));
    dict_set_steal(dict, "events_scanned",
                   PyLong_FromUnsignedLongLong(result.events_scanned));
    dict_set_steal(dict, "rows_written",
                   PyLong_FromLongLong(result.rows_written));
    dict_set_steal(dict, "bytes_written",
                   PyLong_FromLongLong(result.bytes_written));

    return dict;
}
2✔
3280

3281
// Python method:
//   write_view_chunks(chunks, output_dir, view=None, compression="zstd",
//                     batch_size=10000) -> dict
//
// `chunks` is a list of dicts, each carrying integer "checkpoint_idx",
// "start_byte" and "end_byte" entries. Every chunk is written to
// "<output_dir>/chunk-<checkpoint_idx, zero padded to 5 digits>.arrow".
//
// Runs write_view_chunks_pipeline on the runtime with the GIL released and
// returns a dict with the keys:
//   "results"              : list of {"output_file", "rows_written",
//                            "events_matched"} dicts, plus "error" for
//                            entries whose error string is non-empty
//   "total_rows", "total_events_matched"
//
// Returns NULL with an exception set on invalid arguments (TypeError /
// ValueError / KeyError / OverflowError) or when the pipeline throws
// (RuntimeError).
static PyObject *TraceReader_write_view_chunks(TraceReaderObject *self,
                                               PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"chunks",      "output_dir", "view",
                                   "compression", "batch_size", NULL};
    PyObject *chunks_list = NULL;
    const char *output_dir = NULL;
    PyObject *view_obj = Py_None;
    const char *compression_str = "zstd";
    Py_ssize_t batch_size = 10000;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "Os|Osn", (char **)kwlist,
                                     &chunks_list, &output_dir, &view_obj,
                                     &compression_str, &batch_size)) {
        return NULL;
    }

    if (!PyList_Check(chunks_list)) {
        PyErr_SetString(PyExc_TypeError, "chunks must be a list");
        return NULL;
    }
    // A negative value would wrap to a huge std::size_t when handed to the
    // pipeline below.
    if (batch_size < 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be >= 0");
        return NULL;
    }

    // Case-insensitive, like write_arrow and write_view_chunk.
    std::string comp_lower(compression_str);
    for (auto &c : comp_lower) {
        // std::tolower requires a value representable as unsigned char.
        c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
    }
    IpcCompression compression = IpcCompression::ZSTD;
    if (comp_lower == "none") {
        compression = IpcCompression::NONE;
    } else if (comp_lower == "zstd") {
#ifndef DFTRACER_UTILS_ENABLE_ZSTD
        PyErr_SetString(PyExc_ValueError, "ZSTD compression not available");
        return NULL;
#endif
    } else {
        PyErr_SetString(PyExc_ValueError,
                        "compression must be 'zstd' or 'none'");
        return NULL;
    }

    ViewDefinition view;
    if (view_obj && view_obj != Py_None) {
        if (PyUnicode_Check(view_obj)) {
            const char *name = PyUnicode_AsUTF8(view_obj);
            if (!name) return NULL;
            std::string name_str(name);
            if (name_str == "io") {
                view = ViewDefinition::io_view();
            } else if (name_str == "compute") {
                view = ViewDefinition::compute_view();
            } else if (name_str == "dlio") {
                view = ViewDefinition::dlio_view();
            } else {
                view.with_name(name_str);
            }
        } else if (PyDict_Check(view_obj)) {
            PyObject *name_obj = PyDict_GetItemString(view_obj, "name");
            if (name_obj && PyUnicode_Check(name_obj)) {
                // PyUnicode_AsUTF8 returns NULL (exception set) for strings
                // that cannot be encoded, e.g. lone surrogates.
                const char *view_name = PyUnicode_AsUTF8(name_obj);
                if (!view_name) return NULL;
                view.with_name(view_name);
            }
            PyObject *query_obj = PyDict_GetItemString(view_obj, "query");
            if (query_obj && query_obj != Py_None &&
                PyUnicode_Check(query_obj)) {
                const char *view_query = PyUnicode_AsUTF8(query_obj);
                if (!view_query) return NULL;
                view.with_query(view_query);
            }
        } else {
            // Same contract as get_view_chunks: do not silently ignore an
            // argument of the wrong type.
            PyErr_SetString(PyExc_TypeError, "view must be a string or dict");
            return NULL;
        }
    }

    std::vector<ChunkDescriptor> chunks;
    Py_ssize_t num_chunks = PyList_Size(chunks_list);
    chunks.reserve(static_cast<std::size_t>(num_chunks));

    for (Py_ssize_t i = 0; i < num_chunks; i++) {
        PyObject *chunk_dict = PyList_GetItem(chunks_list, i);  // borrowed
        if (!PyDict_Check(chunk_dict)) {
            PyErr_SetString(PyExc_TypeError, "each chunk must be a dict");
            return NULL;
        }

        ChunkDescriptor desc;

        PyObject *cp_idx = PyDict_GetItemString(chunk_dict, "checkpoint_idx");
        PyObject *start = PyDict_GetItemString(chunk_dict, "start_byte");
        PyObject *end = PyDict_GetItemString(chunk_dict, "end_byte");

        if (!cp_idx || !start || !end) {
            PyErr_SetString(
                PyExc_KeyError,
                "chunk must have checkpoint_idx, start_byte, end_byte");
            return NULL;
        }

        // PyLong_AsUnsignedLongLong returns (unsigned long long)-1 with an
        // exception set for non-int or out-of-range values, so each
        // conversion is followed by an explicit error check.
        const unsigned long long cp_value = PyLong_AsUnsignedLongLong(cp_idx);
        if (cp_value == static_cast<unsigned long long>(-1) &&
            PyErr_Occurred()) {
            return NULL;
        }
        const unsigned long long start_value =
            PyLong_AsUnsignedLongLong(start);
        if (start_value == static_cast<unsigned long long>(-1) &&
            PyErr_Occurred()) {
            return NULL;
        }
        const unsigned long long end_value = PyLong_AsUnsignedLongLong(end);
        if (end_value == static_cast<unsigned long long>(-1) &&
            PyErr_Occurred()) {
            return NULL;
        }

        desc.checkpoint_idx = static_cast<std::uint64_t>(cp_value);
        desc.start_byte = static_cast<std::size_t>(start_value);
        desc.end_byte = static_cast<std::size_t>(end_value);

        char filename[64];
        snprintf(filename, sizeof(filename), "chunk-%05llu.arrow",
                 (unsigned long long)desc.checkpoint_idx);
        desc.output_file = std::string(output_dir) + "/" + filename;

        chunks.push_back(std::move(desc));
    }

    // Copy everything needed out of Python objects while the GIL is held.
    const char *file_path_utf8 = PyUnicode_AsUTF8(self->file_path);
    if (!file_path_utf8) return NULL;
    std::string file_path(file_path_utf8);
    std::string index_path;
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
    if (!idx) return NULL;
    if (idx[0] != '\0') {
        index_path = idx;
    }
    std::size_t checkpoint_size = self->checkpoint_size;

    WriteViewChunksResult result;
    std::string error_msg;

    // Exceptions must not escape while the GIL is released; the message is
    // stashed and turned into a Python error once the GIL is re-acquired.
    Py_BEGIN_ALLOW_THREADS
    try {
        Runtime *rt = get_runtime(self);
        result = rt->submit(write_view_chunks_pipeline(
                                file_path, index_path, checkpoint_size, view,
                                std::move(chunks), compression,
                                static_cast<std::size_t>(batch_size)),
                            "write_view_chunks")
                     .get();
    } catch (const std::exception &e) {
        error_msg = e.what();
    }
    Py_END_ALLOW_THREADS

    if (!error_msg.empty()) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

    PyObject *dict = PyDict_New();
    if (!dict) return NULL;

    PyObject *results_list =
        PyList_New(static_cast<Py_ssize_t>(result.results.size()));
    if (!results_list) {
        Py_DECREF(dict);
        return NULL;
    }

    for (std::size_t i = 0; i < result.results.size(); i++) {
        const auto &r = result.results[i];
        PyObject *item = PyDict_New();
        if (!item) {
            Py_DECREF(results_list);
            Py_DECREF(dict);
            return NULL;
        }
        dict_set_steal(item, "output_file",
                       PyUnicode_FromString(r.output_file.c_str()));
        dict_set_steal(item, "rows_written",
                       PyLong_FromLongLong(r.rows_written));
        dict_set_steal(item, "events_matched",
                       PyLong_FromUnsignedLongLong(r.events_matched));
        // Per-chunk failures are reported in the entry rather than raised.
        if (!r.error.empty()) {
            dict_set_steal(item, "error",
                           PyUnicode_FromString(r.error.c_str()));
        }
        // PyList_SetItem takes over the reference to item.
        PyList_SetItem(results_list, static_cast<Py_ssize_t>(i), item);
    }

    if (PyDict_SetItemString(dict, "results", results_list) < 0) {
        Py_DECREF(results_list);
        Py_DECREF(dict);
        return NULL;
    }
    Py_DECREF(results_list);
    dict_set_steal(dict, "total_rows", PyLong_FromLongLong(result.total_rows));
    dict_set_steal(dict, "total_events_matched",
                   PyLong_FromLongLong(result.total_events_matched));

    return dict;
}
2✔
3446

3447
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
3448

3449
// Context-manager entry (`__enter__`): hands back the reader itself so that
// `with TraceReader(...) as r:` binds `r` to this object.
// Returns a new strong reference to `self`.
static PyObject *TraceReader_enter(TraceReaderObject *self,
                                   PyObject *Py_UNUSED(ignored)) {
    PyObject *as_object = (PyObject *)self;
    Py_INCREF(as_object);
    return as_object;
}
3454

3455
// Context-manager exit (`__exit__`): performs no cleanup and always returns
// None, so an exception raised inside the `with` body is not suppressed.
// `args` (the exc_type/exc_value/traceback tuple) is intentionally unused.
static PyObject *TraceReader_exit(TraceReaderObject *self, PyObject *args) {
    (void)self;
    (void)args;
    Py_RETURN_NONE;
}
3458

3459
// Getter for the `path` attribute: returns a new reference to the stored
// `file_path` object.
// NOTE(review): assumes `self->file_path` is never NULL once the object
// exists -- confirm against TraceReader_new / TraceReader_init.
static PyObject *TraceReader_get_file_path(TraceReaderObject *self,
                                           void *closure) {
    (void)closure;
    PyObject *stored_path = self->file_path;
    Py_INCREF(stored_path);
    return stored_path;
}
3464

3465
// Getter for the `index_dir` attribute: returns a new reference to the stored
// `index_dir` object.
// NOTE(review): assumes `self->index_dir` is never NULL once the object
// exists -- confirm against TraceReader_new / TraceReader_init.
static PyObject *TraceReader_get_index_dir(TraceReaderObject *self,
                                           void *closure) {
    (void)closure;
    PyObject *stored_dir = self->index_dir;
    Py_INCREF(stored_dir);
    return stored_dir;
}
3470

3471
// Getter for the `has_index` attribute: converts the stored flag to a Python
// bool (True for any non-zero value).
static PyObject *TraceReader_get_has_index(TraceReaderObject *self,
                                           void *closure) {
    (void)closure;
    PyObject *flag = PyBool_FromLong(self->has_index);
    return flag;
}
3475

3476
// Getter for the `num_lines` attribute.
//
// Fast path: build a native TraceReader from this object's configuration and
// use its reported line count when that count is non-zero.
// Slow path: when the native count is zero or the native call throws, read
// every line through TraceReader_read_lines and return the list length.
//
// Returns a new Python int, or NULL with a Python exception set when the
// slow path fails.
static PyObject *TraceReader_get_num_lines_prop(TraceReaderObject *self,
                                                void *closure) {
    (void)closure;

    std::size_t reported = 0;
    try {
        TraceReaderConfig native_cfg = build_config(self);
        TraceReader native_reader(std::move(native_cfg));
        reported = native_reader.get_num_lines();
    } catch (...) {
        // Best effort only: any native failure falls through to the full
        // read below instead of surfacing as a Python error.
    }
    if (reported > 0) {
        return PyLong_FromSize_t(reported);
    }

    // read_lines is invoked with an empty positional tuple and no keywords,
    // i.e. with all of its defaults.
    PyObject *no_args = PyTuple_New(0);
    if (no_args == NULL) {
        return NULL;
    }
    PyObject *all_lines = TraceReader_read_lines(self, no_args, NULL);
    Py_DECREF(no_args);
    if (all_lines == NULL) {
        return NULL;
    }

    const Py_ssize_t counted = PyList_GET_SIZE(all_lines);
    Py_DECREF(all_lines);
    return PyLong_FromSsize_t(counted);
}
4✔
3494

3495
// Python method `get_max_bytes()`.
//
// Builds a native TraceReader from this object's configuration and returns
// its maximum byte position as a Python int (the method docstring describes
// 0 as "unknown for compressed files without index").
//
// Returns NULL with RuntimeError set when the native side throws. Every C++
// exception is translated here: an exception must never unwind out of this
// function into the interpreter's C frames, so exceptions that do not derive
// from std::exception are caught as well, matching the catch-all already used
// by the `num_lines` property getter.
static PyObject *TraceReader_get_max_bytes(TraceReaderObject *self,
                                           PyObject *Py_UNUSED(ignored)) {
    try {
        TraceReaderConfig cfg = build_config(self);
        TraceReader reader(std::move(cfg));
        return PyLong_FromSize_t(reader.get_max_bytes());
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "TraceReader.get_max_bytes: unknown native error");
        return NULL;
    }
}
12✔
3506

3507
// Python method `get_num_lines()`.
//
// Builds a native TraceReader from this object's configuration and returns
// its line count as a Python int (the method docstring describes 0 as
// "unknown for files without index"). Unlike the `num_lines` property, this
// method does not fall back to reading the whole file.
//
// Returns NULL with RuntimeError set when the native side throws. Every C++
// exception is translated here: an exception must never unwind out of this
// function into the interpreter's C frames, so exceptions that do not derive
// from std::exception are caught as well, matching the catch-all already used
// by the `num_lines` property getter.
static PyObject *TraceReader_get_num_lines(TraceReaderObject *self,
                                           PyObject *Py_UNUSED(ignored)) {
    try {
        TraceReaderConfig cfg = build_config(self);
        TraceReader reader(std::move(cfg));
        return PyLong_FromSize_t(reader.get_num_lines());
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "TraceReader.get_num_lines: unknown native error");
        return NULL;
    }
}
3✔
3518

3519
// Method table for the TraceReader Python type.
//
// Each entry is {python name, C entry point, calling convention, docstring}.
// Entries taking keyword arguments use METH_VARARGS | METH_KEYWORDS; the
// Arrow entries are compiled in only when the corresponding feature macro is
// defined. The table ends with an all-null sentinel entry.
static PyMethodDef TraceReader_methods[] = {
    // ---- line / raw / JSON readers ----
    {"iter_lines",
     reinterpret_cast<PyCFunction>(TraceReader_iter_lines),
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over decoded lines.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
    {"iter_raw",
     reinterpret_cast<PyCFunction>(TraceReader_iter_raw),
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over raw byte chunks.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    line_aligned (bool): Align chunks to line boundaries.\n"
     "    multi_line (bool): Allow multiple lines per chunk.\n"},
    {"read_lines",
     reinterpret_cast<PyCFunction>(TraceReader_read_lines),
     METH_VARARGS | METH_KEYWORDS,
     "Read all lines and return as list.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
    {"iter_json",
     reinterpret_cast<PyCFunction>(TraceReader_iter_json),
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over parsed JSON events as Python dicts.\n"
     "\n"
     "Each event is parsed once in C++ (single-pass simdjson ondemand)\n"
     "and yielded as a Python dict. No double-parsing overhead.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    query (str): Optional query filter.\n"
     "    batch_size (int): Events per internal batch (default 1024).\n"},
    {"read_json",
     reinterpret_cast<PyCFunction>(TraceReader_read_json_py),
     METH_VARARGS | METH_KEYWORDS,
     "Read all events as parsed Python dicts (list).\n"
     "\n"
     "Equivalent to list(iter_json(...)).\n"},
    {"read_raw",
     reinterpret_cast<PyCFunction>(TraceReader_read_raw),
     METH_VARARGS | METH_KEYWORDS,
     "Read all raw chunks and return as list.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    line_aligned (bool): Align chunks to line boundaries.\n"
     "    multi_line (bool): Allow multiple lines per chunk.\n"},
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    // ---- in-memory Arrow readers ----
    {"iter_arrow",
     reinterpret_cast<PyCFunction>(TraceReader_iter_arrow),
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over Arrow record batches.\n"
     "\n"
     "Args:\n"
     "    batch_size (int): Maximum rows per Arrow batch.\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
    {"iter_arrow_stream",
     reinterpret_cast<PyCFunction>(TraceReader_iter_arrow_stream),
     METH_VARARGS | METH_KEYWORDS,
     "Return an _ArrowBatchStream that exposes Arrow record batches\n"
     "via the Arrow C Data Interface stream protocol\n"
     "(__arrow_c_stream__). PyArrow can drain the producer channel\n"
     "with a single call, without per-batch Python iteration.\n"},
    {"read_arrow",
     reinterpret_cast<PyCFunction>(TraceReader_read_arrow),
     METH_VARARGS | METH_KEYWORDS,
     "Read all events as a materialized ArrowTable.\n"
     "\n"
     "Args:\n"
     "    batch_size (int): Maximum rows per Arrow batch.\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
#endif
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    // ---- Arrow IPC writers and view helpers ----
    {"write_arrow",
     reinterpret_cast<PyCFunction>(TraceReader_write_arrow),
     METH_VARARGS | METH_KEYWORDS,
     "Write trace data to partitioned Arrow IPC files.\n"
     "\n"
     "Args:\n"
     "    path (str): Output directory path.\n"
     "    partition_by (list[str] or None): Column names to partition by.\n"
     "    num_buckets (int): Number of hash buckets (0 = no bucketing).\n"
     "    chunk_size_mb (int): Max uncompressed MB per file (default 32).\n"
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
     "    batch_size (int): Rows per internal batch (default 10000).\n"
     "    normalize (bool): Use normalized schema (default False).\n"
     "\n"
     "Returns:\n"
     "    dict: Statistics including partitions, total_rows, total_bytes.\n"},
    {"get_view_chunks",
     reinterpret_cast<PyCFunction>(TraceReader_get_view_chunks),
     METH_VARARGS | METH_KEYWORDS,
     "Get candidate chunks for a view after bloom filter pruning.\n"
     "\n"
     "Args:\n"
     "    view (str or dict): View name ('io', 'compute', 'dlio') or\n"
     "                        dict with 'name' and optional 'query'.\n"
     "\n"
     "Returns:\n"
     "    dict: chunks list, total_checkpoints, skipped_checkpoints.\n"},
    {"write_view_chunk",
     reinterpret_cast<PyCFunction>(TraceReader_write_view_chunk),
     METH_VARARGS | METH_KEYWORDS,
     "Write a single chunk to an Arrow IPC file.\n"
     "\n"
     "Args:\n"
     "    output_file (str): Path to output Arrow IPC file.\n"
     "    checkpoint_idx (int): Checkpoint index.\n"
     "    start_byte (int): Start byte offset.\n"
     "    end_byte (int): End byte offset.\n"
     "    view (str or dict): View definition.\n"
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
     "    batch_size (int): Events per batch (default 10000).\n"
     "\n"
     "Returns:\n"
     "    dict: output_file, events_matched, rows_written, bytes_written.\n"},
    {"write_view_chunks",
     reinterpret_cast<PyCFunction>(TraceReader_write_view_chunks),
     METH_VARARGS | METH_KEYWORDS,
     "Write multiple chunks to Arrow IPC files in parallel.\n"
     "\n"
     "All chunks are processed concurrently on the Runtime thread pool.\n"
     "\n"
     "Args:\n"
     "    chunks (list): List of dicts with checkpoint_idx, start_byte, "
     "end_byte.\n"
     "    output_dir (str): Directory for output Arrow IPC files.\n"
     "    view (str or dict): View definition.\n"
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
     "    batch_size (int): Events per batch (default 10000).\n"
     "\n"
     "Returns:\n"
     "    dict: results list, total_rows, total_events_matched.\n"},
#endif
    // ---- metadata queries ----
    {"get_max_bytes",
     reinterpret_cast<PyCFunction>(TraceReader_get_max_bytes),
     METH_NOARGS,
     "Get the maximum byte position (0 if unknown for compressed\n"
     "files without index)."},
    {"get_num_lines",
     reinterpret_cast<PyCFunction>(TraceReader_get_num_lines),
     METH_NOARGS,
     "Get the total number of lines (0 if unknown for files without\n"
     "index)."},
    // ---- context-manager protocol ----
    {"__enter__",
     reinterpret_cast<PyCFunction>(TraceReader_enter),
     METH_NOARGS,
     "Enter the runtime context for the with statement."},
    {"__exit__",
     reinterpret_cast<PyCFunction>(TraceReader_exit),
     METH_VARARGS,
     "Exit the runtime context for the with statement.\n"
     "\n"
     "TraceReader does not own the shared RocksDB instance for an index path;\n"
     "any shared DB lifetime remains manager-owned on the native side."},
    // Sentinel terminating the table.
    {nullptr, nullptr, 0, nullptr}};
3686

3687
// Attribute table for the TraceReader Python type.
//
// Each entry is {name, getter, setter, docstring, closure}. No setter is
// registered for any attribute. The table ends with an all-null sentinel
// entry.
static PyGetSetDef TraceReader_getsetters[] = {
    {"path",
     reinterpret_cast<getter>(TraceReader_get_file_path),
     nullptr,
     "Path to the trace file or directory",
     nullptr},
    {"index_dir",
     reinterpret_cast<getter>(TraceReader_get_index_dir),
     nullptr,
     "Directory for index files",
     nullptr},
    {"has_index",
     reinterpret_cast<getter>(TraceReader_get_has_index),
     nullptr,
     "True if a checkpoint index was found",
     nullptr},
    {"num_lines",
     reinterpret_cast<getter>(TraceReader_get_num_lines_prop),
     nullptr,
     "Total line count (reads all lines if needed)",
     nullptr},
    // Sentinel terminating the table.
    {nullptr, nullptr, nullptr, nullptr, nullptr}};
3697

3698
// Static type object for `dftracer_utils_ext.TraceReader`.
//
// Slots are initialised positionally, so their order must follow the
// PyTypeObject layout exactly; each line is labelled with the slot it fills.
// Slots left at 0 are unset. The type allows subclassing
// (Py_TPFLAGS_BASETYPE) and wires in the method and attribute tables plus the
// new/init/dealloc entry points.
PyTypeObject TraceReaderType = {
    PyVarObject_HEAD_INIT(nullptr, 0)
    "dftracer_utils_ext.TraceReader",              // tp_name
    sizeof(TraceReaderObject),                     // tp_basicsize
    0,                                             // tp_itemsize
    reinterpret_cast<destructor>(TraceReader_dealloc),  // tp_dealloc
    0,                                             // tp_vectorcall_offset
    0,                                             // tp_getattr
    0,                                             // tp_setattr
    0,                                             // tp_as_async
    0,                                             // tp_repr
    0,                                             // tp_as_number
    0,                                             // tp_as_sequence
    0,                                             // tp_as_mapping
    0,                                             // tp_hash
    0,                                             // tp_call
    0,                                             // tp_str
    0,                                             // tp_getattro
    0,                                             // tp_setattro
    0,                                             // tp_as_buffer
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,      // tp_flags
    // tp_doc: the leading "TraceReader(...)\n--\n\n" part is the signature
    // line; the remainder is the class docstring.
    "TraceReader(file_path: str, index_dir: str = '',\n"
    "            checkpoint_size: int = 33554432,\n"
    "            auto_build_index: bool = False,\n"
    "            runtime: Runtime | None = None)\n"
    "--\n"
    "\n"
    "Smart trace file reader that auto-selects sequential or indexed\n"
    "reading based on whether a ``.dftindex`` store exists.\n"
    "\n"
    "Args:\n"
    "    file_path (str): Path to the trace file (.pfw.gz or plain "
    "text).\n"
    "    index_dir (str): Directory to search for ``.dftindex`` "
    "stores.\n"
    "        Empty string (default) searches next to the trace file.\n"
    "    checkpoint_size (int): Checkpoint interval in bytes for index\n"
    "        building (default 32 MB).\n"
    "    auto_build_index (bool): If True, automatically build an "
    "index\n"
    "        when none exists.\n"
    "    runtime (Runtime or None): Runtime instance for thread pool "
    "control.\n"
    "        If None, uses the default global Runtime.\n"
    "\n"
    "Raises:\n"
    "    RuntimeError: If *file_path* does not exist or cannot be "
    "opened.\n",
    0,                                             // tp_traverse
    0,                                             // tp_clear
    0,                                             // tp_richcompare
    0,                                             // tp_weaklistoffset
    0,                                             // tp_iter
    0,                                             // tp_iternext
    TraceReader_methods,                           // tp_methods
    0,                                             // tp_members
    TraceReader_getsetters,                        // tp_getset
    0,                                             // tp_base
    0,                                             // tp_dict
    0,                                             // tp_descr_get
    0,                                             // tp_descr_set
    0,                                             // tp_dictoffset
    reinterpret_cast<initproc>(TraceReader_init),  // tp_init
    0,                                             // tp_alloc
    TraceReader_new,                               // tp_new
};
3763

3764
// Registers the TraceReader type on module `m` under the name "TraceReader".
//
// Finalises the static type object, then adds it to the module.
// Returns 0 on success and -1 on failure (a Python exception is expected to
// be set by the failing CPython call).
int init_trace_reader(PyObject *m) {
    if (PyType_Ready(&TraceReaderType) < 0) {
        return -1;
    }

    // PyModule_AddObject takes over a reference only when it succeeds, so one
    // is acquired up front and given back on the failure path.
    PyObject *type_object = reinterpret_cast<PyObject *>(&TraceReaderType);
    Py_INCREF(type_object);
    const int add_status = PyModule_AddObject(m, "TraceReader", type_object);
    if (add_status >= 0) {
        return 0;
    }

    Py_DECREF(type_object);
    // NOTE(review): this drops a reference to the caller-supplied module.
    // That is only correct if the caller does not release `m` itself after a
    // -1 return -- confirm against the module init function.
    Py_DECREF(m);
    return -1;
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc