• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 27737850362

18 Jun 2026 05:01AM UTC coverage: 49.902% (-2.2%) from 52.111%
27737850362

Pull #79

github

web-flow
Merge 9372743c5 into 53ad1e86c
Pull Request #79: Add Valgrind memory checking (C++, Python, MPI) and fix the bugs it found

16075 of 43887 branches covered (36.63%)

Branch coverage included in aggregate %.

113 of 128 new or added lines in 11 files covered. (88.28%)

668 existing lines in 104 files now uncovered.

21472 of 31355 relevant lines covered (68.48%)

13060.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.45
/src/dftracer/utils/python/trace_reader.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <Python.h>
3
#include <dftracer/utils/core/common/config.h>
4
#include <dftracer/utils/core/common/filesystem.h>
5
#include <dftracer/utils/core/common/memory_budget.h>
6
#include <dftracer/utils/core/coro/channel.h>
7
#include <dftracer/utils/core/coro/task.h>
8
#include <dftracer/utils/core/coro/when_all.h>
9
#include <dftracer/utils/core/tasks/coro_scope.h>
10
#include <dftracer/utils/core/utils/string.h>
11
#include <dftracer/utils/python/arrow_helpers.h>
12
#include <dftracer/utils/python/batch_byte_size.h>
13
#include <dftracer/utils/python/json.h>
14
#include <dftracer/utils/python/py_dict_helpers.h>
15
#include <dftracer/utils/python/runtime.h>
16
#include <dftracer/utils/python/trace_reader.h>
17
#include <dftracer/utils/python/trace_reader_iterator.h>
18
#include <dftracer/utils/utilities/common/query/query.h>
19
#include <dftracer/utils/utilities/composites/dft/indexing/chunk_pruner_utility.h>
20
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
21
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
22
#include <dftracer/utils/utilities/indexer/index_database.h>
23
#include <dftracer/utils/utilities/indexer/internal/helpers.h>
24
#include <dftracer/utils/utilities/reader/trace_reader.h>
25

26
#include <algorithm>
27
#include <cctype>
28
#include <cstddef>
29
#include <cstdio>
30
#include <cstring>
31
#include <exception>
32
#include <memory>
33
#include <string>
34
#include <unordered_map>
35
#include <vector>
36
#ifdef DFTRACER_UTILS_ENABLE_ARROW
37
#include <dftracer/utils/python/arrow_stream_capsule.h>
38
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
39
#include <dftracer/utils/utilities/common/json/parser.h>
40
#endif
41
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
42
#include <dftracer/utils/utilities/common/arrow/ipc_writer.h>
43
#include <dftracer/utils/utilities/common/arrow/partition_writer.h>
44
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
45
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
46
#include <dftracer/utils/utilities/composites/dft/views/view_builder_utility.h>
47
#include <dftracer/utils/utilities/composites/dft/views/view_definition.h>
48
#include <dftracer/utils/utilities/composites/dft/views/view_reader_utility.h>
49
#endif
50

51
namespace {
52

53
using dftracer::utils::CoroScope;
54
using dftracer::utils::Runtime;
55
using dftracer::utils::coro::CoroTask;
56
using dftracer::utils::coro::when_all;
57
using dftracer::utils::utilities::filesystem::PatternDirectoryScannerUtility;
58
using dftracer::utils::utilities::filesystem::
59
    PatternDirectoryScannerUtilityInput;
60
using dftracer::utils::utilities::reader::ReadConfig;
61
using dftracer::utils::utilities::reader::TraceReader;
62
using dftracer::utils::utilities::reader::TraceReaderConfig;
63
#ifdef DFTRACER_UTILS_ENABLE_ARROW
64
using dftracer::utils::utilities::common::arrow::ColumnType;
65
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
66
using dftracer::utils::utilities::common::json::JsonParser;
67
using dftracer::utils::utilities::common::json::JsonValueHelper;
68
#endif
69
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
70
using dftracer::utils::utilities::common::arrow::IpcCompression;
71
using dftracer::utils::utilities::common::arrow::PartitionWriter;
72
using dftracer::utils::utilities::common::arrow::PartitionWriteStats;
73
using dftracer::utils::utilities::composites::dft::MetadataCollectorUtility;
74
using dftracer::utils::utilities::composites::dft::
75
    MetadataCollectorUtilityInput;
76
using dftracer::utils::utilities::composites::dft::views::ViewBuilderInput;
77
using dftracer::utils::utilities::composites::dft::views::ViewBuilderUtility;
78
using dftracer::utils::utilities::composites::dft::views::ViewDefinition;
79
using dftracer::utils::utilities::composites::dft::views::ViewReaderInput;
80
using dftracer::utils::utilities::composites::dft::views::ViewReaderUtility;
81
#endif
82

83
using dftracer::utils::python::MemoryViewBatchData;
84
using dftracer::utils::python::MemoryViewBatchIteratorState;
85

86
// Stream the lines of a single trace file into `producer`, grouped into
// MemoryViewBatchData batches of `batch_size` lines.
//
// Every line's bytes are appended to one contiguous `buffer`; `offsets` and
// `lengths` record where each line sits inside it.  Before a batch is handed
// to the channel, its byte_size() is added to `state->bytes_in_queue`.
// Reading stops early when `state->cancelled` is set or the channel rejects
// a send.  A trailing partial batch is flushed only if not cancelled.  Any
// exception is recorded through `state->set_error()`.
CoroTask<void> produce_lines_batched(
    std::shared_ptr<MemoryViewBatchIteratorState> state,
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer,
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size) {
    // Held for the whole coroutine so the producer is released on every exit.
    auto producer_guard = producer.guard();
    try {
        TraceReader trace_reader(std::move(cfg));
        auto line_stream = trace_reader.read_lines(rc);
        MemoryViewBatchData pending;
        std::size_t pending_lines = 0;

        while (auto line = co_await line_stream.next()) {
            if (state->cancelled.load(std::memory_order_acquire)) break;

            auto text = line->content;
            const auto start = static_cast<Py_ssize_t>(pending.buffer.size());
            pending.buffer.insert(pending.buffer.end(), text.begin(),
                                  text.end());
            pending.offsets.push_back(start);
            pending.lengths.push_back(static_cast<Py_ssize_t>(text.size()));

            if (++pending_lines < batch_size) continue;

            // Full batch: account for its bytes, then hand it off.
            state->bytes_in_queue.fetch_add(
                dftracer::utils::python::byte_size(pending),
                std::memory_order_acq_rel);
            if (!co_await producer.send(std::move(pending))) break;
            pending = MemoryViewBatchData{};
            pending_lines = 0;
        }

        if (pending_lines > 0 &&
            !state->cancelled.load(std::memory_order_acquire)) {
            state->bytes_in_queue.fetch_add(
                dftracer::utils::python::byte_size(pending),
                std::memory_order_acq_rel);
            co_await producer.send(std::move(pending));
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
112!
125

126
// Stream the chunks returned by TraceReader::read_raw() for a single file
// into `producer`.  Each chunk becomes its own MemoryViewBatchData holding a
// single entry (offset 0, length = chunk size).  The batch's byte_size() is
// added to `state->bytes_in_queue` before each send.  Stops on cancellation
// or when the channel rejects a send; exceptions go to `state->set_error()`.
CoroTask<void> produce_raw_batched(
    std::shared_ptr<MemoryViewBatchIteratorState> state,
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer,
    TraceReaderConfig cfg, ReadConfig rc) {
    auto producer_guard = producer.guard();
    try {
        TraceReader trace_reader(std::move(cfg));
        auto chunk_stream = trace_reader.read_raw(rc);
        while (auto chunk = co_await chunk_stream.next()) {
            if (state->cancelled.load(std::memory_order_acquire)) break;

            auto first = chunk->data();
            const auto chunk_len = chunk->size();

            MemoryViewBatchData single;
            single.buffer.assign(first, first + chunk_len);
            single.offsets.push_back(0);
            single.lengths.push_back(static_cast<Py_ssize_t>(chunk_len));

            state->bytes_in_queue.fetch_add(
                dftracer::utils::python::byte_size(single),
                std::memory_order_acq_rel);
            if (!co_await producer.send(std::move(single))) break;
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
20!
149

150
using dftracer::utils::utilities::common::json::JsonParser;
151
using dftracer::utils::utilities::common::json::JsonValueHelper;
152

153
// Rough per-item size estimates, in bytes, for each output kind.
// NOTE(review): presumably used to size batches against a memory budget —
// confirm at the call sites.
static constexpr std::size_t ESTIMATED_BYTES_PER_LINE = 256;
static constexpr std::size_t ESTIMATED_BYTES_PER_RAW_CHUNK =
    std::size_t{4} << 20;  // 4 MiB
static constexpr std::size_t ESTIMATED_BYTES_PER_JSON_EVENT = 512;
static constexpr std::size_t ESTIMATED_BYTES_PER_ARROW_ROW = 1024;
157

158
static void insert_simdjson_value(ArgsMap &map, std::string_view key,
2,091✔
159
                                  simdjson::ondemand::value val) {
160
    auto type = val.type();
2,091✔
161
    if (type.error()) return;
2,093!
162
    switch (type.value_unsafe()) {
2,092!
163
        case simdjson::ondemand::json_type::string: {
923✔
164
            auto r = val.get_string();
923✔
165
            if (!r.error()) map.insert(key, std::string(r.value_unsafe()));
936!
166
            break;
936✔
167
        }
168
        case simdjson::ondemand::json_type::number: {
1,165✔
169
            auto ri = val.get_int64();
1,165✔
170
            if (!ri.error()) {
1,157!
171
                auto v = ri.value_unsafe();
1,152✔
172
                if (v >= 0)
1,154!
173
                    map.insert(key, static_cast<std::uint64_t>(v));
1,155!
174
                else
175
                    map.insert(key, v);
×
176
            } else {
177
                auto rd = val.get_double();
×
178
                if (!rd.error()) map.insert(key, rd.value_unsafe());
×
179
            }
180
            break;
1,149✔
181
        }
UNCOV
182
        case simdjson::ondemand::json_type::boolean: {
×
183
            auto r = val.get_bool();
×
184
            if (!r.error()) map.insert(key, r.value_unsafe());
×
185
            break;
×
186
        }
UNCOV
187
        default:
×
188
            break;
×
189
    }
190
}
191

192
// Fill `ev` from the JSON object currently held by `parser`.
//
// Every top-level field other than "args" goes into `ev.top` through
// insert_simdjson_value().  When "args" is an object, `ev.args` is marked
// valid and each of its members is inserted into `ev.args`.  Members whose
// key or value cannot be read are skipped.  `ev.top` is always marked valid.
static void parse_json_to_event(JsonParser &parser, JsonDictEvent &ev) {
    ev.top.set_valid(true);
    parser.for_each_field(
        [&](std::string_view key, simdjson::ondemand::value val) {
            if (key != "args") {
                insert_simdjson_value(ev.top, key, val);
                return;
            }

            auto args_obj = val.get_object();
            if (args_obj.error()) return;

            ev.args.set_valid(true);
            for (auto member : args_obj.value_unsafe()) {
                if (member.error()) continue;
                // The key must be read before the value (on-demand order).
                auto member_key = member.unescaped_key();
                if (member_key.error()) continue;
                auto member_val = member.value();
                if (member_val.error()) continue;
                insert_simdjson_value(ev.args, member_key.value_unsafe(),
                                      member_val.value_unsafe());
            }
        });
}
233✔
215

216
// Stream the JSON events of a single trace file into `producer` as
// JsonDictBatch batches of `batch_size` events (each event is built with
// parse_json_to_event()).
//
// Before each send, the batch's byte_size() is added to
// `state->bytes_in_queue`.  Reading stops on cancellation or when the
// channel rejects a send.  A trailing partial batch is flushed only if not
// cancelled.  Exceptions are recorded via `state->set_error()`.
CoroTask<void> produce_json_dicts(
    std::shared_ptr<JsonDictIteratorState> state,
    dftracer::utils::coro::ChannelProducer<JsonDictBatch> producer,
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size) {
    auto producer_guard = producer.guard();
    try {
        TraceReader trace_reader(std::move(cfg));
        auto json_stream = trace_reader.read_json(rc);

        JsonDictBatch pending;
        pending.events.reserve(batch_size);

        while (auto record = co_await json_stream.next()) {
            if (state->cancelled.load(std::memory_order_acquire)) break;

            JsonDictEvent event;
            parse_json_to_event(*record->parser, event);
            pending.events.push_back(std::move(event));
            if (pending.events.size() < batch_size) continue;

            state->bytes_in_queue.fetch_add(
                dftracer::utils::python::byte_size(pending),
                std::memory_order_acq_rel);
            if (!co_await producer.send(std::move(pending))) break;
            pending = JsonDictBatch{};
            pending.events.reserve(batch_size);
        }

        if (!pending.events.empty() &&
            !state->cancelled.load(std::memory_order_acquire)) {
            state->bytes_in_queue.fetch_add(
                dftracer::utils::python::byte_size(pending),
                std::memory_order_acq_rel);
            co_await producer.send(std::move(pending));
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
2!
254

255
// Feed every path in `*files`, in order, into `file_chan`, then close the
// channel.  Closing presumably ends the workers' receive() loops.  The loop
// stops early when `*cancelled` is set or a send is rejected; the close
// still happens in both cases.  `files` and `cancelled` must outlive this
// coroutine.
static CoroTask<void> send_files_to_channel(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    const std::vector<std::string> *files, std::atomic<bool> *cancelled) {
    const std::vector<std::string> &paths = *files;
    for (std::size_t i = 0; i < paths.size(); ++i) {
        const bool stop_requested = cancelled->load(std::memory_order_acquire);
        if (stop_requested) break;
        if (!co_await file_chan->send(paths[i])) break;
    }
    file_chan->close();
    co_return;
}
16!
265

266
// Worker coroutine for the parallel JSON-dict reader.  It repeatedly takes a
// file path from `file_chan`, reads the file's JSON events, and pushes
// JsonDictBatch batches of up to `batch_size` events into `out_chan`.  It
// returns once `file_chan` yields no more paths, when `*cancelled` is
// observed, or when `out_chan` rejects a send.
//
// NOTE(review): unlike produce_json_dicts(), this worker does not add to the
// iterator state's bytes_in_queue and does not catch exceptions; exceptions
// escape the coroutine, presumably to the enclosing CoroScope.  Confirm both
// are intended.
static CoroTask<void> json_dict_file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<JsonDictBatch> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<JsonDictBatch> producer(out_chan);
    auto producer_guard = producer.guard();

    while (auto next_path = co_await file_chan->receive()) {
        if (cancelled->load(std::memory_order_acquire)) co_return;

        TraceReaderConfig file_cfg;
        file_cfg.file_path = std::move(*next_path);
        file_cfg.index_dir = index_dir;
        file_cfg.checkpoint_size = checkpoint_size;
        file_cfg.auto_build_index = auto_build_index;

        TraceReader trace_reader(std::move(file_cfg));
        auto json_stream = trace_reader.read_json(rc);
        JsonDictBatch pending;
        pending.events.reserve(batch_size);

        while (auto record = co_await json_stream.next()) {
            if (cancelled->load(std::memory_order_acquire)) co_return;

            JsonDictEvent event;
            parse_json_to_event(*record->parser, event);
            pending.events.push_back(std::move(event));
            if (pending.events.size() < batch_size) continue;

            if (!co_await producer.send(std::move(pending))) co_return;
            pending = JsonDictBatch{};
            pending.events.reserve(batch_size);
        }

        // Flush the partial batch left over from this file.
        if (pending.events.empty()) continue;
        if (!co_await producer.send(std::move(pending))) co_return;
    }
    co_return;
}
18!
304

305
// Start the parallel JSON-dict reader inside `child`.  This spawns
// `num_workers` json_dict_file_worker() coroutines that pull paths from a
// shared file channel and write into `out_chan`, plus one
// send_files_to_channel() coroutine that feeds `*files` into that channel.
// Returns right after spawning.
//
// `*index_dir` and `*rc` are copied into each worker.  `files`, `out_chan`
// and `cancelled_ptr` are passed through and must outlive `child`.
static CoroTask<void> spawn_json_dict_producers(
    CoroScope &child, dftracer::utils::coro::Channel<JsonDictBatch> *out_chan,
    const std::vector<std::string> *files, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::size_t batch_size, std::atomic<bool> *cancelled_ptr,
    std::size_t max_workers) {
    // Clamp to at least one worker.  With max_workers == 0, no coroutine
    // would ever receive from file_chan or attach a producer to out_chan, so
    // the consumer could wait forever for batches or for the channel to close.
    const std::size_t num_workers =
        std::max<std::size_t>(1, std::min(files->size(), max_workers));
    auto file_chan =
        dftracer::utils::coro::make_channel<std::string>(num_workers);

    for (std::size_t i = 0; i < num_workers; ++i) {
        // Plain (non-coroutine) lambda: the captured values are passed by
        // value into the worker's own coroutine frame.
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc, batch_size,
                     cancelled_ptr](CoroScope &) {
            return json_dict_file_worker(fc, out_chan, idx, checkpoint_size,
                                         auto_build_index, r, batch_size,
                                         cancelled_ptr);
        });
    }

    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
        return send_files_to_channel(fc, files, cancelled_ptr);
    });
    co_return;
}
10!
330

331
// Scan `dir_path` for trace files and stream their JSON events through
// `sp->channel`, reading up to `max_workers` files concurrently (see
// spawn_json_dict_producers()).
//
// The scanner looks for the extensions ".pfw" / ".pfw.gz" with the flags
// shown (their meaning is defined by PatternDirectoryScannerUtilityInput).
// Matches are sorted, so files are dispatched in lexicographic path order.
// With no matching file, the output channel is closed immediately.
// Exceptions from the scan or the worker scope are recorded with
// `sp->set_error()`.
static CoroTask<void> produce_json_dicts_parallel(
    CoroScope &scope, JsonDictIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility scanner;
        auto scan_input = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto entries = co_await scope.spawn(scanner, scan_input);

        std::vector<std::string> files;
        files.reserve(entries.size());
        for (auto &e : entries) files.push_back(e.path.string());
        std::sort(files.begin(), files.end());

        if (files.empty()) {
            sp->channel->close();
            co_return;
        }

        auto *chan_ptr = sp->channel.get();
        auto *cancelled_ptr = &sp->cancelled;

        // Deliberately NOT a coroutine lambda.  A coroutine lambda reaches its
        // captures through the closure object rather than its own frame, so
        // they dangle if the closure dies before the coroutine body has
        // finished (C++ Core Guidelines CP.51).  Forwarding the captures as
        // arguments copies them into spawn_json_dict_producers()' frame.
        // `files`, `index_dir` and `rc` live in this coroutine's frame, which
        // outlives the awaited scope.
        co_await scope.scope([chan_ptr, &files, &index_dir, checkpoint_size,
                              auto_build_index, &rc, batch_size, cancelled_ptr,
                              max_workers](CoroScope &child) -> CoroTask<void> {
            return spawn_json_dict_producers(
                child, chan_ptr, &files, &index_dir, checkpoint_size,
                auto_build_index, &rc, batch_size, cancelled_ptr, max_workers);
        });
    } catch (...) {
        // NOTE(review): the output channel is not closed explicitly on this
        // path; confirm set_error() unblocks the consumer.
        sp->set_error(std::current_exception());
    }
}
12!
365

366
// Worker coroutine for the parallel line reader.  It repeatedly takes a file
// path from `file_chan`, reads the file's lines, and pushes
// MemoryViewBatchData batches of up to `batch_size` lines into `out_chan`.
// Each batch has one contiguous buffer plus per-line offsets and lengths.
// It returns once `file_chan` yields no more paths, when `*cancelled` is
// observed, or when `out_chan` rejects a send.
//
// NOTE(review): unlike produce_lines_batched(), this worker does not add to
// the iterator state's bytes_in_queue and does not catch exceptions;
// exceptions escape the coroutine, presumably to the enclosing CoroScope.
// Confirm both are intended.
static CoroTask<void> lines_file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer(
        out_chan);
    auto producer_guard = producer.guard();

    while (auto next_path = co_await file_chan->receive()) {
        if (cancelled->load(std::memory_order_acquire)) co_return;

        TraceReaderConfig file_cfg;
        file_cfg.file_path = std::move(*next_path);
        file_cfg.index_dir = index_dir;
        file_cfg.checkpoint_size = checkpoint_size;
        file_cfg.auto_build_index = auto_build_index;

        TraceReader trace_reader(std::move(file_cfg));
        auto line_stream = trace_reader.read_lines(rc);
        MemoryViewBatchData pending;
        std::size_t pending_lines = 0;

        while (auto line = co_await line_stream.next()) {
            if (cancelled->load(std::memory_order_acquire)) co_return;

            auto text = line->content;
            const auto start = static_cast<Py_ssize_t>(pending.buffer.size());
            pending.buffer.insert(pending.buffer.end(), text.begin(),
                                  text.end());
            pending.offsets.push_back(start);
            pending.lengths.push_back(static_cast<Py_ssize_t>(text.size()));

            if (++pending_lines < batch_size) continue;

            if (!co_await producer.send(std::move(pending))) co_return;
            pending = MemoryViewBatchData{};
            pending_lines = 0;
        }

        // Flush the partial batch left over from this file.
        if (pending_lines == 0) continue;
        if (!co_await producer.send(std::move(pending))) co_return;
    }
    co_return;
}
8!
408

409
// Start the parallel line reader inside `child`.  This spawns `num_workers`
// lines_file_worker() coroutines that pull paths from a shared file channel
// and write into `out_chan`, plus one send_files_to_channel() coroutine that
// feeds `*files` into that channel.  Returns right after spawning.
//
// `*index_dir` and `*rc` are copied into each worker.  `files`, `out_chan`
// and `cancelled_ptr` are passed through and must outlive `child`.
static CoroTask<void> spawn_lines_producers(
    CoroScope &child,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    const std::vector<std::string> *files, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::size_t batch_size, std::atomic<bool> *cancelled_ptr,
    std::size_t max_workers) {
    // Clamp to at least one worker.  With max_workers == 0, no coroutine
    // would ever receive from file_chan or attach a producer to out_chan, so
    // the consumer could wait forever for batches or for the channel to close.
    const std::size_t num_workers =
        std::max<std::size_t>(1, std::min(files->size(), max_workers));
    auto file_chan =
        dftracer::utils::coro::make_channel<std::string>(num_workers);

    for (std::size_t i = 0; i < num_workers; ++i) {
        // Plain (non-coroutine) lambda: the captured values are passed by
        // value into the worker's own coroutine frame.
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc, batch_size,
                     cancelled_ptr](CoroScope &) {
            return lines_file_worker(fc, out_chan, idx, checkpoint_size,
                                     auto_build_index, r, batch_size,
                                     cancelled_ptr);
        });
    }

    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
        return send_files_to_channel(fc, files, cancelled_ptr);
    });
    co_return;
}
4!
435

436
// Scan `dir_path` for trace files and stream their lines through
// `sp->channel`, reading up to `max_workers` files concurrently (see
// spawn_lines_producers()).
//
// The scanner looks for the extensions ".pfw" / ".pfw.gz" with the flags
// shown (their meaning is defined by PatternDirectoryScannerUtilityInput).
// Matches are sorted, so files are dispatched in lexicographic path order.
// With no matching file, the output channel is closed immediately.
// Exceptions from the scan or the worker scope are recorded with
// `sp->set_error()`.
static CoroTask<void> produce_lines_parallel(
    CoroScope &scope, MemoryViewBatchIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility scanner;
        auto scan_input = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto entries = co_await scope.spawn(scanner, scan_input);

        std::vector<std::string> files;
        files.reserve(entries.size());
        for (auto &e : entries) files.push_back(e.path.string());
        std::sort(files.begin(), files.end());

        if (files.empty()) {
            sp->channel->close();
            co_return;
        }

        auto *chan_ptr = sp->channel.get();
        auto *cancelled_ptr = &sp->cancelled;

        // Deliberately NOT a coroutine lambda.  A coroutine lambda reaches its
        // captures through the closure object rather than its own frame, so
        // they dangle if the closure dies before the coroutine body has
        // finished (C++ Core Guidelines CP.51).  Forwarding the captures as
        // arguments copies them into spawn_lines_producers()' frame.
        // `files`, `index_dir` and `rc` live in this coroutine's frame, which
        // outlives the awaited scope.
        co_await scope.scope([chan_ptr, &files, &index_dir, checkpoint_size,
                              auto_build_index, &rc, batch_size, cancelled_ptr,
                              max_workers](CoroScope &child) -> CoroTask<void> {
            return spawn_lines_producers(
                child, chan_ptr, &files, &index_dir, checkpoint_size,
                auto_build_index, &rc, batch_size, cancelled_ptr, max_workers);
        });
    } catch (...) {
        // NOTE(review): the output channel is not closed explicitly on this
        // path; confirm set_error() unblocks the consumer.
        sp->set_error(std::current_exception());
    }
}
6!
470

471
// Worker coroutine for the parallel raw reader.  It repeatedly takes a file
// path from `file_chan` and pushes each chunk from TraceReader::read_raw()
// into `out_chan` as a single-entry MemoryViewBatchData (offset 0, length =
// chunk size).  It returns once `file_chan` yields no more paths, when
// `*cancelled` is observed, or when `out_chan` rejects a send.
//
// NOTE(review): unlike produce_raw_batched(), this worker does not add to the
// iterator state's bytes_in_queue and does not catch exceptions; exceptions
// escape the coroutine, presumably to the enclosing CoroScope.  Confirm both
// are intended.
static CoroTask<void> raw_file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer(
        out_chan);
    auto producer_guard = producer.guard();

    while (auto next_path = co_await file_chan->receive()) {
        if (cancelled->load(std::memory_order_acquire)) co_return;

        TraceReaderConfig file_cfg;
        file_cfg.file_path = std::move(*next_path);
        file_cfg.index_dir = index_dir;
        file_cfg.checkpoint_size = checkpoint_size;
        file_cfg.auto_build_index = auto_build_index;

        TraceReader trace_reader(std::move(file_cfg));
        auto chunk_stream = trace_reader.read_raw(rc);
        while (auto chunk = co_await chunk_stream.next()) {
            if (cancelled->load(std::memory_order_acquire)) co_return;

            auto first = chunk->data();
            const auto chunk_len = chunk->size();

            MemoryViewBatchData single;
            single.buffer.assign(first, first + chunk_len);
            single.offsets.push_back(0);
            single.lengths.push_back(static_cast<Py_ssize_t>(chunk_len));
            if (!co_await producer.send(std::move(single))) co_return;
        }
    }
    co_return;
}
4!
501

502
// Start the parallel raw reader inside `child`.  This spawns `num_workers`
// raw_file_worker() coroutines that pull paths from a shared file channel
// and write into `out_chan`, plus one send_files_to_channel() coroutine that
// feeds `*files` into that channel.  Returns right after spawning.
//
// `*index_dir` and `*rc` are copied into each worker.  `files`, `out_chan`
// and `cancelled_ptr` are passed through and must outlive `child`.
static CoroTask<void> spawn_raw_producers(
    CoroScope &child,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    const std::vector<std::string> *files, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::atomic<bool> *cancelled_ptr, std::size_t max_workers) {
    // Clamp to at least one worker.  With max_workers == 0, no coroutine
    // would ever receive from file_chan or attach a producer to out_chan, so
    // the consumer could wait forever for batches or for the channel to close.
    const std::size_t num_workers =
        std::max<std::size_t>(1, std::min(files->size(), max_workers));
    auto file_chan =
        dftracer::utils::coro::make_channel<std::string>(num_workers);

    for (std::size_t i = 0; i < num_workers; ++i) {
        // Plain (non-coroutine) lambda: the captured values are passed by
        // value into the worker's own coroutine frame.
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc,
                     cancelled_ptr](CoroScope &) {
            return raw_file_worker(fc, out_chan, idx, checkpoint_size,
                                   auto_build_index, r, cancelled_ptr);
        });
    }

    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
        return send_files_to_channel(fc, files, cancelled_ptr);
    });
    co_return;
}
2!
526

527
// Scan `dir_path` for trace files and stream their raw chunks through
// `sp->channel`, reading up to `max_workers` files concurrently (see
// spawn_raw_producers()).
//
// The scanner looks for the extensions ".pfw" / ".pfw.gz" with the flags
// shown (their meaning is defined by PatternDirectoryScannerUtilityInput).
// Matches are sorted, so files are dispatched in lexicographic path order.
// With no matching file, the output channel is closed immediately.
// Exceptions from the scan or the worker scope are recorded with
// `sp->set_error()`.
static CoroTask<void> produce_raw_parallel(
    CoroScope &scope, MemoryViewBatchIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility scanner;
        auto scan_input = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto entries = co_await scope.spawn(scanner, scan_input);

        std::vector<std::string> files;
        files.reserve(entries.size());
        for (auto &e : entries) files.push_back(e.path.string());
        std::sort(files.begin(), files.end());

        if (files.empty()) {
            sp->channel->close();
            co_return;
        }

        auto *chan_ptr = sp->channel.get();
        auto *cancelled_ptr = &sp->cancelled;

        // Deliberately NOT a coroutine lambda.  A coroutine lambda reaches its
        // captures through the closure object rather than its own frame, so
        // they dangle if the closure dies before the coroutine body has
        // finished (C++ Core Guidelines CP.51).  Forwarding the captures as
        // arguments copies them into spawn_raw_producers()' frame.
        // `files`, `index_dir` and `rc` live in this coroutine's frame, which
        // outlives the awaited scope.
        co_await scope.scope([chan_ptr, &files, &index_dir, checkpoint_size,
                              auto_build_index, &rc, cancelled_ptr,
                              max_workers](CoroScope &child) -> CoroTask<void> {
            return spawn_raw_producers(child, chan_ptr, &files, &index_dir,
                                       checkpoint_size, auto_build_index, &rc,
                                       cancelled_ptr, max_workers);
        });
    } catch (...) {
        // NOTE(review): the output channel is not closed explicitly on this
        // path; confirm set_error() unblocks the consumer.
        sp->set_error(std::current_exception());
    }
}
4!
561

562
#ifdef DFTRACER_UTILS_ENABLE_ARROW
563

564
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
565
using dftracer::utils::utilities::common::arrow::ColumnType;
566
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
567

568
// Bump arena for string_views that must survive until builder.finish().
//
// push() copies the bytes into the current block and returns a view of the
// copy.  When a string does not fit in the block's remaining space, a new
// block of max(BLOCK_SIZE, len) bytes is started.  Existing blocks are never
// resized, and moving the outer vector keeps each inner buffer in place, so
// earlier views stay valid until clear().  clear() keeps only the first
// block for reuse; views obtained before clear() must not be used afterwards.
struct StringArena {
    static constexpr std::size_t BLOCK_SIZE = 64 * 1024;
    std::vector<std::vector<char>> blocks;
    std::size_t pos = 0;

    StringArena() { blocks.emplace_back(BLOCK_SIZE); }

    std::string_view push(const char *data, std::size_t len) {
        if (pos + len > blocks.back().size()) {
            blocks.emplace_back(std::max(BLOCK_SIZE, len));
            pos = 0;
        }
        char *dst = blocks.back().data() + pos;
        // memcpy with a null source is undefined even when len == 0, and an
        // empty std::string_view may report data() == nullptr.
        if (len != 0) std::memcpy(dst, data, len);
        pos += len;
        return {dst, len};
    }

    void clear() {
        if (blocks.size() > 1) blocks.resize(1);
        pos = 0;
    }
};
592

593
// --- Row type constants (must match Python TYPE_* constants) ---
// Stored as int8_t.  The numeric values are part of the contract with the
// Python side: append new kinds at the end and never renumber existing ones.
enum RowType : int8_t {
    ROW_EVENT         = 0,
    ROW_FILE_HASH     = 1,
    ROW_HOST_HASH     = 2,
    ROW_STRING_HASH   = 3,
    ROW_METADATA      = 4,
    ROW_PROC_METADATA = 5,
    ROW_PROFILE       = 6,
    ROW_SYSTEM        = 7,
};
604

605
// --- IO category constants (must match Python IOCategory values) ---
// Stored as int8_t; values start at 1 (0 is not assigned to any category).
// Keep the numbers in sync with the Python enum when adding categories.
enum IOCat : int8_t {
    IO_READ     = 1,
    IO_WRITE    = 2,
    IO_METADATA = 3,
    IO_PCTL     = 4,
    IO_IPC      = 5,
    IO_OTHER    = 6,
    IO_SYNC     = 7,
};
615

616
static int8_t get_io_cat(std::string_view func) {
×
617
    // READ
618
    if (func == "fread" || func == "pread" || func == "preadv" ||
×
619
        func == "read" || func == "readv")
×
620
        return IO_READ;
×
621
    // WRITE
622
    if (func == "fwrite" || func == "pwrite" || func == "pwritev" ||
×
623
        func == "write" || func == "writev")
×
624
        return IO_WRITE;
×
625
    // SYNC
626
    if (func == "fsync" || func == "fdatasync" || func == "msync" ||
×
627
        func == "sync")
×
628
        return IO_SYNC;
×
629
    // PCTL
630
    if (func == "exec" || func == "exit" || func == "fork" || func == "kill" ||
×
631
        func == "pipe" || func == "wait")
×
632
        return IO_PCTL;
×
633
    // IPC
634
    if (func == "msgctl" || func == "msgget" || func == "msgrcv" ||
×
635
        func == "msgsnd" || func == "semctl" || func == "semget" ||
×
636
        func == "semop" || func == "shmat" || func == "shmctl" ||
×
637
        func == "shmdt" || func == "shmget")
×
638
        return IO_IPC;
×
639
    // METADATA
640
    if (func == "__fxstat" || func == "__fxstat64" || func == "__lxstat" ||
×
641
        func == "__lxstat64" || func == "__xstat" || func == "__xstat64" ||
×
642
        func == "access" || func == "close" || func == "closedir" ||
×
643
        func == "fclose" || func == "fcntl" || func == "fopen" ||
×
644
        func == "fopen64" || func == "fseek" || func == "fstat" ||
×
645
        func == "fstatat" || func == "ftell" || func == "ftruncate" ||
×
646
        func == "link" || func == "lseek" || func == "lseek64" ||
×
647
        func == "mkdir" || func == "open" || func == "open64" ||
×
648
        func == "opendir" || func == "readdir" || func == "readlink" ||
×
649
        func == "remove" || func == "rename" || func == "rmdir" ||
×
650
        func == "seek" || func == "stat" || func == "unlink")
×
651
        return IO_METADATA;
×
652
    return IO_OTHER;
×
653
}
654

655
// Case-insensitive equality of `a` and the NUL-terminated string `b`, using
// std::tolower (current C locale).  Both sides are lower-cased, so `b` may
// be given in any case.  Previously only `a` was folded, which made any `b`
// containing uppercase letters silently never match.
static bool str_iequal(std::string_view a, const char *b) {
    const std::string_view rhs(b);
    if (a.size() != rhs.size()) return false;
    for (std::size_t i = 0; i < rhs.size(); ++i) {
        const int lhs_c = std::tolower(static_cast<unsigned char>(a[i]));
        const int rhs_c = std::tolower(static_cast<unsigned char>(rhs[i]));
        if (lhs_c != rhs_c) return false;
    }
    return true;
}
665

666
// Case-insensitive substring test: returns true if `needle` occurs in `s`
// once `s` is lower-cased with std::tolower.  `needle` itself is NOT
// folded, so it must already be lower-case.  An empty needle always matches.
static bool str_contains_lower(std::string_view s, const char *needle) {
    const std::string_view pattern(needle);
    if (pattern.empty()) return true;
    if (pattern.size() > s.size()) return false;
    const auto folded_eq = [](char hay, char pat) {
        return std::tolower(static_cast<unsigned char>(hay)) ==
               static_cast<unsigned char>(pat);
    };
    return std::search(s.begin(), s.end(), pattern.begin(), pattern.end(),
                       folded_eq) != s.end();
}
682

683
// Normalize a raw JSON row (parsed with simdjson) into the semantic
684
// output schema.  Appends one row to `builder` with the full set of output
685
// columns.  Returns false if the row should be skipped (no valid name).
686
static bool normalize_row(RecordBatchBuilder &builder, StringArena &arena,
×
687
                          JsonParser &parser) {
688
    using SVH = JsonValueHelper;
689

690
    // --- Extract top-level fields ---
691
    auto ph = parser.get_string("ph").value_or(std::string_view{});
×
692
    auto name_sv = parser.get_string("name").value_or(std::string_view{});
×
693
    auto cat_sv = parser.get_string("cat").value_or(std::string_view{});
×
694
    auto pid_opt = parser.get_int64("pid");
×
695
    auto tid_opt = parser.get_int64("tid");
×
696
    auto ts_opt = parser.get_int64("ts");
×
697
    auto dur_opt = parser.get_int64("dur");
×
698

699
    // Helper lambdas to access args fields (need to rewind after each access)
700
    // We'll do a single pass over args instead
701
    std::optional<std::string_view> args_name, args_value, args_hhash,
×
702
        args_fhash;
×
703
    std::optional<int64_t> args_epoch, args_step, args_size_sum, args_ret;
×
704
    std::optional<int64_t> args_offset, args_image_idx, args_image_size;
×
705
    std::unordered_map<std::string, int64_t> args_int_map;
×
706
    std::unordered_map<std::string, double> args_float_map;
×
707

708
    parser.rewind();
×
709
    parser.for_each_field(
×
710
        "args", [&](std::string_view key, simdjson::ondemand::value val) {
×
711
            if (key == "name") {
×
712
                if (auto s = SVH::get_string(val)) args_name = s;
×
713
            } else if (key == "value") {
×
714
                if (auto s = SVH::get_string(val)) args_value = s;
×
715
            } else if (key == "hhash") {
×
716
                if (auto s = SVH::get_string(val)) args_hhash = s;
×
717
            } else if (key == "fhash") {
×
718
                if (auto s = SVH::get_string(val)) args_fhash = s;
×
719
            } else if (key == "epoch") {
×
720
                if (auto i = SVH::get_int64(val)) args_epoch = i;
×
721
            } else if (key == "step") {
×
722
                if (auto i = SVH::get_int64(val)) args_step = i;
×
723
            } else if (key == "size_sum") {
×
724
                if (auto i = SVH::get_int64(val)) args_size_sum = i;
×
725
            } else if (key == "ret") {
×
726
                if (auto i = SVH::get_int64(val)) args_ret = i;
×
727
            } else if (key == "offset") {
×
728
                if (auto i = SVH::get_int64(val)) args_offset = i;
×
729
            } else if (key == "image_idx") {
×
730
                if (auto i = SVH::get_int64(val)) args_image_idx = i;
×
731
            } else if (key == "image_size") {
×
732
                if (auto i = SVH::get_int64(val)) args_image_size = i;
×
733
            } else {
734
                // Store other int/float args for profile/sys columns
735
                if (auto i = SVH::get_int64(val)) {
×
736
                    args_int_map[std::string(key)] = *i;
×
737
                } else if (auto d = SVH::get_double(val)) {
×
738
                    args_float_map[std::string(key)] = *d;
×
739
                }
740
            }
741
        });
×
742

743
    // --- Type classification ---
744
    bool is_M = (ph == "M");
×
745
    bool is_C = (ph == "C");
×
746
    bool is_event = !is_M && !is_C;
×
747

748
    int8_t row_type = ROW_EVENT;
×
749
    if (is_M) {
×
750
        if (name_sv == "FH")
×
751
            row_type = ROW_FILE_HASH;
×
752
        else if (name_sv == "HH")
×
753
            row_type = ROW_HOST_HASH;
×
754
        else if (name_sv == "SH")
×
755
            row_type = ROW_STRING_HASH;
×
756
        else if (name_sv == "PR")
×
757
            row_type = ROW_PROC_METADATA;
×
758
        else
759
            row_type = ROW_METADATA;
×
760
    } else if (is_C) {
×
761
        row_type = str_iequal(cat_sv, "sys") ? ROW_SYSTEM : ROW_PROFILE;
×
762
    }
763
    bool is_hash = (row_type >= ROW_FILE_HASH && row_type <= ROW_STRING_HASH) ||
×
764
                   row_type == ROW_PROC_METADATA;
765
    bool is_profile = (row_type == ROW_PROFILE);
×
766
    bool is_sys = (row_type == ROW_SYSTEM);
×
767

768
    // Name: metadata rows use args.name if available
769
    std::string_view out_name = name_sv;
×
770
    if (is_M && args_name && !args_name->empty()) {
×
771
        out_name = *args_name;
×
772
    }
773
    if (out_name.empty()) return false;  // skip rows without name
×
774

775
    // --- Declare all output columns ---
776
    auto ci_type = builder.add_or_get_column("type", ColumnType::INT64);
×
777
    auto ci_cat = builder.add_or_get_column("cat", ColumnType::STRING);
×
778
    auto ci_name = builder.add_or_get_column("name", ColumnType::STRING);
×
779
    auto ci_pid = builder.add_or_get_column("pid", ColumnType::INT64);
×
780
    auto ci_tid = builder.add_or_get_column("tid", ColumnType::INT64);
×
781
    auto ci_hash = builder.add_or_get_column("hash", ColumnType::STRING);
×
782
    auto ci_value = builder.add_or_get_column("value", ColumnType::STRING);
×
783
    auto ci_host_hash =
784
        builder.add_or_get_column("host_hash", ColumnType::STRING);
×
785
    auto ci_file_hash =
786
        builder.add_or_get_column("file_hash", ColumnType::STRING);
×
787
    auto ci_epoch = builder.add_or_get_column("epoch", ColumnType::INT64);
×
788
    auto ci_step = builder.add_or_get_column("step", ColumnType::INT64);
×
789
    auto ci_ts = builder.add_or_get_column("ts", ColumnType::INT64);
×
790
    auto ci_dur = builder.add_or_get_column("dur", ColumnType::INT64);
×
791
    auto ci_te = builder.add_or_get_column("te", ColumnType::INT64);
×
792
    [[maybe_unused]] auto ci_trange =
793
        builder.add_or_get_column("trange", ColumnType::INT64);
×
794
    auto ci_io_cat = builder.add_or_get_column("io_cat", ColumnType::INT64);
×
795
    auto ci_size = builder.add_or_get_column("size", ColumnType::INT64);
×
796
    auto ci_offset = builder.add_or_get_column("offset", ColumnType::INT64);
×
797
    auto ci_image_id = builder.add_or_get_column("image_id", ColumnType::INT64);
×
798

799
    // --- Populate core columns ---
800
    builder.append_int64(ci_type, row_type);
×
801

802
    // cat (lowercased) - write into arena
803
    if (!cat_sv.empty()) {
×
804
        char lbuf[256];
805
        std::size_t clen = std::min(cat_sv.size(), sizeof(lbuf));
×
806
        for (std::size_t i = 0; i < clen; ++i)
×
807
            lbuf[i] = static_cast<char>(
×
808
                std::tolower(static_cast<unsigned char>(cat_sv[i])));
×
809
        builder.append_string(ci_cat, arena.push(lbuf, clen));
×
810
    } else {
811
        builder.append_null(ci_cat);
×
812
    }
813

814
    builder.append_string(ci_name, out_name);
×
815

816
    if (pid_opt) builder.append_int64(ci_pid, *pid_opt);
×
817
    if (tid_opt) builder.append_int64(ci_tid, *tid_opt);
×
818

819
    // hash / value
820
    if (is_hash && args_value && !args_value->empty())
×
821
        builder.append_string(ci_hash, *args_value);
×
822
    if (row_type == ROW_METADATA && args_value && !args_value->empty())
×
823
        builder.append_string(ci_value, *args_value);
×
824

825
    // host_hash / file_hash
826
    if (args_hhash && !args_hhash->empty())
×
827
        builder.append_string(ci_host_hash, *args_hhash);
×
828
    if (args_fhash && !args_fhash->empty())
×
829
        builder.append_string(ci_file_hash, *args_fhash);
×
830

831
    // epoch / step
832
    if (args_epoch && *args_epoch >= 0)
×
833
        builder.append_int64(ci_epoch, *args_epoch);
×
834
    if (args_step && *args_step >= 0) builder.append_int64(ci_step, *args_step);
×
835

836
    // --- Temporal ---
837
    bool has_ts = (is_event || is_C) && ts_opt.has_value();
×
838
    bool has_dur = dur_opt.has_value();
×
839
    int64_t ts_val = 0, dur_val = 0;
×
840
    if (has_ts) {
×
841
        ts_val = *ts_opt;
×
842
        builder.append_int64(ci_ts, ts_val);
×
843
    }
844
    if (is_event && has_ts && has_dur) {
×
845
        dur_val = *dur_opt;
×
846
        builder.append_int64(ci_dur, dur_val);
×
847
        builder.append_int64(ci_te, ts_val + dur_val);
×
848
    }
849

850
    // --- IO columns (events only) ---
851
    if (is_event) {
×
852
        bool is_posix_stdio =
853
            str_iequal(cat_sv, "posix") || str_iequal(cat_sv, "stdio");
×
854
        int8_t io_cat = IO_OTHER;
×
855

856
        // size priority: size_sum > POSIX ret > image_size
857
        if (args_size_sum) {
×
858
            builder.append_int64(ci_size, *args_size_sum);
×
859
            if (is_posix_stdio) io_cat = get_io_cat(out_name);
×
860
        } else if (is_posix_stdio) {
×
861
            io_cat = get_io_cat(out_name);
×
862
            if (args_ret && *args_ret > 0 &&
×
863
                (io_cat == IO_READ || io_cat == IO_WRITE))
×
864
                builder.append_int64(ci_size, *args_ret);
×
865
            if (args_offset && *args_offset >= 0)
×
866
                builder.append_int64(ci_offset, *args_offset);
×
867
        } else {
868
            if (args_image_idx && *args_image_idx > 0)
×
869
                builder.append_int64(ci_image_id, *args_image_idx);
×
870
            if (args_image_size && *args_image_size > 0 &&
×
871
                !str_contains_lower(out_name, "open"))
×
872
                builder.append_int64(ci_size, *args_image_size);
×
873
        }
874
        builder.append_int64(ci_io_cat, io_cat);
×
875
    }
876

877
    // --- Profile columns ---
878
    if (is_profile) {
×
879
        bool is_posix_stdio =
880
            str_iequal(cat_sv, "posix") || str_iequal(cat_sv, "stdio");
×
881
        int8_t io_cat = is_posix_stdio ? get_io_cat(out_name) : IO_OTHER;
×
882
        builder.append_int64(ci_io_cat, io_cat);
×
883

884
        static const char *profile_keys[] = {
885
            "count",      "count_max",  "count_min",  "count_sum",
886
            "dft_cnt",    "dur",        "dur_max",    "dur_min",
887
            "dur_sum",    "epoch",      "flags",      "offset",
888
            "offset_max", "offset_min", "offset_sum", "ret",
889
            "ret_max",    "ret_min",    "ret_sum",    "whence",
890
            "whence_max", "whence_min", "whence_sum", nullptr};
891
        for (const char **pk = profile_keys; *pk; ++pk) {
×
892
            auto it = args_int_map.find(*pk);
×
893
            if (it != args_int_map.end()) {
×
894
                auto idx = builder.add_or_get_column(*pk, ColumnType::INT64);
×
895
                builder.append_int64(idx, it->second);
×
896
            }
897
        }
898
    }
899

900
    // --- System columns ---
901
    if (is_sys) {
×
902
        static const char *sys_keys[] = {
903
            "user_pct", "system_pct",  "iowait_pct",   "idle_pct",
904
            "irq_pct",  "softirq_pct", "MemAvailable", "MemFree",
905
            "Cached",   "Dirty",       "Active",       nullptr};
906
        for (const char **sk = sys_keys; *sk; ++sk) {
×
907
            auto it = args_float_map.find(*sk);
×
908
            if (it != args_float_map.end()) {
×
909
                auto idx = builder.add_or_get_column(*sk, ColumnType::DOUBLE);
×
910
                builder.append_double(idx, it->second);
×
911
            }
912
        }
913
    }
914

915
    builder.end_row();
×
916
    return true;
×
917
}
×
918

919
// Flatten a simdjson object into "prefix.key" columns using native types.
920
// On type mismatch (same key, different type across rows), appends null.
UNCOV
921
static void flatten_object_into(RecordBatchBuilder &builder, StringArena &arena,
×
922
                                std::string_view prefix,
923
                                simdjson::ondemand::object obj) {
924
    using SVH = JsonValueHelper;
925
    char key_buf[512];
926

UNCOV
927
    for (auto field : obj) {
×
UNCOV
928
        if (field.error()) continue;
×
929

UNCOV
930
        auto key_result = field.unescaped_key();
×
UNCOV
931
        if (key_result.error()) continue;
×
UNCOV
932
        std::string_view sk = key_result.value_unsafe();
×
933

UNCOV
934
        auto val_result = field.value();
×
UNCOV
935
        if (val_result.error()) continue;
×
UNCOV
936
        auto sub_val = val_result.value_unsafe();
×
937

UNCOV
938
        std::size_t needed = prefix.size() + 1 + sk.size();
×
UNCOV
939
        if (needed >= sizeof(key_buf)) continue;
×
UNCOV
940
        std::memcpy(key_buf, prefix.data(), prefix.size());
×
UNCOV
941
        key_buf[prefix.size()] = '.';
×
UNCOV
942
        std::memcpy(key_buf + prefix.size() + 1, sk.data(), sk.size());
×
UNCOV
943
        std::string_view full_key(key_buf, needed);
×
944

UNCOV
945
        auto type_result = sub_val.type();
×
UNCOV
946
        if (type_result.error()) continue;
×
UNCOV
947
        auto json_type = type_result.value_unsafe();
×
948

UNCOV
949
        switch (json_type) {
×
UNCOV
950
            case simdjson::ondemand::json_type::number: {
×
UNCOV
951
                auto num_result = sub_val.get_number();
×
UNCOV
952
                if (num_result.error()) break;
×
UNCOV
953
                auto num = num_result.value_unsafe();
×
UNCOV
954
                if (num.is_int64()) {
×
955
                    auto idx =
UNCOV
956
                        builder.add_or_get_column(full_key, ColumnType::INT64);
×
UNCOV
957
                    if (builder.column_type(idx) == ColumnType::INT64)
×
UNCOV
958
                        builder.append_int64(idx, num.get_int64());
×
959
                    else
UNCOV
960
                        builder.append_null(idx);
×
UNCOV
961
                } else if (num.is_uint64()) {
×
962
                    auto idx =
UNCOV
963
                        builder.add_or_get_column(full_key, ColumnType::UINT64);
×
UNCOV
964
                    if (builder.column_type(idx) == ColumnType::UINT64)
×
UNCOV
965
                        builder.append_uint64(idx, num.get_uint64());
×
966
                    else
UNCOV
967
                        builder.append_null(idx);
×
968
                } else {
969
                    auto idx =
UNCOV
970
                        builder.add_or_get_column(full_key, ColumnType::DOUBLE);
×
UNCOV
971
                    if (builder.column_type(idx) == ColumnType::DOUBLE)
×
UNCOV
972
                        builder.append_double(idx, num.get_double());
×
973
                    else
UNCOV
974
                        builder.append_null(idx);
×
975
                }
UNCOV
976
                break;
×
977
            }
UNCOV
978
            case simdjson::ondemand::json_type::string: {
×
UNCOV
979
                auto str_result = sub_val.get_string();
×
UNCOV
980
                if (str_result.error()) break;
×
UNCOV
981
                auto str = str_result.value_unsafe();
×
982
                auto idx =
UNCOV
983
                    builder.add_or_get_column(full_key, ColumnType::STRING);
×
UNCOV
984
                if (builder.column_type(idx) == ColumnType::STRING)
×
UNCOV
985
                    builder.append_string(idx, str);
×
986
                else
UNCOV
987
                    builder.append_null(idx);
×
UNCOV
988
                break;
×
989
            }
UNCOV
990
            case simdjson::ondemand::json_type::boolean: {
×
UNCOV
991
                auto bool_result = sub_val.get_bool();
×
UNCOV
992
                if (bool_result.error()) break;
×
UNCOV
993
                auto b = bool_result.value_unsafe();
×
994
                auto idx =
UNCOV
995
                    builder.add_or_get_column(full_key, ColumnType::BOOL);
×
UNCOV
996
                if (builder.column_type(idx) == ColumnType::BOOL)
×
UNCOV
997
                    builder.append_bool(idx, b);
×
998
                else
UNCOV
999
                    builder.append_null(idx);
×
UNCOV
1000
                break;
×
1001
            }
UNCOV
1002
            case simdjson::ondemand::json_type::null: {
×
UNCOV
1003
                auto existing = builder.find_column(full_key);
×
UNCOV
1004
                if (existing) builder.append_null(*existing);
×
UNCOV
1005
                break;
×
1006
            }
UNCOV
1007
            case simdjson::ondemand::json_type::object:
×
1008
            case simdjson::ondemand::json_type::array: {
1009
                // Serialize nested object/array to JSON string
UNCOV
1010
                auto json_str = SVH::to_json_string(sub_val);
×
1011
                auto idx =
UNCOV
1012
                    builder.add_or_get_column(full_key, ColumnType::STRING);
×
UNCOV
1013
                if (json_str) {
×
UNCOV
1014
                    builder.append_string(
×
UNCOV
1015
                        idx, arena.push(json_str->data(), json_str->size()));
×
1016
                } else {
UNCOV
1017
                    builder.append_null(idx);
×
1018
                }
UNCOV
1019
                break;
×
UNCOV
1020
            }
×
UNCOV
1021
            default:
×
UNCOV
1022
                break;
×
1023
        }
1024
    }
UNCOV
1025
}
×
1026

1027
static bool build_arrow_row(RecordBatchBuilder &builder, JsonParser &parser,
×
1028
                            StringArena &arena, bool normalize) {
1029
    if (normalize) return normalize_row(builder, arena, parser);
×
1030

1031
    using SVH = JsonValueHelper;
1032
    parser.for_each_field([&](std::string_view key_sv,
×
1033
                              simdjson::ondemand::value val) {
1034
        auto type_result = val.type();
×
1035
        if (type_result.error()) return;
×
1036
        auto json_type = type_result.value_unsafe();
×
1037
        switch (json_type) {
×
UNCOV
1038
            case simdjson::ondemand::json_type::number: {
×
1039
                auto num_result = val.get_number();
×
1040
                if (num_result.error()) break;
×
1041
                auto num = num_result.value_unsafe();
×
1042
                if (num.is_int64()) {
×
1043
                    std::size_t idx =
1044
                        builder.add_or_get_column(key_sv, ColumnType::INT64);
×
1045
                    builder.append_int64(idx, num.get_int64());
×
1046
                } else if (num.is_uint64()) {
×
1047
                    std::size_t idx =
1048
                        builder.add_or_get_column(key_sv, ColumnType::UINT64);
×
1049
                    builder.append_uint64(idx, num.get_uint64());
×
1050
                } else {
1051
                    std::size_t idx =
1052
                        builder.add_or_get_column(key_sv, ColumnType::DOUBLE);
×
1053
                    builder.append_double(idx, num.get_double());
×
1054
                }
1055
                break;
×
1056
            }
UNCOV
1057
            case simdjson::ondemand::json_type::string: {
×
1058
                auto str_result = val.get_string();
×
1059
                if (str_result.error()) break;
×
1060
                auto str = str_result.value_unsafe();
×
1061
                std::size_t idx =
1062
                    builder.add_or_get_column(key_sv, ColumnType::STRING);
×
1063
                builder.append_string(idx, str);
×
1064
                break;
×
1065
            }
UNCOV
1066
            case simdjson::ondemand::json_type::boolean: {
×
1067
                auto bool_result = val.get_bool();
×
1068
                if (bool_result.error()) break;
×
1069
                auto b = bool_result.value_unsafe();
×
1070
                std::size_t idx =
1071
                    builder.add_or_get_column(key_sv, ColumnType::BOOL);
×
1072
                builder.append_bool(idx, b);
×
1073
                break;
×
1074
            }
UNCOV
1075
            case simdjson::ondemand::json_type::null: {
×
1076
                auto existing = builder.find_column(key_sv);
×
1077
                if (existing) builder.append_null(*existing);
×
1078
                break;
×
1079
            }
UNCOV
1080
            case simdjson::ondemand::json_type::object:
×
1081
            case simdjson::ondemand::json_type::array: {
1082
                auto json_str = SVH::to_json_string(val);
×
1083
                std::size_t idx =
1084
                    builder.add_or_get_column(key_sv, ColumnType::STRING);
×
1085
                if (json_str) {
×
1086
                    builder.append_string(
×
1087
                        idx, arena.push(json_str->data(), json_str->size()));
×
1088
                } else {
1089
                    builder.append_null(idx);
×
1090
                }
UNCOV
1091
                break;
×
1092
            }
×
UNCOV
1093
            default:
×
1094
                break;
×
1095
        }
1096
    });
1097
    builder.end_row();
×
1098
    return true;
×
1099
}
1100

UNCOV
1101
static bool process_json_line(RecordBatchBuilder &builder, JsonParser &parser,
×
1102
                              StringArena &arena, std::string_view content,
1103
                              bool normalize) {
1104
    const char *trimmed;
1105
    std::size_t trimmed_length;
UNCOV
1106
    if (!dftracer::utils::json_trim_and_validate_with_comma(
×
1107
            content.data(), content.size(), trimmed, trimmed_length))
UNCOV
1108
        return false;
×
UNCOV
1109
    if (!parser.parse(std::string_view(trimmed, trimmed_length))) return false;
×
UNCOV
1110
    return build_arrow_row(builder, parser, arena, normalize);
×
1111
}
1112

UNCOV
1113
// Coroutine producer for a single trace file: pushes Arrow batches into
// `chan`.
//  - !normalize: batches come straight from TraceReader::read_arrow.
//  - normalize:  events come from read_json, rows are built with
//                build_arrow_row, and a batch is flushed every `batch_size`
//                rows (plus a trailing partial batch).
// Production stops early when `*cancelled` is observed or a send returns
// false. `producer.guard()` is held for the whole coroutine; presumably it
// signals producer completion to the channel on exit (confirm in channel.h).
static CoroTask<void> produce_arrow_for_file(
    dftracer::utils::coro::Channel<ArrowExportResult> *chan,
    std::string file_path, std::string index_dir, std::size_t checkpoint_size,
    bool auto_build_index, ReadConfig rc, std::size_t batch_size,
    bool normalize, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer(chan);
    auto guard = producer.guard();

    TraceReaderConfig cfg;
    cfg.file_path = std::move(file_path);
    cfg.index_dir = std::move(index_dir);
    cfg.checkpoint_size = checkpoint_size;
    cfg.auto_build_index = auto_build_index;

    TraceReader reader(std::move(cfg));

    // Fast path: non-normalized Arrow build happens inside TraceReader.
    // Normalize still goes through read_json + build_arrow_row for the
    // richer schema derivation.
    if (!normalize) {
        auto batch_gen = reader.read_arrow(rc, batch_size);
        while (auto batch_opt = co_await batch_gen.next()) {
            if (cancelled->load(std::memory_order_acquire)) co_return;
            if (!co_await producer.send(std::move(*batch_opt))) co_return;
        }
        co_return;
    }

    auto gen = reader.read_json(rc);
    RecordBatchBuilder builder;
    builder.reserve(batch_size);
    StringArena arena;

    while (auto opt = co_await gen.next()) {
        if (cancelled->load(std::memory_order_acquire)) co_return;
        if (!build_arrow_row(builder, *opt->parser, arena, /*normalize=*/true))
            continue;
        if (builder.num_rows() >= batch_size) {
            auto result = builder.finish();
            // The arena is released only after finish() has produced the
            // batch (assumes finish() copies string data out -- confirm).
            arena.clear();
            if (!co_await producer.send(std::move(result))) co_return;
            // The first full batch fixes the schema for later batches.
            if (!builder.is_schema_locked()) builder.lock_schema();
            builder.reset(true);
            builder.reserve(batch_size);
        }
    }
    // Skip the trailing partial batch if cancellation arrived meanwhile.
    if (builder.num_rows() > 0 &&
        !cancelled->load(std::memory_order_acquire)) {
        co_await producer.send(builder.finish());
    }
    co_return;
}
×
1163

UNCOV
1164
// Worker coroutine: repeatedly takes a file path from `file_chan` and streams
// that file's Arrow batches into `out_chan`. It uses the same raw
// (read_arrow) vs normalized (read_json + build_arrow_row) split as
// produce_arrow_for_file, with a fresh builder/arena per file. It returns
// when `file_chan` yields no more paths, when `*cancelled` is observed, or
// when a send to `out_chan` returns false.
static CoroTask<void> file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer(
        out_chan);
    auto guard = producer.guard();

    while (auto file_path = co_await file_chan->receive()) {
        if (cancelled->load(std::memory_order_acquire)) co_return;
        TraceReaderConfig cfg;
        cfg.file_path = std::move(*file_path);
        cfg.index_dir = index_dir;  // copied: reused for every file
        cfg.checkpoint_size = checkpoint_size;
        cfg.auto_build_index = auto_build_index;

        TraceReader reader(std::move(cfg));

        if (!normalize) {
            auto batch_gen = reader.read_arrow(rc, batch_size);
            while (auto batch_opt = co_await batch_gen.next()) {
                if (cancelled->load(std::memory_order_acquire)) co_return;
                if (!co_await producer.send(std::move(*batch_opt))) co_return;
            }
            continue;
        }

        auto gen = reader.read_json(rc);
        RecordBatchBuilder builder;
        builder.reserve(batch_size);
        StringArena arena;

        while (auto opt = co_await gen.next()) {
            if (cancelled->load(std::memory_order_acquire)) co_return;
            if (!build_arrow_row(builder, *opt->parser, arena,
                                 /*normalize=*/true))
                continue;
            if (builder.num_rows() >= batch_size) {
                auto result = builder.finish();
                arena.clear();
                if (!co_await producer.send(std::move(result))) co_return;
                // The first full batch fixes the schema for this file.
                if (!builder.is_schema_locked()) builder.lock_schema();
                builder.reset(true);
                builder.reserve(batch_size);
            }
        }
        if (builder.num_rows() > 0) {
            // Don't emit a trailing partial batch once cancellation is set.
            if (cancelled->load(std::memory_order_acquire)) co_return;
            if (!co_await producer.send(builder.finish())) co_return;
        }
    }
    co_return;
}
×
1217

1218
// Extract AND-of-EQ leaves from a Query AST. Returns nullopt if the predicate
1219
// shape is anything else (NE, range ops, IN, NOT, OR), in which case the
1220
// uniform-match shortcut does not apply.
1221
static std::optional<std::vector<std::pair<std::string, std::string>>>
1222
extract_eq_leaves(
×
1223
    const dftracer::utils::utilities::common::query::QueryNode &node) {
1224
    namespace q_ns = dftracer::utils::utilities::common::query;
1225
    using LeafVec = std::vector<std::pair<std::string, std::string>>;
1226

1227
    auto literal_to_string = [](const q_ns::LiteralNode &lit) -> std::string {
×
1228
        return std::visit(
1229
            [](auto &&v) -> std::string {
×
1230
                using T = std::decay_t<decltype(v)>;
1231
                if constexpr (std::is_same_v<T, std::string>)
1232
                    return v;
×
1233
                else if constexpr (std::is_same_v<T, bool>)
1234
                    return v ? "true" : "false";
×
1235
                else if constexpr (std::is_same_v<T, int64_t>)
1236
                    return std::to_string(v);
×
1237
                else if constexpr (std::is_same_v<T, uint64_t>)
1238
                    return std::to_string(v);
×
1239
                else if constexpr (std::is_same_v<T, double>)
1240
                    return std::to_string(v);
×
1241
                else
1242
                    return {};
1243
            },
1244
            lit.value);
×
1245
    };
1246

1247
    return std::visit(
1248
        [&](const auto &n) -> std::optional<LeafVec> {
×
1249
            using T = std::decay_t<decltype(n)>;
1250
            if constexpr (std::is_same_v<T, q_ns::CompareNode>) {
1251
                if (n.op != q_ns::CompareOp::EQ) return std::nullopt;
×
1252
                return LeafVec{{n.field.path, literal_to_string(n.value)}};
×
1253
            } else if constexpr (std::is_same_v<T, q_ns::AndNode>) {
1254
                auto l = extract_eq_leaves(*n.left);
×
1255
                if (!l) return std::nullopt;
×
1256
                auto r = extract_eq_leaves(*n.right);
×
1257
                if (!r) return std::nullopt;
×
1258
                l->insert(l->end(), r->begin(), r->end());
×
1259
                return l;
×
1260
            } else {
×
1261
                return std::nullopt;
×
1262
            }
1263
        },
1264
        node.data);
×
1265
}
1266

1267
// True iff every checkpoint in `chunk_idxs` has dim_stats min == max == literal
1268
// for every leaf. Empty leaves -> false (no shortcut). Missing dim_stats for
1269
// any (chunk, leaf) -> false (we don't know, play safe).
1270
static bool all_chunks_uniform_match(
×
1271
    const dftracer::utils::utilities::indexer::IndexDatabase &db, int fid,
1272
    const std::vector<std::pair<std::string, std::string>> &leaves,
1273
    const std::vector<std::uint64_t> &chunk_idxs) {
1274
    if (leaves.empty() || chunk_idxs.empty()) return false;
×
1275
    namespace indexing = dftracer::utils::utilities::composites::dft::indexing;
1276

1277
    for (const auto &[dim, val] : leaves) {
×
1278
        auto rows = db.query_chunk_dimension_stats_for_dimension(fid, dim);
×
1279
        if (rows.empty()) return false;
×
1280
        std::unordered_map<std::uint64_t,
1281
                           const indexing::ChunkDimensionStatsResult *>
1282
            by_ckpt;
×
1283
        by_ckpt.reserve(rows.size());
×
1284
        for (const auto &r : rows) by_ckpt.emplace(r.checkpoint_idx, &r);
×
1285
        for (auto cidx : chunk_idxs) {
×
1286
            auto it = by_ckpt.find(cidx);
×
1287
            if (it == by_ckpt.end()) return false;
×
1288
            const auto &ds = *it->second;
×
1289
            if (ds.min_value != val || ds.max_value != val) return false;
×
1290
        }
1291
    }
×
1292
    return true;
×
1293
}
1294

1295
// One unit of work for the parallel Arrow export. A unit names a single trace
// file plus an optional sub-range of it, so several workers can share the
// decode work for one large file. Decompression of a single gz file is
// sequential per gzip stream, which is why splits are made at
// checkpoint-aligned byte offsets. A unit may span one or more consecutive
// checkpoints.
struct ArrowWorkItem {
    // Trace file this unit reads from.
    std::string file_path{};
    // Byte range covered by this unit (0/0 when the unit is unsplit).
    std::size_t start_byte{0};
    std::size_t end_byte{0};
    // Presumably flag whether the corresponding byte bound coincides with a
    // checkpoint boundary -- confirm against the reader.
    bool start_at_checkpoint{false};
    bool end_at_checkpoint{false};
    // Set when every kept chunk for this byte range matches the predicate
    // uniformly (dim_stats min == max == literal for every AND-of-EQ leaf),
    // so per-event predicate evaluation can be skipped.
    bool chunk_prune_only{false};
    // Line-range constraint. When non-zero, it overrides the byte range: the
    // worker forwards it as a LINE_RANGE read, and the gzip stream maps it to
    // byte offsets through the checkpoint index. 0 = unconstrained.
    std::size_t start_line{0};
    std::size_t end_line{0};
};
1316

1317
static std::vector<ArrowWorkItem> enumerate_work_items(
36✔
1318
    const std::vector<std::string> &files, const std::string &index_dir,
1319
    const std::string &query_str, std::size_t max_workers,
1320
    std::size_t clip_start_byte = 0, std::size_t clip_end_byte = 0,
1321
    std::size_t clip_start_line = 0, std::size_t clip_end_line = 0) {
1322
    namespace dft_internal =
1323
        dftracer::utils::utilities::composites::dft::internal;
1324
    namespace indexer_ns = dftracer::utils::utilities::indexer;
1325
    namespace indexing = dftracer::utils::utilities::composites::dft::indexing;
1326

1327
    std::vector<ArrowWorkItem> items;
36✔
1328
    items.reserve(files.size() * 4);
36!
1329

1330
    const bool has_line_clip = (clip_start_line > 0 || clip_end_line > 0);
36!
1331
    auto push_unsplit = [&](const std::string &fp) {
57✔
1332
        ArrowWorkItem item;
57✔
1333
        item.file_path = fp;
57!
1334
        item.start_line = clip_start_line;
57✔
1335
        item.end_line = clip_end_line;
57✔
1336
        items.push_back(std::move(item));
57!
1337
    };
57✔
1338

1339
    // Parse the query once. Pruner input copies a Query, so we keep the
1340
    // parsed form around to feed each ChunkPrunerInput without re-parsing.
1341
    std::optional<dftracer::utils::utilities::common::query::Query> parsed;
36✔
1342
    if (!query_str.empty()) {
36✔
1343
        auto r = dftracer::utils::utilities::common::query::Query::from_string(
1344
            query_str);
3!
1345
        if (r) parsed = std::move(*r);
3!
1346
    }
3✔
1347

1348
    // All files in a directory-mode scan share the same `.dftindex` root.
1349
    // Group files by their resolved index path so we can open the RocksDB
1350
    // once per index and reuse it to prune every file against that handle.
1351
    std::unordered_map<std::string, std::vector<std::size_t>> by_index;
36✔
1352
    for (std::size_t i = 0; i < files.size(); ++i) {
97✔
1353
        std::string index_path =
1354
            dft_internal::determine_index_path(files[i], index_dir);
61!
1355
        by_index[index_path].push_back(i);
61!
1356
    }
61✔
1357

1358
    for (auto &entry : by_index) {
75✔
1359
        const auto &index_path = entry.first;
39✔
1360
        const auto &file_idxs = entry.second;
39✔
1361
        if (!fs::exists(index_path)) {
39!
1362
            for (auto i : file_idxs) push_unsplit(files[i]);
78!
1363
            continue;
31✔
1364
        }
31✔
1365
        std::unique_ptr<indexer_ns::IndexDatabase> idx_db;
8✔
1366
        try {
1367
            idx_db = std::make_unique<indexer_ns::IndexDatabase>(
8✔
1368
                index_path,
1369
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
8!
UNCOV
1370
        } catch (...) {
×
1371
            for (auto i : file_idxs) push_unsplit(files[i]);
×
UNCOV
1372
            continue;
×
1373
        }
×
1374

1375
        // Resolve fid + checkpoints per file (cheap queries).
1376
        struct FileCtx {
1377
            std::size_t file_idx;
1378
            int fid;
1379
            std::vector<indexer_ns::IndexerCheckpoint> ckpts;
1380
        };
1381
        std::vector<FileCtx> file_ctxs;
8✔
1382
        file_ctxs.reserve(file_idxs.size());
8!
1383
        for (auto i : file_idxs) {
22✔
1384
            FileCtx fc;
14✔
1385
            fc.file_idx = i;
14✔
1386
            fc.fid = idx_db->get_file_info_id(
28!
1387
                indexer_ns::internal::get_logical_path(files[i]));
28!
1388
            if (fc.fid < 0) {
14!
1389
                push_unsplit(files[i]);
×
1390
                continue;
×
1391
            }
1392
            fc.ckpts = idx_db->query_checkpoints(fc.fid);
14!
1393
            if (fc.ckpts.empty()) {
14✔
1394
                push_unsplit(files[i]);
10!
1395
                continue;
10✔
1396
            }
1397
            std::sort(fc.ckpts.begin(), fc.ckpts.end(),
4!
1398
                      [](const auto &a, const auto &b) {
×
1399
                          return a.first_line_num < b.first_line_num;
×
1400
                      });
1401
            file_ctxs.push_back(std::move(fc));
4!
1402
        }
14✔
1403

1404
        // Batch-prune all files against the shared index: dim_stats and
1405
        // chunk_statistics are loaded in one RocksDB scan each instead of
1406
        // one scan per file.
1407
        std::vector<indexing::ChunkPrunerOutput> pruner_outs(file_ctxs.size());
8!
1408
        if (parsed && !file_ctxs.empty()) {
8!
1409
            indexing::ChunkPrunerBatchInput batch_in;
×
1410
            batch_in.index_path = index_path;
×
1411
            batch_in.external_db = idx_db.get();
×
1412
            batch_in.items.reserve(file_ctxs.size());
×
1413
            for (auto &fc : file_ctxs) {
×
1414
                batch_in.items.push_back({files[fc.file_idx], *parsed});
×
1415
            }
1416
            indexing::ChunkPrunerUtility pruner;
×
1417
            auto batch_out = pruner.process_batch(batch_in);
×
1418
            if (batch_out.success) {
×
1419
                pruner_outs = std::move(batch_out.outputs);
×
1420
            }
1421
        }
×
1422

1423
        // For AND-of-EQ predicates, precompute uniform-match leaves once.
1424
        // Per-file pure_match is checked inline below and lets workers skip
1425
        // per-event predicate eval on chunks where dim_stats min == max ==
1426
        // literal for every leaf.
1427
        std::optional<std::vector<std::pair<std::string, std::string>>>
1428
            eq_leaves;
8✔
1429
        if (parsed) eq_leaves = extract_eq_leaves(parsed->root());
8!
1430

1431
        for (std::size_t fc_idx = 0; fc_idx < file_ctxs.size(); ++fc_idx) {
12✔
1432
            auto &fc = file_ctxs[fc_idx];
4✔
1433
            const auto &fp = files[fc.file_idx];
4✔
1434

1435
            // Pruner chunk_idx semantics: 0-indexed over uncompressed
1436
            // slices. fc.ckpts holds gzip recovery points; recovery point
1437
            // fc.ckpts[k] sits at the START of pruner chunk (k+1). Pruner
1438
            // chunk 0 has no recovery point at its start (decoded from
1439
            // gzip stream start). Total pruner chunks = fc.ckpts.size()+1.
1440
            const std::size_t total_chunks = fc.ckpts.size() + 1;
4✔
1441
            auto chunk_start_byte = [&](std::uint64_t cidx) -> std::size_t {
6✔
1442
                if (cidx == 0) return 0;
6✔
1443
                return fc.ckpts[cidx - 1].uc_offset;
4✔
1444
            };
4✔
1445
            auto chunk_end_byte = [&](std::uint64_t cidx) -> std::size_t {
6✔
1446
                if (cidx == 0)
6✔
1447
                    return fc.ckpts.empty() ? 0 : fc.ckpts[0].uc_offset;
2!
1448
                std::size_t k = cidx - 1;
4✔
1449
                return fc.ckpts[k].uc_offset + fc.ckpts[k].uc_size;
4✔
1450
            };
4✔
1451
            // Line ranges for a chunk. Chunk 0 covers everything before the
1452
            // first recovery point; chunk k>=1 spans recovery point (k-1).
1453
            auto chunk_first_line = [&](std::uint64_t cidx) -> std::size_t {
6✔
1454
                if (cidx == 0) return 1;
6✔
1455
                return fc.ckpts[cidx - 1].first_line_num;
4✔
1456
            };
4✔
1457
            auto chunk_last_line = [&](std::uint64_t cidx) -> std::size_t {
6✔
1458
                if (cidx == 0) {
6✔
1459
                    if (fc.ckpts.empty()) return SIZE_MAX;
2!
1460
                    return fc.ckpts[0].first_line_num > 0
2✔
1461
                               ? fc.ckpts[0].first_line_num - 1
2!
1462
                               : 0;
2✔
1463
                }
1464
                return fc.ckpts[cidx - 1].last_line_num;
4✔
1465
            };
4✔
1466

1467
            std::vector<std::uint64_t> keep_chunks;
4✔
1468
            keep_chunks.reserve(total_chunks);
4!
1469
            if (parsed) {
4!
1470
                const auto &pr = pruner_outs[fc_idx];
×
1471
                if (pr.success && !pr.file_may_match) {
×
1472
                    continue;  // whole file pruned
×
1473
                }
1474
                if (pr.success && !pr.candidate_checkpoints.empty() &&
×
1475
                    pr.candidate_checkpoints.size() < pr.total_checkpoints) {
×
1476
                    for (auto cidx : pr.candidate_checkpoints) {
×
1477
                        if (cidx < total_chunks) keep_chunks.push_back(cidx);
×
1478
                    }
1479
                    std::sort(keep_chunks.begin(), keep_chunks.end());
×
1480
                    keep_chunks.erase(
×
1481
                        std::unique(keep_chunks.begin(), keep_chunks.end()),
×
1482
                        keep_chunks.end());
×
1483
                } else {
1484
                    for (std::uint64_t c = 0; c < total_chunks; ++c)
×
1485
                        keep_chunks.push_back(c);
×
1486
                }
1487
            } else {
1488
                for (std::uint64_t c = 0; c < total_chunks; ++c)
12✔
1489
                    keep_chunks.push_back(c);
8!
1490
            }
1491

1492
            // Intersect with the user's line range so workers only touch
1493
            // chunks that actually overlap it. Each work item carries the
1494
            // sub-line-range; LINE_RANGE on the read maps it back to bytes
1495
            // via the same checkpoint table the gzip stream uses.
1496
            if (has_line_clip) {
4✔
1497
                std::size_t lo = clip_start_line > 0 ? clip_start_line : 1;
2!
1498
                std::size_t hi = clip_end_line > 0 ? clip_end_line : SIZE_MAX;
2!
1499
                std::vector<std::uint64_t> filtered;
2✔
1500
                filtered.reserve(keep_chunks.size());
2!
1501
                for (auto c : keep_chunks) {
6✔
1502
                    std::size_t cf = chunk_first_line(c);
4✔
1503
                    std::size_t cl = chunk_last_line(c);
4✔
1504
                    if (cl < lo || cf > hi) continue;
4!
1505
                    filtered.push_back(c);
2!
1506
                }
1507
                keep_chunks = std::move(filtered);
2✔
1508
            }
2✔
1509

1510
            if (keep_chunks.empty()) continue;
4!
1511

1512
            // All-or-nothing per file: if every kept chunk is uniform-matching
1513
            // for every leaf, every work item from this file gets the
1514
            // chunk_prune_only fast path. Mixed files fall back to per-event
1515
            // eval to stay safe.
1516
            bool file_pure_match = false;
4✔
1517
            if (eq_leaves && !eq_leaves->empty() && idx_db) {
4!
1518
                file_pure_match = all_chunks_uniform_match(
×
1519
                    *idx_db, fc.fid, *eq_leaves, keep_chunks);
×
1520
            }
1521

1522
            std::size_t target_ranges = std::max<std::size_t>(1, max_workers);
4✔
1523
            std::size_t per_range = std::max<std::size_t>(
4✔
1524
                1, (keep_chunks.size() + target_ranges - 1) / target_ranges);
4✔
1525

1526
            std::size_t group_start = 0;
4✔
1527
            while (group_start < keep_chunks.size()) {
10✔
1528
                std::size_t group_end = group_start;
6✔
1529
                std::size_t emitted = 0;
6✔
1530
                while (group_end < keep_chunks.size() && emitted < per_range) {
12✔
1531
                    if (group_end > group_start &&
6!
1532
                        keep_chunks[group_end] !=
×
1533
                            keep_chunks[group_end - 1] + 1) {
×
1534
                        break;
×
1535
                    }
1536
                    ++group_end;
6✔
1537
                    ++emitted;
6✔
1538
                }
1539
                std::uint64_t scidx = keep_chunks[group_start];
6✔
1540
                std::uint64_t ecidx = keep_chunks[group_end - 1];
6✔
1541
                std::size_t start_byte = chunk_start_byte(scidx);
6✔
1542
                std::size_t end_byte = chunk_end_byte(ecidx);
6✔
1543
                // start_at_checkpoint: a gzip recovery point sits at
1544
                // start_byte (true for any cidx>=1; false for the implicit
1545
                // chunk 0 which decodes from stream start).
1546
                bool start_at_checkpoint = (scidx >= 1);
6✔
1547
                bool end_at_checkpoint = (group_end < keep_chunks.size());
6✔
1548
                if (has_line_clip) {
6✔
1549
                    std::size_t lo = clip_start_line > 0 ? clip_start_line : 1;
2!
1550
                    std::size_t hi =
2✔
1551
                        clip_end_line > 0 ? clip_end_line : SIZE_MAX;
2!
1552
                    std::size_t cluster_first = chunk_first_line(scidx);
2✔
1553
                    std::size_t cluster_last = chunk_last_line(ecidx);
2✔
1554
                    std::size_t item_start =
1555
                        std::max<std::size_t>(lo, cluster_first);
2✔
1556
                    std::size_t item_end =
1557
                        std::min<std::size_t>(hi, cluster_last);
2✔
1558
                    if (item_start > item_end) {
2!
1559
                        group_start = group_end;
×
1560
                        continue;
×
1561
                    }
1562
                    ArrowWorkItem item;
2✔
1563
                    item.file_path = fp;
2!
1564
                    item.chunk_prune_only = file_pure_match;
2✔
1565
                    item.start_line = item_start;
2✔
1566
                    item.end_line = item_end;
2✔
1567
                    items.push_back(std::move(item));
2!
1568
                    group_start = group_end;
2✔
1569
                    continue;
2✔
1570
                }
2✔
1571
                if (clip_end_byte > clip_start_byte) {
4✔
1572
                    if (start_byte < clip_start_byte) {
2!
1573
                        start_byte = clip_start_byte;
×
1574
                        start_at_checkpoint = false;
×
1575
                    }
1576
                    if (end_byte > clip_end_byte) {
2!
1577
                        end_byte = clip_end_byte;
2✔
1578
                        end_at_checkpoint = false;
2✔
1579
                    }
1580
                    if (start_byte >= end_byte) {
2✔
1581
                        group_start = group_end;
1✔
1582
                        continue;
1✔
1583
                    }
1584
                }
1585
                items.push_back({fp, start_byte, end_byte, start_at_checkpoint,
3!
1586
                                 end_at_checkpoint, file_pure_match});
1587
                group_start = group_end;
3✔
1588
            }
1589
        }
4!
1590
    }
8!
1591
    return items;
72✔
1592
}
36✔
1593

1594
// Feeder coroutine for the parallel Arrow path.
//
// Sends every work item in `*items`, in order, into `chan`, then closes the
// channel. It stops early when `*cancelled` is set or when send() reports
// failure (presumably the channel was closed elsewhere). The channel is
// closed on every exit path so that workers blocked in receive() can finish.
static CoroTask<void> send_work_items_to_channel(
    std::shared_ptr<dftracer::utils::coro::Channel<ArrowWorkItem>> chan,
    const std::vector<ArrowWorkItem> *items, std::atomic<bool> *cancelled) {
    const auto &pending = *items;
    for (std::size_t next = 0; next < pending.size(); ++next) {
        if (cancelled->load(std::memory_order_acquire)) break;
        if (!(co_await chan->send(pending[next]))) break;
    }
    chan->close();
    co_return;
}
72!
1604

1605
static CoroTask<void> checkpoint_worker(
49!
1606
    std::shared_ptr<dftracer::utils::coro::Channel<ArrowWorkItem>> work_chan,
1607
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
1608
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
1609
    ReadConfig rc, std::size_t batch_size, bool normalize,
1610
    std::atomic<bool> *cancelled) {
1611
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer(
1612
        out_chan);
1613
    auto guard = producer.guard();
1614

1615
    // Cache readers keyed by file path so we don't re-probe the same file
1616
    // when successive work items land on it.
1617
    std::unordered_map<std::string, std::shared_ptr<TraceReader>> readers;
1618

1619
    while (auto item = co_await work_chan->receive()) {
1620
        if (cancelled->load(std::memory_order_acquire)) co_return;
1621

1622
        auto &reader_ptr = readers[item->file_path];
1623
        if (!reader_ptr) {
1624
            TraceReaderConfig cfg;
1625
            cfg.file_path = item->file_path;
1626
            cfg.index_dir = index_dir;
1627
            cfg.checkpoint_size = checkpoint_size;
1628
            cfg.auto_build_index = auto_build_index;
1629
            reader_ptr = std::make_shared<TraceReader>(std::move(cfg));
1630
        }
1631

1632
        ReadConfig local_rc = rc;
1633
        if (item->start_line > 0 || item->end_line > 0) {
1634
            // Line-range work items: the read drives off LINE_RANGE; the
1635
            // gzip stream resolves it back to byte offsets via checkpoints.
1636
            local_rc.start_line = item->start_line;
1637
            local_rc.end_line = item->end_line;
1638
            local_rc.start_byte = 0;
1639
            local_rc.end_byte = 0;
1640
            local_rc.start_at_checkpoint = false;
1641
            local_rc.end_at_checkpoint = false;
1642
        } else {
1643
            local_rc.start_byte = item->start_byte;
1644
            local_rc.end_byte = item->end_byte;
1645
            local_rc.start_at_checkpoint = item->start_at_checkpoint;
1646
            local_rc.end_at_checkpoint = item->end_at_checkpoint;
1647
        }
1648
        // Pruning already happened at enumeration time; avoid the per-
1649
        // work-item RocksDB opens that would otherwise dwarf the actual
1650
        // read cost at directory scale (256 files * N ranges).
1651
        local_rc.skip_pruning = true;
1652
        // chunks pre-classified as uniform-matching skip per-event eval.
1653
        if (item->chunk_prune_only) local_rc.chunk_prune_only = true;
1654

1655
        if (!normalize) {
1656
            auto batch_gen = reader_ptr->read_arrow(local_rc, batch_size);
1657
            while (auto batch_opt = co_await batch_gen.next()) {
1658
                if (cancelled->load(std::memory_order_acquire)) co_return;
1659
                if (!co_await producer.send(std::move(*batch_opt))) co_return;
1660
            }
1661
            continue;
1662
        }
1663

1664
        auto gen = reader_ptr->read_json(local_rc);
1665
        RecordBatchBuilder builder;
1666
        builder.reserve(batch_size);
1667
        StringArena arena;
1668

1669
        while (auto opt = co_await gen.next()) {
1670
            if (cancelled->load(std::memory_order_acquire)) co_return;
1671
            if (!build_arrow_row(builder, *opt->parser, arena, normalize))
1672
                continue;
1673
            if (builder.num_rows() >= batch_size) {
1674
                auto result = builder.finish();
1675
                arena.clear();
1676
                if (!co_await producer.send(std::move(result))) co_return;
1677
                if (!builder.is_schema_locked()) builder.lock_schema();
1678
                builder.reset(true);
1679
                builder.reserve(batch_size);
1680
            }
1681
        }
1682
        if (builder.num_rows() > 0) {
1683
            if (!co_await producer.send(builder.finish())) co_return;
1684
        }
1685
    }
1686
    co_return;
1687
}
103!
1688

1689
// Launch the parallel Arrow pipeline inside `child`.
//
// Spawns min(work_items->size(), max_workers) checkpoint workers (at least
// one), all reading from a work channel whose capacity equals the worker
// count. It also spawns a single feeder coroutine that pushes `*work_items`
// into that channel. Each worker receives its own copies of `*index_dir` and
// `*rc`. The feeder keeps the raw `work_items` pointer, so the vector must
// outlive the scope; presumably `child` awaits everything it spawned.
static CoroTask<void> spawn_arrow_producers(
    CoroScope &child,
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
    const std::vector<ArrowWorkItem> *work_items, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::size_t batch_size, bool normalize, std::atomic<bool> *cancelled_ptr,
    std::size_t max_workers) {
    // Never more workers than items, but always at least one.
    std::size_t worker_count =
        std::max<std::size_t>(1, std::min(work_items->size(), max_workers));
    auto item_chan =
        dftracer::utils::coro::make_channel<ArrowWorkItem>(worker_count);

    for (std::size_t w = 0; w < worker_count; ++w) {
        child.spawn([out_chan, wc = item_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc, batch_size,
                     normalize, cancelled_ptr](CoroScope &) {
            return checkpoint_worker(wc, out_chan, idx, checkpoint_size,
                                     auto_build_index, r, batch_size, normalize,
                                     cancelled_ptr);
        });
    }

    // The feeder closes item_chan once all items are sent (or on cancel).
    child.spawn([wc = item_chan, work_items, cancelled_ptr](CoroScope &) {
        return send_work_items_to_channel(wc, work_items, cancelled_ptr);
    });
    co_return;
}
72!
1716

1717
// Parallel Arrow producer for an explicit list of trace files.
//
// Enumerates work items from `files`. The query, byte window and line window
// come from `rc`. If there is nothing to do (no files, or every file pruned
// away), sp->channel is closed immediately so the consumer sees end of
// stream. Otherwise the workers and feeder run inside a child scope of
// `scope`, and this coroutine awaits it. `work_items`, `index_dir` and `rc`
// are referenced from that scope, so they must stay alive until the await
// completes. Any exception is captured with sp->set_error.
static CoroTask<void> produce_arrow_batches_for_files(
    CoroScope &scope, ArrowIteratorState *sp, std::vector<std::string> files,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::size_t max_workers) {
    try {
        std::vector<ArrowWorkItem> work_items;
        if (!files.empty()) {
            work_items = enumerate_work_items(
                files, index_dir, rc.query, max_workers, rc.start_byte,
                rc.end_byte, rc.start_line, rc.end_line);
        }
        if (work_items.empty()) {
            sp->channel->close();
            co_return;
        }

        auto *const out_chan = sp->channel.get();
        auto *const cancel_flag = &sp->cancelled;

        co_await scope.scope([out_chan, &work_items, &index_dir,
                              checkpoint_size, auto_build_index, &rc,
                              batch_size, normalize, cancel_flag,
                              max_workers](CoroScope &child) -> CoroTask<void> {
            co_await spawn_arrow_producers(
                child, out_chan, &work_items, &index_dir, checkpoint_size,
                auto_build_index, &rc, batch_size, normalize, cancel_flag,
                max_workers);
        });
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
74!
1752

1753
// Parallel Arrow producer for a directory.
//
// Scans `dir_path` with PatternDirectoryScannerUtility for `.pfw` and
// `.pfw.gz` files. The two boolean scanner flags are defined by
// PatternDirectoryScannerUtilityInput. The resulting paths are sorted so
// work enumeration sees a deterministic file list, then everything is
// delegated to produce_arrow_batches_for_files. Any exception is captured
// with sp->set_error.
static CoroTask<void> produce_arrow_batches_parallel(
    CoroScope &scope, ArrowIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility dir_scanner;
        auto scan_request = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto found = co_await scope.spawn(dir_scanner, scan_request);

        std::vector<std::string> trace_files;
        trace_files.reserve(found.size());
        for (auto &hit : found) {
            trace_files.push_back(hit.path.string());
        }
        std::sort(trace_files.begin(), trace_files.end());

        co_await produce_arrow_batches_for_files(
            scope, sp, std::move(trace_files), std::move(index_dir),
            checkpoint_size, auto_build_index, std::move(rc), batch_size,
            normalize, max_workers);
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
30!
1777

1778
// Single-reader Arrow producer.
//
// Opens a TraceReader for `cfg` and pushes RecordBatches through `producer`:
//   * normalize == false: batches from reader.read_arrow(rc, batch_size) are
//     forwarded unchanged.
//   * normalize == true: JSON events from reader.read_json(rc) are converted
//     row by row with build_arrow_row. A batch is emitted every `batch_size`
//     rows, and the schema is locked after the first emitted batch. A
//     partial tail batch is flushed unless cancellation was requested.
// Each batch's byte_size is added to state->bytes_in_queue before it is sent.
// Production stops when state->cancelled is set or a send fails.
// `flatten_objects` is accepted but currently unused. Exceptions are
// captured with state->set_error rather than propagated.
CoroTask<void> produce_arrow_batches(
    std::shared_ptr<ArrowIteratorState> state,
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer,
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size,
    bool flatten_objects = false, bool normalize = false) {
    (void)flatten_objects;

    auto producer_guard = producer.guard();
    const auto stop_requested = [&state]() {
        return state->cancelled.load(std::memory_order_acquire);
    };

    try {
        TraceReader reader(std::move(cfg));

        if (normalize) {
            auto events = reader.read_json(rc);
            RecordBatchBuilder row_builder;
            row_builder.reserve(batch_size);

            StringArena arena;

            while (auto event = co_await events.next()) {
                if (stop_requested()) break;
                if (!build_arrow_row(row_builder, *event->parser, arena,
                                     normalize))
                    continue;
                if (row_builder.num_rows() < batch_size) continue;

                auto full_batch = row_builder.finish();
                arena.clear();
                auto queued_bytes =
                    dftracer::utils::python::byte_size(full_batch);
                state->bytes_in_queue.fetch_add(queued_bytes,
                                                std::memory_order_acq_rel);
                if (!co_await producer.send(std::move(full_batch))) break;
                if (!row_builder.is_schema_locked()) {
                    row_builder.lock_schema();
                }
                row_builder.reset(true);
                row_builder.reserve(batch_size);
            }

            // Flush the partially filled tail batch unless cancelled.
            if (row_builder.num_rows() > 0 && !stop_requested()) {
                auto tail_batch = row_builder.finish();
                auto queued_bytes =
                    dftracer::utils::python::byte_size(tail_batch);
                state->bytes_in_queue.fetch_add(queued_bytes,
                                                std::memory_order_acq_rel);
                co_await producer.send(std::move(tail_batch));
            }
        } else {
            auto arrow_batches = reader.read_arrow(rc, batch_size);
            while (auto next_batch = co_await arrow_batches.next()) {
                if (stop_requested()) break;
                auto queued_bytes =
                    dftracer::utils::python::byte_size(*next_batch);
                state->bytes_in_queue.fetch_add(queued_bytes,
                                                std::memory_order_acq_rel);
                if (!co_await producer.send(std::move(*next_batch))) break;
            }
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
34!
1840

1841
#endif  // DFTRACER_UTILS_ENABLE_ARROW
1842

1843
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
1844

1845
struct WriteArrowStats {
1846
    std::unordered_map<std::string, PartitionWriteStats> partitions;
1847
    int64_t total_rows = 0;
1848
    int64_t total_uncompressed_bytes = 0;
1849
};
1850

1851
struct WriteArrowResult {
1852
    WriteArrowStats stats;
1853
    std::string error;
1854
    std::uint64_t chunks_scanned = 0;
1855
    std::uint64_t chunks_skipped = 0;
1856
};
1857

1858
// Export one trace file to partitioned Arrow IPC output, one partition per
// view.
//
// Steps:
//   1. Resolve the index path (determine_index_path when `index_path` is
//      empty) and collect file metadata.
//   2. For each view, open a PartitionWriter.
//   3. Ask ViewBuilderUtility for candidate byte ranges.
//   4. Stream each candidate through ViewReaderUtility into Arrow batches,
//      sharing one RecordBatchBuilder whose schema is locked after the first
//      written batch.
//   5. Close the writer and add its totals to result.stats.
//
// An empty `views` list is treated as a single view named "all". Output goes
// to `output_path` itself when the only view is "all", and to
// `output_path/<view.name>` otherwise. Views whose file cannot match are
// closed and recorded without contributing to the totals.
//
// The first failure stops processing and is reported through
// WriteArrowResult::error; a std::exception raised inside is caught and
// stored the same way.
// NOTE(review): failure paths return without calling writer.close(). Confirm
// that PartitionWriter's destructor releases/finalizes its output.
CoroTask<WriteArrowResult> write_arrow_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    std::vector<ViewDefinition> views, std::string output_path,
    int64_t chunk_size_bytes, IpcCompression compression,
    std::size_t event_batch_size) {
    namespace dft_internal =
        dftracer::utils::utilities::composites::dft::internal;
    WriteArrowResult result;

    try {
        if (views.empty()) views.push_back(ViewDefinition().with_name("all"));

        std::string resolved_index =
            !index_path.empty()
                ? index_path
                : dft_internal::determine_index_path(file_path, "");

        auto meta_input = MetadataCollectorUtilityInput::from_file(file_path)
                              .with_checkpoint_size(checkpoint_size)
                              .with_index(resolved_index);
        auto metadata = co_await MetadataCollectorUtility{}.process(meta_input);
        if (!metadata.success) {
            result.error =
                "Failed to collect metadata: " + metadata.error_message;
            co_return result;
        }

        for (const auto &view : views) {
            const bool single_default_view =
                views.size() == 1 && view.name == "all";
            std::string view_output = single_default_view
                                          ? output_path
                                          : output_path + "/" + view.name;

            PartitionWriter writer;
            int open_rc = co_await writer.open(view_output, chunk_size_bytes,
                                               compression);
            if (open_rc != 0) {
                result.error =
                    "Failed to open partition writer for view: " + view.name;
                co_return result;
            }

            ViewBuilderInput builder_input;
            builder_input.with_view(view)
                .with_file_path(file_path)
                .with_index_path(resolved_index)
                .with_uncompressed_size(metadata.uncompressed_size)
                .with_num_checkpoints(metadata.num_checkpoints);

            auto plan = co_await ViewBuilderUtility{}.process(builder_input);
            if (!plan.success) {
                result.error = "ViewBuilder failed for view: " + view.name;
                co_return result;
            }

            result.chunks_skipped += plan.skipped_checkpoints;

            if (!plan.file_may_match) {
                // Nothing in this file can match the view: finalize the
                // (empty) partition and record it, but leave the totals.
                auto empty_stats = co_await writer.close();
                result.stats.partitions[view.name] = std::move(empty_stats);
                continue;
            }

            RecordBatchBuilder builder;
            bool schema_locked = false;

            for (const auto &candidate : plan.candidates) {
                ViewReaderInput reader_input;
                reader_input.with_file_path(file_path)
                    .with_index_path(resolved_index)
                    .with_checkpoint_size(checkpoint_size)
                    .with_byte_range(candidate.start_byte, candidate.end_byte)
                    .with_checkpoint_idx(candidate.checkpoint_idx)
                    .with_event_batch_size(event_batch_size)
                    .with_view(view);
                reader_input.query = view.query;

                ViewReaderUtility view_reader;
                auto event_batches = view_reader.process(reader_input);
                while (auto event_batch = co_await event_batches.next()) {
                    auto arrow_batch = event_batch->to_arrow(builder);
                    int write_rc = co_await writer.write_batch(arrow_batch);
                    if (write_rc != 0) {
                        result.error =
                            "Failed to write batch for view: " + view.name;
                        co_return result;
                    }
                    if (!schema_locked) {
                        builder.lock_schema();
                        schema_locked = true;
                    }
                    builder.reset(true);
                }
                ++result.chunks_scanned;
            }

            auto closed_stats = co_await writer.close();
            auto &partition = result.stats.partitions[view.name];
            partition = std::move(closed_stats);
            result.stats.total_rows += partition.total_rows;
            result.stats.total_uncompressed_bytes +=
                partition.total_uncompressed_bytes;
        }
    } catch (const std::exception &e) {
        result.error = e.what();
    }
    co_return result;
}
26!
1969

1970
// One candidate chunk of a view: the checkpoint it starts at and the byte
// range to read.
//
// Members are zero-initialised by default. Previously a default-constructed
// instance (`ViewChunkInfo c;`) held indeterminate values. The type remains
// an aggregate, so `ViewChunkInfo{idx, start, end}` still works.
struct ViewChunkInfo {
    std::uint64_t checkpoint_idx = 0;
    std::size_t start_byte = 0;
    std::size_t end_byte = 0;
};
1975

1976
struct GetViewChunksResult {
1977
    std::vector<ViewChunkInfo> chunks;
1978
    std::uint64_t total_checkpoints = 0;
1979
    std::uint64_t skipped_checkpoints = 0;
1980
    bool file_may_match = false;
1981
    std::string error;
1982
};
1983

1984
/// Plans which checkpoint chunks of `file_path` can contain rows for `view`.
///
/// The steps are:
///   1. Resolve the index path. An empty `index_path` falls back to
///      determine_index_path().
///   2. Collect file metadata with MetadataCollectorUtility.
///   3. Ask ViewBuilderUtility for the candidate chunks.
///
/// @return A GetViewChunksResult. On failure `error` holds one of:
///         the metadata collector's message, "ViewBuilder failed", or the
///         what() of a caught std::exception.
CoroTask<GetViewChunksResult> get_view_chunks_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    ViewDefinition view) {
    namespace dft_internal =
        dftracer::utils::utilities::composites::dft::internal;
    GetViewChunksResult out;

    try {
        std::string index_to_use = index_path;
        if (index_to_use.empty()) {
            index_to_use = dft_internal::determine_index_path(file_path, "");
        }

        auto collector_input =
            MetadataCollectorUtilityInput::from_file(file_path)
                .with_checkpoint_size(checkpoint_size)
                .with_index(index_to_use);
        auto meta =
            co_await MetadataCollectorUtility{}.process(collector_input);
        if (!meta.success) {
            out.error = "Failed to collect metadata: " + meta.error_message;
            co_return out;
        }

        ViewBuilderInput plan_input;
        plan_input.with_view(view)
            .with_file_path(file_path)
            .with_index_path(index_to_use)
            .with_uncompressed_size(meta.uncompressed_size)
            .with_num_checkpoints(meta.num_checkpoints);

        auto plan = co_await ViewBuilderUtility{}.process(plan_input);
        if (!plan.success) {
            out.error = "ViewBuilder failed";
            co_return out;
        }

        out.file_may_match = plan.file_may_match;
        out.total_checkpoints = plan.total_checkpoints;
        out.skipped_checkpoints = plan.skipped_checkpoints;

        // Keep only the location of each candidate; callers re-read chunks
        // by checkpoint index and byte range.
        for (const auto &cand : plan.candidates) {
            out.chunks.push_back(ViewChunkInfo{
                cand.checkpoint_idx, cand.start_byte, cand.end_byte});
        }
    } catch (const std::exception &ex) {
        out.error = ex.what();
    }
    co_return out;
}
6!
2034

2035
/// Outcome of writing one view chunk to its own Arrow IPC file
/// (see write_view_chunk_pipeline).
struct WriteViewChunkResult {
    std::string output_file;          // IPC file the chunk targets
    std::uint64_t events_matched{0};  // summed over reader batches
    std::uint64_t events_scanned{0};  // summed over reader batches
    int64_t rows_written{0};          // rows of Arrow batches sent to writer
    int64_t bytes_written{0};  // NOTE(review): not assigned by the pipeline
    std::string error;         // empty on success
};
2043

2044
/// Streams the rows of `view` found in one chunk of `file_path` into a
/// standalone Arrow IPC file at `output_file`.
///
/// The chunk is identified by `checkpoint_idx` and the byte range
/// `start_byte`..`end_byte`, as produced by get_view_chunks_pipeline. An
/// empty `index_path` falls back to determine_index_path(). After the first
/// batch is written, the builder schema is locked so later batches match it.
///
/// @return A WriteViewChunkResult whose `error` is empty on success. On
///         failure `error` names the failing step, and the counters reflect
///         the batches fully processed before it.
///         NOTE(review): `bytes_written` is never assigned here.
CoroTask<WriteViewChunkResult> write_view_chunk_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    ViewDefinition view, std::uint64_t checkpoint_idx, std::size_t start_byte,
    std::size_t end_byte, std::string output_file, IpcCompression compression,
    std::size_t event_batch_size) {
    namespace dft_internal =
        dftracer::utils::utilities::composites::dft::internal;
    WriteViewChunkResult result;
    result.output_file = output_file;

    try {
        std::string resolved_index =
            index_path.empty()
                ? dft_internal::determine_index_path(file_path, "")
                : index_path;

        dftracer::utils::utilities::common::arrow::IpcWriter writer;
        int rc_open = co_await writer.open(output_file, compression);
        if (rc_open != 0) {
            result.error = "Failed to open output file";
            co_return result;
        }

        ViewReaderInput reader_input;
        reader_input.with_file_path(file_path)
            .with_index_path(resolved_index)
            .with_checkpoint_size(checkpoint_size)
            .with_byte_range(start_byte, end_byte)
            .with_checkpoint_idx(checkpoint_idx)
            .with_event_batch_size(event_batch_size)
            .with_view(view);
        reader_input.query = view.query;

        RecordBatchBuilder builder;
        bool schema_locked = false;

        ViewReaderUtility reader;
        auto gen = reader.process(reader_input);
        while (auto opt = co_await gen.next()) {
            result.events_matched += opt->events_matched;
            result.events_scanned += opt->events_scanned;
            auto batch = opt->to_arrow(builder);
            if (!batch.valid()) {
                continue;  // invalid batch: nothing to write for this step
            }

            // Read the row count before handing the batch to the writer, and
            // add it only once the write succeeded. That way rows_written
            // never includes a batch that did not reach the file.
            const auto batch_rows = batch.num_rows();
            int rc = co_await writer.write_batch(batch);
            if (rc != 0) {
                result.error = "Failed to write batch";
                co_return result;
            }
            result.rows_written += batch_rows;

            if (!schema_locked) {
                builder.lock_schema();
                schema_locked = true;
            }
            builder.reset(true);
        }

        int rc = co_await writer.close();
        if (rc != 0) {
            result.error = "Failed to close output file";
        }
    } catch (const std::exception &e) {
        result.error = e.what();
    }
    co_return result;
}
8!
2110

2111
/// One unit of work for write_view_chunks_pipeline: a chunk location plus
/// the IPC file its rows should be written to.
///
/// The numeric members default to 0 so a default-constructed descriptor is
/// well defined instead of holding indeterminate values. Aggregate
/// initialization `{idx, start, end, path}` is unaffected.
struct ChunkDescriptor {
    std::uint64_t checkpoint_idx = 0;  // forwarded to write_view_chunk_pipeline
    std::size_t start_byte = 0;        // forwarded to write_view_chunk_pipeline
    std::size_t end_byte = 0;          // forwarded to write_view_chunk_pipeline
    std::string output_file;           // destination Arrow IPC file
};
2117

2118
struct WriteViewChunksResult {
2119
    std::vector<WriteViewChunkResult> results;
2120
    int64_t total_rows = 0;
2121
    int64_t total_events_matched = 0;
2122
};
2123

2124
/// Writes several chunks of one view concurrently, one IPC file per chunk.
///
/// Each ChunkDescriptor becomes its own write_view_chunk_pipeline task. All
/// tasks are awaited together with when_all, then the per-chunk row and
/// matched-event counts are summed. Per-chunk failures stay visible in
/// `results[i].error`. An empty `chunks` list yields an empty result.
CoroTask<WriteViewChunksResult> write_view_chunks_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    ViewDefinition view, std::vector<ChunkDescriptor> chunks,
    IpcCompression compression, std::size_t event_batch_size) {
    WriteViewChunksResult summary;

    if (!chunks.empty()) {
        std::vector<CoroTask<WriteViewChunkResult>> pending;
        pending.reserve(chunks.size());
        for (const ChunkDescriptor &desc : chunks) {
            pending.push_back(write_view_chunk_pipeline(
                file_path, index_path, checkpoint_size, view,
                desc.checkpoint_idx, desc.start_byte, desc.end_byte,
                desc.output_file, compression, event_batch_size));
        }

        summary.results = co_await when_all(std::move(pending));

        for (const WriteViewChunkResult &part : summary.results) {
            summary.total_rows += part.rows_written;
            summary.total_events_matched += part.events_matched;
        }
    }

    co_return summary;
}
2!
2153

2154
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
2155

2156
/// Snapshots the reader's Python-side settings into a TraceReaderConfig.
///
/// @throws std::runtime_error in either of these cases:
///         - The reader has no path/index_dir objects (e.g. `__new__` ran
///           without a successful `__init__`). Previously this passed NULL
///           to PyUnicode_AsUTF8 and crashed.
///         - A stored string cannot be converted to UTF-8. Previously a NULL
///           result was fed into std::string, which is undefined behavior.
///         The callers visible in this file catch std::exception and turn it
///         into a Python RuntimeError.
TraceReaderConfig build_config(TraceReaderObject *self) {
    if (self->file_path == NULL || self->index_dir == NULL) {
        throw std::runtime_error("TraceReader is not initialized");
    }

    const char *path = PyUnicode_AsUTF8(self->file_path);
    if (path == NULL) {
        throw std::runtime_error(
            "TraceReader path could not be encoded as UTF-8");
    }
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
    if (idx == NULL) {
        throw std::runtime_error(
            "TraceReader index_dir could not be encoded as UTF-8");
    }

    TraceReaderConfig cfg;
    cfg.file_path = path;
    cfg.index_dir = idx;
    cfg.checkpoint_size = self->checkpoint_size;
    cfg.auto_build_index = self->auto_build_index != 0;
    return cfg;
}
×
2165

2166
/// Returns the Runtime that work for this reader should be scheduled on.
///
/// If the reader was constructed with a `runtime`, that object's native
/// Runtime is used. Otherwise the result of get_default_runtime() is
/// returned.
static Runtime *get_runtime(TraceReaderObject *self) {
    if (self->runtime_obj == NULL) {
        return get_default_runtime();
    }
    RuntimeObject *bound = (RuntimeObject *)self->runtime_obj;
    return bound->runtime.get();
}
2172

2173
/// Wraps memoryview-batch producer state in a new iterator object set to
/// IteratorMode::MEMORYVIEW.
///
/// tp_alloc does not run C++ constructors, so every shared_ptr member is
/// placement-constructed here. Members for the other modes are left empty.
///
/// @param state  producer state; ownership moves into the iterator.
/// @return the new iterator, or NULL if tp_alloc failed.
static TraceReaderIteratorObject *make_memoryview_iterator(
    std::shared_ptr<MemoryViewBatchIteratorState> state) {
    TraceReaderIteratorObject *iter_obj =
        (TraceReaderIteratorObject *)TraceReaderIteratorType.tp_alloc(
            &TraceReaderIteratorType, 0);
    if (iter_obj == NULL) {
        return NULL;
    }

    // Unused-mode members: empty placeholders.
    new (&iter_obj->json_dict_state) std::shared_ptr<JsonDictIteratorState>();
    new (&iter_obj->json_dict_current_batch) std::shared_ptr<JsonDictBatch>();
    iter_obj->json_dict_index = 0;
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    new (&iter_obj->arrow_state) std::shared_ptr<ArrowIteratorState>();
#endif

    // Active-mode members.
    new (&iter_obj->batch_state)
        std::shared_ptr<MemoryViewBatchIteratorState>(std::move(state));
    iter_obj->current_batch = NULL;
    iter_obj->batch_index = 0;

    iter_obj->mode = IteratorMode::MEMORYVIEW;
    return iter_obj;
}
2192

2193
/// Wraps JSON-dict producer state in a new iterator object set to
/// IteratorMode::JSON_DICT.
///
/// tp_alloc does not run C++ constructors, so every shared_ptr member is
/// placement-constructed here. Members for the other modes are left empty.
///
/// @param state  producer state; ownership moves into the iterator.
/// @return the new iterator, or NULL if tp_alloc failed.
static TraceReaderIteratorObject *make_json_dict_iterator(
    std::shared_ptr<JsonDictIteratorState> state) {
    TraceReaderIteratorObject *iter_obj =
        (TraceReaderIteratorObject *)TraceReaderIteratorType.tp_alloc(
            &TraceReaderIteratorType, 0);
    if (iter_obj == NULL) {
        return NULL;
    }

    // Unused-mode members: empty placeholders.
    new (&iter_obj->batch_state)
        std::shared_ptr<MemoryViewBatchIteratorState>();
    iter_obj->current_batch = NULL;
    iter_obj->batch_index = 0;
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    new (&iter_obj->arrow_state) std::shared_ptr<ArrowIteratorState>();
#endif

    // Active-mode members.
    new (&iter_obj->json_dict_state)
        std::shared_ptr<JsonDictIteratorState>(std::move(state));
    new (&iter_obj->json_dict_current_batch) std::shared_ptr<JsonDictBatch>();
    iter_obj->json_dict_index = 0;

    iter_obj->mode = IteratorMode::JSON_DICT;
    return iter_obj;
}
2212

2213
#ifdef DFTRACER_UTILS_ENABLE_ARROW
/// Wraps Arrow producer state in a new iterator object set to
/// IteratorMode::ARROW.
///
/// tp_alloc does not run C++ constructors, so every shared_ptr member is
/// placement-constructed here. Members for the other modes are left empty.
///
/// @param state  producer state; ownership moves into the iterator.
/// @return the new iterator, or NULL if tp_alloc failed.
static TraceReaderIteratorObject *make_arrow_iterator(
    std::shared_ptr<ArrowIteratorState> state) {
    TraceReaderIteratorObject *iter_obj =
        (TraceReaderIteratorObject *)TraceReaderIteratorType.tp_alloc(
            &TraceReaderIteratorType, 0);
    if (iter_obj == NULL) {
        return NULL;
    }

    // Unused-mode members: empty placeholders.
    new (&iter_obj->batch_state)
        std::shared_ptr<MemoryViewBatchIteratorState>();
    iter_obj->current_batch = NULL;
    iter_obj->batch_index = 0;
    new (&iter_obj->json_dict_state) std::shared_ptr<JsonDictIteratorState>();
    new (&iter_obj->json_dict_current_batch) std::shared_ptr<JsonDictBatch>();
    iter_obj->json_dict_index = 0;

    // Active-mode member.
    new (&iter_obj->arrow_state)
        std::shared_ptr<ArrowIteratorState>(std::move(state));

    iter_obj->mode = IteratorMode::ARROW;
    return iter_obj;
}
#endif
2232

2233
}  // namespace
2234

2235
/// tp_dealloc for TraceReader.
///
/// Drops the references the reader owns (path, index dir, bound runtime;
/// any of them may be NULL), then frees the object through its type's
/// tp_free.
static void TraceReader_dealloc(TraceReaderObject *self) {
    Py_CLEAR(self->file_path);
    Py_CLEAR(self->index_dir);
    Py_CLEAR(self->runtime_obj);
    PyTypeObject *const tp = Py_TYPE(self);
    tp->tp_free((PyObject *)self);
}
158✔
2241

2242
/// tp_new for TraceReader.
///
/// Allocates the object and puts every field into a safe "not yet
/// initialized" state. Object references start as NULL; the checkpoint size
/// starts at 32 MiB and both flags at 0. Arguments are handled by
/// TraceReader_init.
///
/// @return the new object, or NULL if tp_alloc failed.
static PyObject *TraceReader_new(PyTypeObject *type, PyObject *args,
                                 PyObject *kwds) {
    (void)args;
    (void)kwds;
    TraceReaderObject *obj = (TraceReaderObject *)type->tp_alloc(type, 0);
    if (obj == NULL) {
        return NULL;
    }
    obj->file_path = NULL;
    obj->index_dir = NULL;
    obj->runtime_obj = NULL;
    obj->checkpoint_size = 32 * 1024 * 1024;
    obj->auto_build_index = 0;
    obj->has_index = 0;
    return (PyObject *)obj;
}
2255

2256
static int TraceReader_init(TraceReaderObject *self, PyObject *args,
158✔
2257
                            PyObject *kwds) {
2258
    static const char *kwlist[] = {
2259
        "path",    "index_dir", "checkpoint_size", "auto_build_index",
2260
        "runtime", NULL};
2261

2262
    const char *file_path;
2263
    const char *index_dir = "";
158✔
2264
    std::size_t checkpoint_size = 32 * 1024 * 1024;
158✔
2265
    int auto_build_index = 0;
158✔
2266
    PyObject *runtime_arg = NULL;
158✔
2267

2268
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|snpO", (char **)kwlist,
158!
2269
                                     &file_path, &index_dir, &checkpoint_size,
2270
                                     &auto_build_index, &runtime_arg)) {
2271
        return -1;
×
2272
    }
2273

2274
    if (runtime_arg && runtime_arg != Py_None) {
158!
2275
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
32!
2276
            // Direct C++ Runtime object
2277
            Py_INCREF(runtime_arg);
×
2278
            self->runtime_obj = runtime_arg;
×
2279
        } else {
2280
            // Python wrapper, extract _native attribute
2281
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
32!
2282
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
32!
2283
                self->runtime_obj = native;  // already incref'd by GetAttr
32✔
2284
            } else {
2285
                Py_XDECREF(native);
×
2286
                PyErr_SetString(PyExc_TypeError,
×
2287
                                "runtime must be a Runtime instance or None");
2288
                return -1;
×
2289
            }
2290
        }
2291
    }
2292

2293
    self->file_path = PyUnicode_FromString(file_path);
158!
2294
    if (!self->file_path) return -1;
158!
2295

2296
    self->index_dir = PyUnicode_FromString(index_dir);
158!
2297
    if (!self->index_dir) {
158!
2298
        Py_DECREF(self->file_path);
×
2299
        self->file_path = NULL;
×
2300
        return -1;
×
2301
    }
2302

2303
    self->checkpoint_size = checkpoint_size;
158✔
2304
    self->auto_build_index = auto_build_index;
158✔
2305

2306
    try {
2307
        TraceReaderConfig cfg;
158✔
2308
        cfg.file_path = file_path;
158!
2309
        cfg.index_dir = index_dir;
158!
2310
        cfg.checkpoint_size = checkpoint_size;
158✔
2311
        cfg.auto_build_index = auto_build_index != 0;
158✔
2312
        TraceReader probe(std::move(cfg));
158!
2313
        self->has_index = probe.has_index() ? 1 : 0;
158!
2314
    } catch (const std::exception &e) {
158!
2315
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2316
        Py_DECREF(self->file_path);
×
2317
        Py_DECREF(self->index_dir);
×
2318
        self->file_path = NULL;
×
2319
        self->index_dir = NULL;
×
2320
        return -1;
×
2321
    }
×
2322

2323
    return 0;
158✔
2324
}
2325

2326
/// Python: TraceReader.iter_lines(start_line=0, end_line=0, start_byte=0,
///                                end_byte=0, buffer_size=4 MiB, query=None,
///                                memory_budget=0)
///
/// Validates the arguments and builds a ReadConfig. It then creates a
/// bounded channel sized by compute_channel_capacity() and starts a
/// producer that pushes line batches into it:
///   - a directory path uses produce_lines_parallel inside rt->scope(),
///   - a single file uses produce_lines_batched via rt->submit().
/// Finally the shared state is wrapped in a memoryview-mode iterator.
///
/// All C++ setup (config building, allocation, filesystem probing, task
/// spawning) runs under one handler. Previously several throwing calls sat
/// outside any try, and a C++ exception must never unwind into the CPython
/// interpreter.
///
/// NOTE(review): the directory-mode lambda captures the raw `sp` pointer.
/// It relies on the iterator keeping `state` alive until the producer
/// finishes; confirm against the iterator's dealloc.
///
/// @return a new iterator, or NULL with ValueError (bad arguments) or
///         RuntimeError (C++ failure) set.
static PyObject *TraceReader_iter_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {"start_line",    "end_line",    "start_byte",
                                   "end_byte",      "buffer_size", "query",
                                   "memory_budget", NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;
    Py_ssize_t memory_budget = 0;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|nnnnnzn", (char **)kwlist, &start_line, &end_line,
            &start_byte, &end_byte, &buffer_size, &query_str, &memory_budget)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    std::shared_ptr<MemoryViewBatchIteratorState> state;
    try {
        TraceReaderConfig cfg = build_config(self);

        ReadConfig rc;
        rc.start_line = static_cast<std::size_t>(start_line);
        rc.end_line = static_cast<std::size_t>(end_line);
        rc.start_byte = static_cast<std::size_t>(start_byte);
        rc.end_byte = static_cast<std::size_t>(end_byte);
        rc.buffer_size = static_cast<std::size_t>(buffer_size);
        if (query_str) rc.query = query_str;

        state = std::make_shared<MemoryViewBatchIteratorState>();
        state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
            static_cast<std::size_t>(memory_budget));

        Runtime *rt = get_runtime(self);
        std::size_t max_workers = rt->threads();
        constexpr std::size_t LINE_BATCH_SIZE = 1024;
        std::size_t capacity = dftracer::utils::compute_channel_capacity(
            state->memory_budget_bytes,
            LINE_BATCH_SIZE * ESTIMATED_BYTES_PER_LINE, max_workers);
        state->channel =
            dftracer::utils::coro::make_channel<MemoryViewBatchData>(capacity);
        auto *sp = state.get();

        if (fs::is_directory(cfg.file_path)) {
            auto handle = rt->scope(
                "iter_lines_parallel",
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_lines_parallel(
                        scope, sp, dir_path, index_dir, checkpoint_size,
                        auto_build_index, rc, LINE_BATCH_SIZE, max_workers);
                });
            state->task_future = handle.future;
        } else {
            auto handle = rt->submit(
                produce_lines_batched(state, state->channel->producer(), cfg,
                                      rc, LINE_BATCH_SIZE),
                "iter_lines");
            state->task_future = handle.future;
        }
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "unknown C++ exception in iter_lines");
        return NULL;
    }

    TraceReaderIteratorObject *it = make_memoryview_iterator(std::move(state));
    return (PyObject *)it;
}
59✔
2410

2411
/// Python: TraceReader.iter_raw(start_line=0, end_line=0, start_byte=0,
///                              end_byte=0, buffer_size=4 MiB,
///                              line_aligned=True, multi_line=True,
///                              query=None, memory_budget=0)
///
/// Validates the arguments and builds a ReadConfig. It then creates a
/// bounded channel sized by compute_channel_capacity() and starts a
/// producer of raw chunks:
///   - a directory path uses produce_raw_parallel inside rt->scope(),
///   - a single file uses produce_raw_batched via rt->submit().
/// Finally the shared state is wrapped in a memoryview-mode iterator.
///
/// All C++ setup runs under one handler so that no C++ exception can unwind
/// into the interpreter. Several throwing calls used to sit outside any try.
///
/// NOTE(review): the directory-mode lambda captures the raw `sp` pointer.
/// It relies on the iterator keeping `state` alive until the producer
/// finishes; confirm against the iterator's dealloc.
///
/// @return a new iterator, or NULL with ValueError (bad arguments) or
///         RuntimeError (C++ failure) set.
static PyObject *TraceReader_iter_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "line_aligned",
                                   "multi_line", "query",       "memory_budget",
                                   NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    int line_aligned = 1;
    int multi_line = 1;
    const char *query_str = NULL;
    Py_ssize_t memory_budget = 0;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nnnnnppzn", (char **)kwlist,
                                     &start_line, &end_line, &start_byte,
                                     &end_byte, &buffer_size, &line_aligned,
                                     &multi_line, &query_str, &memory_budget)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    std::shared_ptr<MemoryViewBatchIteratorState> state;
    try {
        TraceReaderConfig cfg = build_config(self);

        ReadConfig rc;
        rc.start_line = static_cast<std::size_t>(start_line);
        rc.end_line = static_cast<std::size_t>(end_line);
        rc.start_byte = static_cast<std::size_t>(start_byte);
        rc.end_byte = static_cast<std::size_t>(end_byte);
        rc.buffer_size = static_cast<std::size_t>(buffer_size);
        rc.line_aligned = line_aligned != 0;
        rc.multi_line = multi_line != 0;
        if (query_str) rc.query = query_str;

        state = std::make_shared<MemoryViewBatchIteratorState>();
        state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
            static_cast<std::size_t>(memory_budget));

        Runtime *rt = get_runtime(self);
        std::size_t max_workers = rt->threads();
        std::size_t capacity = dftracer::utils::compute_channel_capacity(
            state->memory_budget_bytes, ESTIMATED_BYTES_PER_RAW_CHUNK,
            max_workers);
        state->channel =
            dftracer::utils::coro::make_channel<MemoryViewBatchData>(capacity);
        auto *sp = state.get();

        if (fs::is_directory(cfg.file_path)) {
            auto handle = rt->scope(
                "iter_raw_parallel",
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_raw_parallel(
                        scope, sp, dir_path, index_dir, checkpoint_size,
                        auto_build_index, rc, max_workers);
                });
            state->task_future = handle.future;
        } else {
            auto handle = rt->submit(
                produce_raw_batched(state, state->channel->producer(), cfg, rc),
                "iter_raw");
            state->task_future = handle.future;
        }
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "unknown C++ exception in iter_raw");
        return NULL;
    }

    TraceReaderIteratorObject *it = make_memoryview_iterator(std::move(state));
    return (PyObject *)it;
}
12✔
2498

2499
/// Python: TraceReader.read_lines(**same arguments as iter_lines) -> list
///
/// Eager form of iter_lines: drains the iterator and returns its items as a
/// list.
///
/// @return the list, or NULL with the error raised by iter_lines or by list
///         construction.
static PyObject *TraceReader_read_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    PyObject *lines_iter = TraceReader_iter_lines(self, args, kwds);
    if (lines_iter == NULL) {
        return NULL;
    }
    PyObject *collected = PySequence_List(lines_iter);
    Py_DECREF(lines_iter);  // the iterator is not needed once drained
    return collected;
}
2507

2508
/// Python: TraceReader.iter_json(start_line=0, end_line=0, start_byte=0,
///                               end_byte=0, buffer_size=4 MiB, query=None,
///                               batch_size=1024, memory_budget=0)
///
/// Validates the arguments and builds a ReadConfig. It then creates a
/// bounded channel of JsonDictBatch sized by compute_channel_capacity() and
/// starts a producer:
///   - a directory path uses produce_json_dicts_parallel inside rt->scope(),
///   - a single file uses produce_json_dicts via rt->submit().
/// Finally the shared state is wrapped in a JSON-dict-mode iterator.
///
/// All C++ setup runs under one handler so that no C++ exception can unwind
/// into the interpreter. Several throwing calls used to sit outside any try.
///
/// NOTE(review): the directory-mode lambda captures the raw `sp` pointer.
/// It relies on the iterator keeping `state` alive until the producer
/// finishes; confirm against the iterator's dealloc.
///
/// @return a new iterator, or NULL with ValueError (bad arguments) or
///         RuntimeError (C++ failure) set.
static PyObject *TraceReader_iter_json(TraceReaderObject *self, PyObject *args,
                                       PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",      "start_byte",
                                   "end_byte",   "buffer_size",   "query",
                                   "batch_size", "memory_budget", NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;
    Py_ssize_t batch_size = 1024;
    Py_ssize_t memory_budget = 0;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nnnnnznn", (char **)kwlist,
                                     &start_line, &end_line, &start_byte,
                                     &end_byte, &buffer_size, &query_str,
                                     &batch_size, &memory_budget)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0 || batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError,
                        "range arguments must be >= 0; buffer_size and "
                        "batch_size must be > 0");
        return NULL;
    }

    std::shared_ptr<JsonDictIteratorState> state;
    try {
        TraceReaderConfig cfg = build_config(self);

        ReadConfig rc;
        rc.start_line = static_cast<std::size_t>(start_line);
        rc.end_line = static_cast<std::size_t>(end_line);
        rc.start_byte = static_cast<std::size_t>(start_byte);
        rc.end_byte = static_cast<std::size_t>(end_byte);
        rc.buffer_size = static_cast<std::size_t>(buffer_size);
        if (query_str) rc.query = query_str;

        state = std::make_shared<JsonDictIteratorState>();
        state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
            static_cast<std::size_t>(memory_budget));

        Runtime *rt = get_runtime(self);
        std::size_t max_workers = rt->threads();
        auto bs = static_cast<std::size_t>(batch_size);
        std::size_t capacity = dftracer::utils::compute_channel_capacity(
            state->memory_budget_bytes, bs * ESTIMATED_BYTES_PER_JSON_EVENT,
            max_workers);
        state->channel =
            dftracer::utils::coro::make_channel<JsonDictBatch>(capacity);
        auto *sp = state.get();

        if (fs::is_directory(cfg.file_path)) {
            auto handle = rt->scope(
                "iter_json_parallel",
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc, bs,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_json_dicts_parallel(
                        scope, sp, dir_path, index_dir, checkpoint_size,
                        auto_build_index, rc, bs, max_workers);
                });
            state->task_future = handle.future;
        } else {
            auto handle =
                rt->submit(produce_json_dicts(state, state->channel->producer(),
                                              cfg, rc, bs),
                           "iter_json");
            state->task_future = handle.future;
        }
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "unknown C++ exception in iter_json");
        return NULL;
    }

    TraceReaderIteratorObject *it = make_json_dict_iterator(std::move(state));
    return (PyObject *)it;
}
7✔
2594

2595
/// Python: TraceReader.read_json(**same arguments as iter_json) -> list
///
/// Eager form of iter_json: drains the iterator and returns its items as a
/// list.
///
/// @return the list, or NULL with the error raised by iter_json or by list
///         construction.
static PyObject *TraceReader_read_json_py(TraceReaderObject *self,
                                          PyObject *args, PyObject *kwds) {
    PyObject *json_iter = TraceReader_iter_json(self, args, kwds);
    if (json_iter == NULL) {
        return NULL;
    }
    PyObject *collected = PySequence_List(json_iter);
    Py_DECREF(json_iter);  // the iterator is not needed once drained
    return collected;
}
2603

2604
/// Python: TraceReader.read_raw(**same arguments as iter_raw) -> list
///
/// Eager form of iter_raw: drains the iterator and returns its items as a
/// list.
///
/// @return the list, or NULL with the error raised by iter_raw or by list
///         construction.
static PyObject *TraceReader_read_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    PyObject *raw_iter = TraceReader_iter_raw(self, args, kwds);
    if (raw_iter == NULL) {
        return NULL;
    }
    PyObject *collected = PySequence_List(raw_iter);
    Py_DECREF(raw_iter);  // the iterator is not needed once drained
    return collected;
}
2612

2613
#ifdef DFTRACER_UTILS_ENABLE_ARROW
2614

2615
/// Python: TraceReader.iter_arrow(batch_size=10000, start_line=0, end_line=0,
///                                start_byte=0, end_byte=0,
///                                buffer_size=4 MiB, query=None,
///                                flatten_objects=True, normalize=False,
///                                memory_budget=0)
///
/// Validates the arguments and builds a ReadConfig. It then creates a
/// bounded channel of Arrow batches sized by compute_channel_capacity() and
/// starts one of three producers:
///   - directory path: produce_arrow_batches_parallel inside rt->scope(),
///   - single file with `normalize`: produce_arrow_batches via rt->submit(),
///   - single file otherwise: produce_arrow_batches_for_files over a
///     one-element file list inside rt->scope().
/// Finally the shared state is wrapped in an Arrow-mode iterator.
///
/// All C++ setup runs under one handler so that no C++ exception can unwind
/// into the interpreter. Several throwing calls used to sit outside any try.
///
/// NOTE(review): the scope lambdas capture the raw `sp` pointer. They rely
/// on the iterator keeping `state` alive until the producer finishes;
/// confirm against the iterator's dealloc.
///
/// @return a new iterator, or NULL with ValueError (bad arguments) or
///         RuntimeError (C++ failure) set.
static PyObject *TraceReader_iter_arrow(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {
        "batch_size", "start_line",    "end_line", "start_byte",
        "end_byte",   "buffer_size",   "query",    "flatten_objects",
        "normalize",  "memory_budget", NULL};
    Py_ssize_t batch_size = 10000;
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;
    int flatten_objects = 1;  // default: expand top-level objects
    int normalize = 0;
    Py_ssize_t memory_budget = 0;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|nnnnnnzppn", (char **)kwlist, &batch_size,
            &start_line, &end_line, &start_byte, &end_byte, &buffer_size,
            &query_str, &flatten_objects, &normalize, &memory_budget)) {
        return NULL;
    }

    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
        return NULL;
    }
    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }

    std::shared_ptr<ArrowIteratorState> state;
    try {
        TraceReaderConfig cfg = build_config(self);

        ReadConfig rc;
        rc.start_line = static_cast<std::size_t>(start_line);
        rc.end_line = static_cast<std::size_t>(end_line);
        rc.start_byte = static_cast<std::size_t>(start_byte);
        rc.end_byte = static_cast<std::size_t>(end_byte);
        rc.buffer_size = static_cast<std::size_t>(buffer_size);
        rc.flatten_objects = flatten_objects != 0;
        if (query_str) rc.query = query_str;

        state = std::make_shared<ArrowIteratorState>();
        state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
            static_cast<std::size_t>(memory_budget));

        Runtime *rt = get_runtime(self);
        std::size_t max_workers = rt->threads();
        auto bs = static_cast<std::size_t>(batch_size);
        std::size_t capacity = dftracer::utils::compute_channel_capacity(
            state->memory_budget_bytes, bs * ESTIMATED_BYTES_PER_ARROW_ROW,
            max_workers);
        state->channel =
            dftracer::utils::coro::make_channel<ArrowIteratorState::BatchType>(
                capacity);
        auto *sp = state.get();

        if (fs::is_directory(cfg.file_path)) {
            auto handle = rt->scope(
                "iter_arrow_parallel",
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc, bs,
                 norm = normalize != 0,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_arrow_batches_parallel(
                        scope, sp, dir_path, index_dir, checkpoint_size,
                        auto_build_index, rc, bs, norm, max_workers);
                });
            state->task_future = handle.future;
        } else if (normalize) {
            auto handle = rt->submit(
                produce_arrow_batches(state, state->channel->producer(), cfg,
                                      rc, bs, flatten_objects != 0,
                                      normalize != 0),
                "iter_arrow");
            state->task_future = handle.future;
        } else {
            std::vector<std::string> files_vec{cfg.file_path};
            auto handle = rt->scope(
                "iter_arrow_parallel",
                [sp, files = std::move(files_vec), index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc, bs,
                 norm = normalize != 0,
                 max_workers](CoroScope &scope) mutable -> CoroTask<void> {
                    co_await produce_arrow_batches_for_files(
                        scope, sp, std::move(files), index_dir, checkpoint_size,
                        auto_build_index, rc, bs, norm, max_workers);
                });
            state->task_future = handle.future;
        }
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "unknown C++ exception in iter_arrow");
        return NULL;
    }

    TraceReaderIteratorObject *it = make_arrow_iterator(std::move(state));
    return (PyObject *)it;
}
27✔
2726

2727
// Build ArrowIteratorState + spawn the producer task. Same plumbing as
2728
// TraceReader_iter_arrow but returns the state so callers can wrap it as
2729
// either a per-batch iterator or an ArrowArrayStream.
2730
static std::shared_ptr<ArrowIteratorState> spawn_arrow_producer(
27✔
2731
    TraceReaderObject *self, PyObject *args, PyObject *kwds) {
2732
    static const char *kwlist[] = {
2733
        "batch_size", "start_line",    "end_line", "start_byte",
2734
        "end_byte",   "buffer_size",   "query",    "flatten_objects",
2735
        "normalize",  "memory_budget", NULL};
2736
    Py_ssize_t batch_size = 10000;
27✔
2737
    Py_ssize_t start_line = 0, end_line = 0;
27✔
2738
    Py_ssize_t start_byte = 0, end_byte = 0;
27✔
2739
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
27✔
2740
    const char *query_str = NULL;
27✔
2741
    int flatten_objects = 1;  // default: expand top-level objects
27✔
2742
    int normalize = 0;
27✔
2743
    Py_ssize_t memory_budget = 0;
27✔
2744

2745
    if (!PyArg_ParseTupleAndKeywords(
27!
2746
            args, kwds, "|nnnnnnzppn", (char **)kwlist, &batch_size,
2747
            &start_line, &end_line, &start_byte, &end_byte, &buffer_size,
2748
            &query_str, &flatten_objects, &normalize, &memory_budget)) {
2749
        return nullptr;
×
2750
    }
2751

2752
    if (batch_size <= 0) {
27!
2753
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
×
2754
        return nullptr;
×
2755
    }
2756
    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
27!
2757
        buffer_size <= 0) {
27!
2758
        PyErr_SetString(
×
2759
            PyExc_ValueError,
2760
            "range arguments must be >= 0; buffer_size must be > 0");
2761
        return nullptr;
×
2762
    }
2763

2764
    TraceReaderConfig cfg;
27✔
2765
    try {
2766
        cfg = build_config(self);
27!
UNCOV
2767
    } catch (const std::exception &e) {
×
2768
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2769
        return nullptr;
×
2770
    }
×
2771

2772
    ReadConfig rc;
27✔
2773
    rc.start_line = static_cast<std::size_t>(start_line);
27✔
2774
    rc.end_line = static_cast<std::size_t>(end_line);
27✔
2775
    rc.start_byte = static_cast<std::size_t>(start_byte);
27✔
2776
    rc.end_byte = static_cast<std::size_t>(end_byte);
27✔
2777
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
27✔
2778
    rc.flatten_objects = flatten_objects != 0;
27✔
2779
    if (query_str) rc.query = query_str;
27!
2780

2781
    auto state = std::make_shared<ArrowIteratorState>();
27!
2782
    state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
27!
2783
        static_cast<std::size_t>(memory_budget));
2784

2785
    Runtime *rt = get_runtime(self);
27!
2786
    std::size_t max_workers = rt->threads();
27!
2787
    auto bs = static_cast<std::size_t>(batch_size);
27✔
2788
    std::size_t capacity = dftracer::utils::compute_channel_capacity(
27!
2789
        state->memory_budget_bytes, bs * ESTIMATED_BYTES_PER_ARROW_ROW,
27✔
2790
        max_workers);
2791
    state->channel =
27✔
2792
        dftracer::utils::coro::make_channel<ArrowIteratorState::BatchType>(
54!
2793
            capacity);
27✔
2794
    auto *sp = state.get();
27✔
2795

2796
    try {
2797
        bool is_dir = fs::is_directory(cfg.file_path);
27!
2798
        if (is_dir) {
27✔
2799
            auto handle = rt->scope(
2800
                "iter_arrow_parallel",
2801
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
10!
2802
                 checkpoint_size = cfg.checkpoint_size,
10✔
2803
                 auto_build_index = cfg.auto_build_index, rc, bs,
10✔
2804
                 norm = normalize != 0,
10✔
2805
                 max_workers](CoroScope &scope) -> CoroTask<void> {
2806
                    co_await produce_arrow_batches_parallel(
2807
                        scope, sp, dir_path, index_dir, checkpoint_size,
2808
                        auto_build_index, rc, bs, norm, max_workers);
2809
                });
40!
2810
            state->task_future = handle.future;
10✔
2811
        } else {
10✔
2812
            auto handle = rt->submit(
2813
                produce_arrow_batches(state, state->channel->producer(), cfg,
34!
2814
                                      rc, static_cast<std::size_t>(batch_size),
2815
                                      flatten_objects != 0, normalize != 0),
2816
                "iter_arrow");
51!
2817
            state->task_future = handle.future;
17✔
2818
        }
17✔
UNCOV
2819
    } catch (const std::exception &e) {
×
2820
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2821
        return nullptr;
×
2822
    }
×
2823

2824
    return state;
27✔
2825
}
27✔
2826

2827
static PyObject *TraceReader_iter_arrow_stream(TraceReaderObject *self,
14✔
2828
                                               PyObject *args, PyObject *kwds) {
2829
    auto state = spawn_arrow_producer(self, args, kwds);
14!
2830
    if (!state) return NULL;
14!
2831
    return make_arrow_batch_stream(std::move(state));
14!
2832
}
14✔
2833

2834
static PyObject *TraceReader_read_arrow(TraceReaderObject *self, PyObject *args,
13✔
2835
                                        PyObject *kwds) {
2836
    auto state = spawn_arrow_producer(self, args, kwds);
13!
2837
    if (!state) return NULL;
13!
2838
    PyObject *stream = make_arrow_batch_stream(std::move(state));
13!
2839
    if (!stream) return NULL;
13!
2840
    return dftracer::utils::python::wrap_arrow_stream_table(stream);
13!
2841
}
13✔
2842

2843
#endif  // DFTRACER_UTILS_ENABLE_ARROW
2844

2845
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
2846

UNCOV
2847
static int parse_str_list_trace(PyObject *obj, std::vector<std::string> &out,
×
2848
                                const char *param_name) {
UNCOV
2849
    if (!obj || obj == Py_None) return 0;
×
UNCOV
2850
    if (!PyList_Check(obj)) {
×
UNCOV
2851
        PyErr_Format(PyExc_TypeError, "%s must be a list of str", param_name);
×
UNCOV
2852
        return -1;
×
2853
    }
UNCOV
2854
    Py_ssize_t n = PyList_Size(obj);
×
UNCOV
2855
    for (Py_ssize_t i = 0; i < n; i++) {
×
UNCOV
2856
        const char *s = PyUnicode_AsUTF8(PyList_GetItem(obj, i));
×
UNCOV
2857
        if (!s) return -1;
×
UNCOV
2858
        out.emplace_back(s);
×
2859
    }
UNCOV
2860
    return 0;
×
2861
}
2862

2863
static PyObject *TraceReader_write_arrow(TraceReaderObject *self,
13✔
2864
                                         PyObject *args, PyObject *kwds) {
2865
    static const char *kwlist[] = {"path",        "views",      "chunk_size_mb",
2866
                                   "compression", "batch_size", NULL};
2867
    const char *path = NULL;
13✔
2868
    PyObject *views_obj = Py_None;
13✔
2869
    int chunk_size_mb = 32;
13✔
2870
    const char *compression_str = "zstd";
13✔
2871
    Py_ssize_t batch_size = 10000;
13✔
2872

2873
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|Oisn", (char **)kwlist,
13!
2874
                                     &path, &views_obj, &chunk_size_mb,
2875
                                     &compression_str, &batch_size)) {
2876
        return NULL;
×
2877
    }
2878

2879
    if (chunk_size_mb < 0) {
13!
2880
        PyErr_SetString(PyExc_ValueError, "chunk_size_mb must be >= 0");
×
2881
        return NULL;
×
2882
    }
2883

2884
    std::vector<ViewDefinition> views;
13✔
2885
    if (views_obj && views_obj != Py_None) {
13!
2886
        if (!PyList_Check(views_obj)) {
5!
2887
            PyErr_SetString(PyExc_TypeError, "views must be a list or None");
×
2888
            return NULL;
×
2889
        }
2890
        Py_ssize_t n = PyList_Size(views_obj);
5!
2891
        for (Py_ssize_t i = 0; i < n; i++) {
11✔
2892
            PyObject *item = PyList_GetItem(views_obj, i);
6!
2893
            ViewDefinition vd;
6✔
2894

2895
            if (PyUnicode_Check(item)) {
6✔
2896
                const char *name = PyUnicode_AsUTF8(item);
1!
2897
                if (!name) return NULL;
1!
2898
                std::string name_str(name);
1!
2899
                if (name_str == "io") {
1!
2900
                    vd = ViewDefinition::io_view();
1!
UNCOV
2901
                } else if (name_str == "compute") {
×
2902
                    vd = ViewDefinition::compute_view();
×
2903
                } else if (name_str == "dlio") {
×
2904
                    vd = ViewDefinition::dlio_view();
×
2905
                } else {
2906
                    vd.with_name(name_str);
×
2907
                }
2908
            } else if (PyDict_Check(item)) {
6!
2909
                PyObject *name_obj = PyDict_GetItemString(item, "name");
5!
2910
                if (!name_obj || !PyUnicode_Check(name_obj)) {
5!
2911
                    PyErr_SetString(PyExc_ValueError,
×
2912
                                    "view dict must have 'name' string");
2913
                    return NULL;
×
2914
                }
2915
                vd.with_name(PyUnicode_AsUTF8(name_obj));
5!
2916

2917
                PyObject *query_obj = PyDict_GetItemString(item, "query");
5!
2918
                if (query_obj && query_obj != Py_None) {
5!
2919
                    if (!PyUnicode_Check(query_obj)) {
5!
2920
                        PyErr_SetString(PyExc_ValueError,
×
2921
                                        "view 'query' must be a string");
2922
                        return NULL;
×
2923
                    }
2924
                    vd.with_query(PyUnicode_AsUTF8(query_obj));
5!
2925
                }
2926

2927
                PyObject *meta_obj =
2928
                    PyDict_GetItemString(item, "include_metadata");
5!
2929
                if (meta_obj && meta_obj != Py_None) {
5!
2930
                    vd.with_include_metadata(PyObject_IsTrue(meta_obj));
1!
2931
                }
2932
            } else {
2933
                PyErr_SetString(PyExc_TypeError,
×
2934
                                "views list must contain strings or dicts");
2935
                return NULL;
×
2936
            }
2937
            views.push_back(std::move(vd));
6!
2938
        }
6!
2939
    }
2940

2941
    IpcCompression compression = IpcCompression::ZSTD;
13✔
2942
    if (compression_str) {
13!
2943
        std::string comp_lower(compression_str);
13!
2944
        for (auto &c : comp_lower) c = std::tolower(c);
65✔
2945
        if (comp_lower == "none") {
13✔
2946
            compression = IpcCompression::NONE;
1✔
2947
        } else if (comp_lower == "zstd") {
12!
2948
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
2949
            compression = IpcCompression::ZSTD;
12✔
2950
#else
2951
            PyErr_SetString(
2952
                PyExc_ValueError,
2953
                "ZSTD compression not available (built without ZSTD)");
2954
            return NULL;
2955
#endif
2956
        } else {
2957
            PyErr_Format(PyExc_ValueError,
×
2958
                         "Unknown compression: %s (use 'none' or 'zstd')",
2959
                         compression_str);
2960
            return NULL;
×
2961
        }
2962
    }
13!
2963

2964
    int64_t chunk_size_bytes =
13✔
2965
        static_cast<int64_t>(chunk_size_mb) * 1024 * 1024;
13✔
2966

2967
    std::string file_path = PyUnicode_AsUTF8(self->file_path);
13!
2968
    std::string index_path;
13✔
2969
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
13!
2970
    if (idx && idx[0] != '\0') {
13!
2971
        index_path = idx;
×
2972
    }
2973
    std::size_t checkpoint_size = self->checkpoint_size;
13✔
2974

2975
    std::string output_path(path);
13!
2976
    WriteArrowResult result;
13✔
2977
    std::string error_msg;
13✔
2978

2979
    Py_BEGIN_ALLOW_THREADS try {
13!
2980
        Runtime *rt = get_runtime(self);
13!
2981
        result =
2982
            rt->submit(write_arrow_pipeline(
39!
2983
                           file_path, index_path, checkpoint_size,
2984
                           std::move(views), output_path, chunk_size_bytes,
13✔
2985
                           compression, static_cast<std::size_t>(batch_size)),
2986
                       "write_arrow")
2987
                .get();
13!
UNCOV
2988
    } catch (const std::exception &e) {
×
2989
        error_msg = e.what();
×
2990
    }
×
2991
    Py_END_ALLOW_THREADS
13!
2992

2993
        if (!error_msg.empty()) {
13!
2994
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
2995
        return NULL;
×
2996
    }
2997

2998
    if (!result.error.empty()) {
13!
2999
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
×
3000
        return NULL;
×
3001
    }
3002

3003
    // Build result dict
3004
    PyObject *dict = PyDict_New();
13!
3005
    if (!dict) return NULL;
13!
3006

3007
    // Build files list per partition
3008
    PyObject *partitions_dict = PyDict_New();
13!
3009
    if (!partitions_dict) {
13!
3010
        Py_DECREF(dict);
3011
        return NULL;
×
3012
    }
3013

3014
    for (const auto &[partition_name, partition_stats] :
27✔
3015
         result.stats.partitions) {
40✔
3016
        PyObject *partition_dict = PyDict_New();
14!
3017
        if (!partition_dict) {
14!
3018
            Py_DECREF(partitions_dict);
3019
            Py_DECREF(dict);
3020
            return NULL;
×
3021
        }
3022

3023
        PyObject *files_list = PyList_New(0);
14!
3024
        if (!files_list) {
14!
3025
            Py_DECREF(partition_dict);
3026
            Py_DECREF(partitions_dict);
3027
            Py_DECREF(dict);
3028
            return NULL;
×
3029
        }
3030

3031
        for (const auto &f : partition_stats.files) {
22✔
3032
            PyObject *file_str = PyUnicode_FromString(f.c_str());
8!
3033
            if (!file_str || PyList_Append(files_list, file_str) < 0) {
8!
3034
                Py_XDECREF(file_str);
×
3035
                Py_DECREF(files_list);
3036
                Py_DECREF(partition_dict);
3037
                Py_DECREF(partitions_dict);
3038
                Py_DECREF(dict);
3039
                return NULL;
×
3040
            }
3041
            Py_DECREF(file_str);
3042
        }
3043

3044
        PyDict_SetItemString(partition_dict, "files", files_list);
14!
3045
        dict_set_steal(partition_dict, "rows",
14!
3046
                       PyLong_FromLongLong(partition_stats.total_rows));
14!
3047
        dict_set_steal(
14!
3048
            partition_dict, "bytes",
3049
            PyLong_FromLongLong(partition_stats.total_uncompressed_bytes));
14!
3050
        Py_DECREF(files_list);
3051

3052
        PyObject *key = partition_name.empty()
14✔
3053
                            ? PyUnicode_FromString("_default")
14!
3054
                            : PyUnicode_FromString(partition_name.c_str());
14!
3055
        PyDict_SetItem(partitions_dict, key, partition_dict);
14!
3056
        Py_DECREF(key);
3057
        Py_DECREF(partition_dict);
3058
    }
3059

3060
    PyDict_SetItemString(dict, "partitions", partitions_dict);
13!
3061
    dict_set_steal(dict, "total_rows",
13!
3062
                   PyLong_FromLongLong(result.stats.total_rows));
13!
3063
    dict_set_steal(dict, "total_bytes",
13!
3064
                   PyLong_FromLongLong(result.stats.total_uncompressed_bytes));
13!
3065
    dict_set_steal(dict, "chunks_scanned",
13!
3066
                   PyLong_FromUnsignedLongLong(result.chunks_scanned));
13!
3067
    dict_set_steal(dict, "chunks_skipped",
13!
3068
                   PyLong_FromUnsignedLongLong(result.chunks_skipped));
13!
3069
    Py_DECREF(partitions_dict);
3070

3071
    return dict;
13✔
3072
}
13✔
3073

3074
static PyObject *TraceReader_get_view_chunks(TraceReaderObject *self,
3✔
3075
                                             PyObject *args, PyObject *kwds) {
3076
    static const char *kwlist[] = {"view", NULL};
3077
    PyObject *view_obj = Py_None;
3✔
3078

3079
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
3!
3080
                                     &view_obj)) {
3081
        return NULL;
×
3082
    }
3083

3084
    ViewDefinition view;
3✔
3085
    if (view_obj && view_obj != Py_None) {
3!
3086
        if (PyUnicode_Check(view_obj)) {
1!
3087
            const char *name = PyUnicode_AsUTF8(view_obj);
×
3088
            if (!name) return NULL;
×
3089
            std::string name_str(name);
×
3090
            if (name_str == "io") {
×
3091
                view = ViewDefinition::io_view();
×
3092
            } else if (name_str == "compute") {
×
3093
                view = ViewDefinition::compute_view();
×
3094
            } else if (name_str == "dlio") {
×
3095
                view = ViewDefinition::dlio_view();
×
3096
            } else {
3097
                view.with_name(name_str);
×
3098
            }
3099
        } else if (PyDict_Check(view_obj)) {
1!
3100
            PyObject *name_obj = PyDict_GetItemString(view_obj, "name");
1!
3101
            if (name_obj && PyUnicode_Check(name_obj)) {
1!
3102
                view.with_name(PyUnicode_AsUTF8(name_obj));
1!
3103
            }
3104
            PyObject *query_obj = PyDict_GetItemString(view_obj, "query");
1!
3105
            if (query_obj && query_obj != Py_None &&
2!
3106
                PyUnicode_Check(query_obj)) {
1✔
3107
                view.with_query(PyUnicode_AsUTF8(query_obj));
1!
3108
            }
3109
        } else {
3110
            PyErr_SetString(PyExc_TypeError, "view must be a string or dict");
×
3111
            return NULL;
×
3112
        }
3113
    }
3114

3115
    std::string file_path = PyUnicode_AsUTF8(self->file_path);
3!
3116
    std::string index_path;
3✔
3117
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
3!
3118
    if (idx && idx[0] != '\0') {
3!
3119
        index_path = idx;
×
3120
    }
3121
    std::size_t checkpoint_size = self->checkpoint_size;
3✔
3122

3123
    GetViewChunksResult result;
3✔
3124
    std::string error_msg;
3✔
3125

3126
    Py_BEGIN_ALLOW_THREADS try {
3!
3127
        Runtime *rt = get_runtime(self);
3!
3128
        result = rt->submit(get_view_chunks_pipeline(file_path, index_path,
6!
3129
                                                     checkpoint_size, view),
3130
                            "get_view_chunks")
3131
                     .get();
3!
UNCOV
3132
    } catch (const std::exception &e) {
×
3133
        error_msg = e.what();
×
3134
    }
×
3135
    Py_END_ALLOW_THREADS
3!
3136

3137
        if (!error_msg.empty()) {
3!
3138
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
3139
        return NULL;
×
3140
    }
3141

3142
    if (!result.error.empty()) {
3!
3143
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
×
3144
        return NULL;
×
3145
    }
3146

3147
    PyObject *dict = PyDict_New();
3!
3148
    if (!dict) return NULL;
3!
3149

3150
    PyObject *chunks_list = PyList_New(result.chunks.size());
3!
3151
    if (!chunks_list) {
3!
3152
        Py_DECREF(dict);
3153
        return NULL;
×
3154
    }
3155

3156
    for (std::size_t i = 0; i < result.chunks.size(); ++i) {
7✔
3157
        const auto &chunk = result.chunks[i];
4✔
3158
        PyObject *chunk_dict = PyDict_New();
4!
3159
        if (!chunk_dict) {
4!
3160
            Py_DECREF(chunks_list);
3161
            Py_DECREF(dict);
3162
            return NULL;
×
3163
        }
3164
        dict_set_steal(chunk_dict, "checkpoint_idx",
4!
3165
                       PyLong_FromUnsignedLongLong(chunk.checkpoint_idx));
4!
3166
        dict_set_steal(chunk_dict, "start_byte",
4!
3167
                       PyLong_FromSize_t(chunk.start_byte));
4!
3168
        dict_set_steal(chunk_dict, "end_byte",
4!
3169
                       PyLong_FromSize_t(chunk.end_byte));
4!
3170
        PyList_SetItem(chunks_list, i, chunk_dict);
4!
3171
    }
3172

3173
    PyDict_SetItemString(dict, "chunks", chunks_list);
3!
3174
    dict_set_steal(dict, "total_checkpoints",
3!
3175
                   PyLong_FromUnsignedLongLong(result.total_checkpoints));
3!
3176
    dict_set_steal(dict, "skipped_checkpoints",
3!
3177
                   PyLong_FromUnsignedLongLong(result.skipped_checkpoints));
3!
3178
    dict_set_steal(dict, "file_may_match",
3!
3179
                   PyBool_FromLong(result.file_may_match ? 1 : 0));
3✔
3180
    Py_DECREF(chunks_list);
3181

3182
    return dict;
3✔
3183
}
3✔
3184

3185
static PyObject *TraceReader_write_view_chunk(TraceReaderObject *self,
1✔
3186
                                              PyObject *args, PyObject *kwds) {
3187
    static const char *kwlist[] = {
3188
        "output_file", "checkpoint_idx", "start_byte", "end_byte",
3189
        "view",        "compression",    "batch_size", NULL};
3190
    const char *output_file = NULL;
1✔
3191
    unsigned long long checkpoint_idx = 0;
1✔
3192
    Py_ssize_t start_byte = 0;
1✔
3193
    Py_ssize_t end_byte = 0;
1✔
3194
    PyObject *view_obj = Py_None;
1✔
3195
    const char *compression_str = "zstd";
1✔
3196
    Py_ssize_t batch_size = 10000;
1✔
3197

3198
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "sKnn|Osn", (char **)kwlist,
1!
3199
                                     &output_file, &checkpoint_idx, &start_byte,
3200
                                     &end_byte, &view_obj, &compression_str,
3201
                                     &batch_size)) {
3202
        return NULL;
×
3203
    }
3204

3205
    IpcCompression compression = IpcCompression::ZSTD;
1✔
3206
    if (compression_str) {
1!
3207
        std::string comp_lower(compression_str);
1!
3208
        for (auto &c : comp_lower) c = std::tolower(c);
5✔
3209
        if (comp_lower == "none") {
1!
3210
            compression = IpcCompression::NONE;
×
3211
        } else if (comp_lower == "zstd") {
1!
3212
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
3213
            compression = IpcCompression::ZSTD;
1✔
3214
#else
3215
            PyErr_SetString(PyExc_ValueError, "ZSTD compression not available");
3216
            return NULL;
3217
#endif
3218
        }
3219
    }
1✔
3220

3221
    ViewDefinition view;
1✔
3222
    if (view_obj && view_obj != Py_None) {
1!
3223
        if (PyUnicode_Check(view_obj)) {
×
3224
            const char *name = PyUnicode_AsUTF8(view_obj);
×
3225
            if (!name) return NULL;
×
3226
            std::string name_str(name);
×
3227
            if (name_str == "io") {
×
3228
                view = ViewDefinition::io_view();
×
3229
            } else if (name_str == "compute") {
×
3230
                view = ViewDefinition::compute_view();
×
3231
            } else if (name_str == "dlio") {
×
3232
                view = ViewDefinition::dlio_view();
×
3233
            } else {
3234
                view.with_name(name_str);
×
3235
            }
3236
        } else if (PyDict_Check(view_obj)) {
×
3237
            PyObject *name_obj = PyDict_GetItemString(view_obj, "name");
×
3238
            if (name_obj && PyUnicode_Check(name_obj)) {
×
3239
                view.with_name(PyUnicode_AsUTF8(name_obj));
×
3240
            }
3241
            PyObject *query_obj = PyDict_GetItemString(view_obj, "query");
×
3242
            if (query_obj && query_obj != Py_None &&
×
3243
                PyUnicode_Check(query_obj)) {
×
3244
                view.with_query(PyUnicode_AsUTF8(query_obj));
×
3245
            }
3246
        }
3247
    }
3248

3249
    std::string file_path = PyUnicode_AsUTF8(self->file_path);
1!
3250
    std::string index_path;
1✔
3251
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
1!
3252
    if (idx && idx[0] != '\0') {
1!
3253
        index_path = idx;
×
3254
    }
3255
    std::size_t checkpoint_size = self->checkpoint_size;
1✔
3256

3257
    WriteViewChunkResult result;
1✔
3258
    std::string error_msg;
1✔
3259

3260
    Py_BEGIN_ALLOW_THREADS try {
1!
3261
        Runtime *rt = get_runtime(self);
1!
3262
        result =
3263
            rt->submit(write_view_chunk_pipeline(
4!
3264
                           file_path, index_path, checkpoint_size, view,
3265
                           checkpoint_idx, static_cast<std::size_t>(start_byte),
3266
                           static_cast<std::size_t>(end_byte),
3267
                           std::string(output_file), compression,
2!
3268
                           static_cast<std::size_t>(batch_size)),
3269
                       "write_view_chunk")
3270
                .get();
1!
UNCOV
3271
    } catch (const std::exception &e) {
×
3272
        error_msg = e.what();
×
3273
    }
×
3274
    Py_END_ALLOW_THREADS
1!
3275

3276
        if (!error_msg.empty()) {
1!
3277
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
3278
        return NULL;
×
3279
    }
3280

3281
    if (!result.error.empty()) {
1!
3282
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
×
3283
        return NULL;
×
3284
    }
3285

3286
    PyObject *dict = PyDict_New();
1!
3287
    if (!dict) return NULL;
1!
3288

3289
    dict_set_steal(dict, "output_file",
1!
3290
                   PyUnicode_FromString(result.output_file.c_str()));
3291
    dict_set_steal(dict, "events_matched",
1!
3292
                   PyLong_FromUnsignedLongLong(result.events_matched));
1!
3293
    dict_set_steal(dict, "events_scanned",
1!
3294
                   PyLong_FromUnsignedLongLong(result.events_scanned));
1!
3295
    dict_set_steal(dict, "rows_written",
1!
3296
                   PyLong_FromLongLong(result.rows_written));
1!
3297
    dict_set_steal(dict, "bytes_written",
1!
3298
                   PyLong_FromLongLong(result.bytes_written));
1!
3299

3300
    return dict;
1✔
3301
}
1✔
3302

3303
static PyObject *TraceReader_write_view_chunks(TraceReaderObject *self,
1✔
3304
                                               PyObject *args, PyObject *kwds) {
3305
    static const char *kwlist[] = {"chunks",      "output_dir", "view",
3306
                                   "compression", "batch_size", NULL};
3307
    PyObject *chunks_list = NULL;
1✔
3308
    const char *output_dir = NULL;
1✔
3309
    PyObject *view_obj = Py_None;
1✔
3310
    const char *compression_str = "zstd";
1✔
3311
    Py_ssize_t batch_size = 10000;
1✔
3312

3313
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "Os|Osn", (char **)kwlist,
1!
3314
                                     &chunks_list, &output_dir, &view_obj,
3315
                                     &compression_str, &batch_size)) {
3316
        return NULL;
×
3317
    }
3318

3319
    if (!PyList_Check(chunks_list)) {
1!
3320
        PyErr_SetString(PyExc_TypeError, "chunks must be a list");
×
3321
        return NULL;
×
3322
    }
3323

3324
    IpcCompression compression = IpcCompression::ZSTD;
1✔
3325
    if (strcmp(compression_str, "none") == 0) {
1!
3326
        compression = IpcCompression::NONE;
×
3327
    } else if (strcmp(compression_str, "zstd") != 0) {
1!
3328
        PyErr_SetString(PyExc_ValueError,
×
3329
                        "compression must be 'zstd' or 'none'");
3330
        return NULL;
×
3331
    }
3332

3333
    ViewDefinition view;
1✔
3334
    if (view_obj && view_obj != Py_None) {
1!
3335
        if (PyUnicode_Check(view_obj)) {
×
3336
            const char *name = PyUnicode_AsUTF8(view_obj);
×
3337
            if (!name) return NULL;
×
3338
            std::string name_str(name);
×
3339
            if (name_str == "io") {
×
3340
                view = ViewDefinition::io_view();
×
3341
            } else if (name_str == "compute") {
×
3342
                view = ViewDefinition::compute_view();
×
3343
            } else if (name_str == "dlio") {
×
3344
                view = ViewDefinition::dlio_view();
×
3345
            } else {
3346
                view.with_name(name_str);
×
3347
            }
3348
        } else if (PyDict_Check(view_obj)) {
×
3349
            PyObject *name_obj = PyDict_GetItemString(view_obj, "name");
×
3350
            if (name_obj && PyUnicode_Check(name_obj)) {
×
3351
                view.with_name(PyUnicode_AsUTF8(name_obj));
×
3352
            }
3353
            PyObject *query_obj = PyDict_GetItemString(view_obj, "query");
×
3354
            if (query_obj && query_obj != Py_None &&
×
3355
                PyUnicode_Check(query_obj)) {
×
3356
                view.with_query(PyUnicode_AsUTF8(query_obj));
×
3357
            }
3358
        }
3359
    }
3360

3361
    std::vector<ChunkDescriptor> chunks;
1✔
3362
    Py_ssize_t num_chunks = PyList_Size(chunks_list);
1!
3363
    chunks.reserve(static_cast<std::size_t>(num_chunks));
1!
3364

3365
    for (Py_ssize_t i = 0; i < num_chunks; i++) {
4✔
3366
        PyObject *chunk_dict = PyList_GetItem(chunks_list, i);
3!
3367
        if (!PyDict_Check(chunk_dict)) {
3!
3368
            PyErr_SetString(PyExc_TypeError, "each chunk must be a dict");
×
3369
            return NULL;
×
3370
        }
3371

3372
        ChunkDescriptor desc;
3✔
3373

3374
        PyObject *cp_idx = PyDict_GetItemString(chunk_dict, "checkpoint_idx");
3!
3375
        PyObject *start = PyDict_GetItemString(chunk_dict, "start_byte");
3!
3376
        PyObject *end = PyDict_GetItemString(chunk_dict, "end_byte");
3!
3377

3378
        if (!cp_idx || !start || !end) {
3!
3379
            PyErr_SetString(
×
3380
                PyExc_KeyError,
3381
                "chunk must have checkpoint_idx, start_byte, end_byte");
3382
            return NULL;
×
3383
        }
3384

3385
        desc.checkpoint_idx =
3✔
3386
            static_cast<std::uint64_t>(PyLong_AsUnsignedLongLong(cp_idx));
3!
3387
        desc.start_byte =
3✔
3388
            static_cast<std::size_t>(PyLong_AsUnsignedLongLong(start));
3!
3389
        desc.end_byte =
3✔
3390
            static_cast<std::size_t>(PyLong_AsUnsignedLongLong(end));
3!
3391

3392
        char filename[64];
3393
        snprintf(filename, sizeof(filename), "chunk-%05llu.arrow",
3✔
3394
                 (unsigned long long)desc.checkpoint_idx);
3✔
3395
        desc.output_file = std::string(output_dir) + "/" + filename;
3!
3396

3397
        chunks.push_back(std::move(desc));
3!
3398
    }
3!
3399

3400
    std::string file_path = PyUnicode_AsUTF8(self->file_path);
1!
3401
    std::string index_path;
1✔
3402
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
1!
3403
    if (idx && idx[0] != '\0') {
1!
3404
        index_path = idx;
×
3405
    }
3406
    std::size_t checkpoint_size = self->checkpoint_size;
1✔
3407

3408
    WriteViewChunksResult result;
1✔
3409
    std::string error_msg;
1✔
3410

3411
    Py_BEGIN_ALLOW_THREADS try {
1!
3412
        Runtime *rt = get_runtime(self);
1!
3413
        result = rt->submit(write_view_chunks_pipeline(
3!
3414
                                file_path, index_path, checkpoint_size, view,
3415
                                std::move(chunks), compression,
1✔
3416
                                static_cast<std::size_t>(batch_size)),
3417
                            "write_view_chunks")
3418
                     .get();
1!
UNCOV
3419
    } catch (const std::exception &e) {
×
3420
        error_msg = e.what();
×
3421
    }
×
3422
    Py_END_ALLOW_THREADS
1!
3423

3424
        if (!error_msg.empty()) {
1!
3425
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
3426
        return NULL;
×
3427
    }
3428

3429
    PyObject *dict = PyDict_New();
1!
3430
    if (!dict) return NULL;
1!
3431

3432
    PyObject *results_list =
3433
        PyList_New(static_cast<Py_ssize_t>(result.results.size()));
1!
3434
    if (!results_list) {
1!
3435
        Py_DECREF(dict);
3436
        return NULL;
×
3437
    }
3438

3439
    for (std::size_t i = 0; i < result.results.size(); i++) {
4✔
3440
        const auto &r = result.results[i];
3✔
3441
        PyObject *item = PyDict_New();
3!
3442
        if (!item) {
3!
3443
            Py_DECREF(results_list);
3444
            Py_DECREF(dict);
3445
            return NULL;
×
3446
        }
3447
        dict_set_steal(item, "output_file",
3!
3448
                       PyUnicode_FromString(r.output_file.c_str()));
3449
        dict_set_steal(item, "rows_written",
3!
3450
                       PyLong_FromLongLong(r.rows_written));
3!
3451
        dict_set_steal(item, "events_matched",
3!
3452
                       PyLong_FromUnsignedLongLong(r.events_matched));
3!
3453
        if (!r.error.empty()) {
3!
NEW
3454
            dict_set_steal(item, "error",
×
3455
                           PyUnicode_FromString(r.error.c_str()));
3456
        }
3457
        PyList_SetItem(results_list, static_cast<Py_ssize_t>(i), item);
3!
3458
    }
3459

3460
    PyDict_SetItemString(dict, "results", results_list);
1!
3461
    Py_DECREF(results_list);
3462
    dict_set_steal(dict, "total_rows", PyLong_FromLongLong(result.total_rows));
1!
3463
    dict_set_steal(dict, "total_events_matched",
1!
3464
                   PyLong_FromLongLong(result.total_events_matched));
1!
3465

3466
    return dict;
1✔
3467
}
1✔
3468

3469
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
3470

3471
// __enter__: hands back the reader itself as a new strong reference, so
// `with TraceReader(...) as r:` binds `r` to this same object.
static PyObject *TraceReader_enter(TraceReaderObject *self,
                                   PyObject *Py_UNUSED(ignored)) {
    PyObject *as_object = reinterpret_cast<PyObject *>(self);
    Py_INCREF(as_object);
    return as_object;
}
3476

3477
// __exit__(exc_type, exc_value, traceback): a no-op.
//
// Nothing is released here. Returning None (a falsy value) tells Python not
// to suppress any exception raised inside the `with` body.
static PyObject *TraceReader_exit(TraceReaderObject *self, PyObject *args) {
    static_cast<void>(self);
    static_cast<void>(args);
    Py_RETURN_NONE;
}
3480

3481
// Getter for the read-only `path` property.
//
// Returns a new reference to the stored path object. If the field is NULL,
// returns None instead of dereferencing a null pointer. This can happen, for
// example, when a Python subclass never ran TraceReader.__init__. (Assumes
// TraceReader_new may leave the field NULL — TODO confirm.)
static PyObject *TraceReader_get_file_path(TraceReaderObject *self,
                                           void *closure) {
    static_cast<void>(closure);
    if (self->file_path == nullptr) {
        Py_RETURN_NONE;
    }
    Py_INCREF(self->file_path);
    return self->file_path;
}
3486

3487
// Getter for the read-only `index_dir` property.
//
// Returns a new reference to the stored index-directory object. If the field
// is NULL, returns None instead of dereferencing a null pointer. This can
// happen, for example, when a Python subclass never ran TraceReader.__init__.
// (Assumes TraceReader_new may leave the field NULL — TODO confirm.)
static PyObject *TraceReader_get_index_dir(TraceReaderObject *self,
                                           void *closure) {
    static_cast<void>(closure);
    if (self->index_dir == nullptr) {
        Py_RETURN_NONE;
    }
    Py_INCREF(self->index_dir);
    return self->index_dir;
}
3492

3493
// Getter for the read-only `has_index` property: converts the stored flag to
// a Python bool (any nonzero value becomes True).
static PyObject *TraceReader_get_has_index(TraceReaderObject *self,
                                           void *closure) {
    static_cast<void>(closure);
    const long flag = self->has_index;
    return PyBool_FromLong(flag);
}
3497

3498
// Getter for the `num_lines` property.
//
// Fast path: build a TraceReader from this object's configuration and ask it
// for its line count. Any exception is deliberately ignored here, and a zero
// result means "unknown"; both fall through to the slow path.
//
// Slow path: call read_lines() with no arguments and return the length of the
// resulting list, which reads every line.
//
// Returns a Python int, or NULL with a Python error set if the slow path
// fails.
static PyObject *TraceReader_get_num_lines_prop(TraceReaderObject *self,
                                                void *closure) {
    std::size_t native_count = 0;
    try {
        TraceReaderConfig config = build_config(self);
        TraceReader native_reader(std::move(config));
        native_count = native_reader.get_num_lines();
    } catch (...) {
        // Best effort only: an unknown count is handled by the slow path.
        native_count = 0;
    }
    if (native_count != 0) {
        return PyLong_FromSize_t(native_count);
    }

    PyObject *no_args = PyTuple_New(0);
    if (no_args == nullptr) {
        return nullptr;
    }
    PyObject *all_lines = TraceReader_read_lines(self, no_args, nullptr);
    Py_DECREF(no_args);
    if (all_lines == nullptr) {
        return nullptr;
    }
    const Py_ssize_t line_count = PyList_GET_SIZE(all_lines);
    Py_DECREF(all_lines);
    return PyLong_FromSsize_t(line_count);
}
3516

3517
// get_max_bytes(): returns the maximum byte position reported by a
// TraceReader built from this object's configuration.
//
// Returns a Python int, or NULL with RuntimeError set if building the reader
// or querying it throws. Any C++ exception is translated to RuntimeError
// because exceptions must not unwind through the CPython interpreter, which
// calls this function as a C callback. That includes exceptions not derived
// from std::exception.
static PyObject *TraceReader_get_max_bytes(TraceReaderObject *self,
                                           PyObject *Py_UNUSED(ignored)) {
    try {
        TraceReaderConfig cfg = build_config(self);
        TraceReader reader(std::move(cfg));
        return PyLong_FromSize_t(reader.get_max_bytes());
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "unknown C++ exception in TraceReader.get_max_bytes");
    }
    return nullptr;
}
3528

3529
// get_num_lines(): returns the line count reported by a TraceReader built
// from this object's configuration. The value is whatever the native reader
// reports; unlike the `num_lines` property, there is no fallback scan here.
//
// Returns a Python int, or NULL with RuntimeError set if building the reader
// or querying it throws. Any C++ exception is translated to RuntimeError
// because exceptions must not unwind through the CPython interpreter, which
// calls this function as a C callback. That includes exceptions not derived
// from std::exception.
static PyObject *TraceReader_get_num_lines(TraceReaderObject *self,
                                           PyObject *Py_UNUSED(ignored)) {
    try {
        TraceReaderConfig cfg = build_config(self);
        TraceReader reader(std::move(cfg));
        return PyLong_FromSize_t(reader.get_num_lines());
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
    } catch (...) {
        PyErr_SetString(PyExc_RuntimeError,
                        "unknown C++ exception in TraceReader.get_num_lines");
    }
    return nullptr;
}
3540

3541
// Python-visible methods of TraceReader.
//
// Each entry is {python name, C implementation, calling convention,
// docstring}. Keyword-accepting entries use METH_VARARGS | METH_KEYWORDS, and
// their implementations are cast to the generic PyCFunction type. The table
// is terminated by an all-null sentinel entry.
static PyMethodDef TraceReader_methods[] = {
    // ---- Line / raw / JSON reading ----------------------------------------
    {"iter_lines",
     reinterpret_cast<PyCFunction>(TraceReader_iter_lines),
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over decoded lines.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
    {"iter_raw",
     reinterpret_cast<PyCFunction>(TraceReader_iter_raw),
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over raw byte chunks.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    line_aligned (bool): Align chunks to line boundaries.\n"
     "    multi_line (bool): Allow multiple lines per chunk.\n"},
    {"read_lines",
     reinterpret_cast<PyCFunction>(TraceReader_read_lines),
     METH_VARARGS | METH_KEYWORDS,
     "Read all lines and return as list.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
    {"iter_json",
     reinterpret_cast<PyCFunction>(TraceReader_iter_json),
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over parsed JSON events as Python dicts.\n"
     "\n"
     "Each event is parsed once in C++ (single-pass simdjson ondemand)\n"
     "and yielded as a Python dict. No double-parsing overhead.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    query (str): Optional query filter.\n"
     "    batch_size (int): Events per internal batch (default 1024).\n"},
    {"read_json",
     reinterpret_cast<PyCFunction>(TraceReader_read_json_py),
     METH_VARARGS | METH_KEYWORDS,
     "Read all events as parsed Python dicts (list).\n"
     "\n"
     "Equivalent to list(iter_json(...)).\n"},
    {"read_raw",
     reinterpret_cast<PyCFunction>(TraceReader_read_raw),
     METH_VARARGS | METH_KEYWORDS,
     "Read all raw chunks and return as list.\n"
     "\n"
     "Args:\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"
     "    line_aligned (bool): Align chunks to line boundaries.\n"
     "    multi_line (bool): Allow multiple lines per chunk.\n"},
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    // ---- In-memory Arrow record batches -----------------------------------
    {"iter_arrow",
     reinterpret_cast<PyCFunction>(TraceReader_iter_arrow),
     METH_VARARGS | METH_KEYWORDS,
     "Return an iterator over Arrow record batches.\n"
     "\n"
     "Args:\n"
     "    batch_size (int): Maximum rows per Arrow batch.\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
    {"iter_arrow_stream",
     reinterpret_cast<PyCFunction>(TraceReader_iter_arrow_stream),
     METH_VARARGS | METH_KEYWORDS,
     "Return an _ArrowBatchStream that exposes Arrow record batches\n"
     "via the Arrow C Data Interface stream protocol\n"
     "(__arrow_c_stream__). PyArrow can drain the producer channel\n"
     "with a single call, without per-batch Python iteration.\n"},
    {"read_arrow",
     reinterpret_cast<PyCFunction>(TraceReader_read_arrow),
     METH_VARARGS | METH_KEYWORDS,
     "Read all events as a materialized ArrowTable.\n"
     "\n"
     "Args:\n"
     "    batch_size (int): Maximum rows per Arrow batch.\n"
     "    start_line (int): First line (0 = beginning).\n"
     "    end_line (int): Last line (0 = end of file).\n"
     "    start_byte (int): First byte offset (0 = beginning).\n"
     "    end_byte (int): Last byte offset (0 = end of file).\n"
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
#endif
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
    // ---- Arrow IPC file output ---------------------------------------------
    {"write_arrow",
     reinterpret_cast<PyCFunction>(TraceReader_write_arrow),
     METH_VARARGS | METH_KEYWORDS,
     "Write trace data to partitioned Arrow IPC files.\n"
     "\n"
     "Args:\n"
     "    path (str): Output directory path.\n"
     "    partition_by (list[str] or None): Column names to partition by.\n"
     "    num_buckets (int): Number of hash buckets (0 = no bucketing).\n"
     "    chunk_size_mb (int): Max uncompressed MB per file (default 32).\n"
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
     "    batch_size (int): Rows per internal batch (default 10000).\n"
     "    normalize (bool): Use normalized schema (default False).\n"
     "\n"
     "Returns:\n"
     "    dict: Statistics including partitions, total_rows, total_bytes.\n"},
    {"get_view_chunks",
     reinterpret_cast<PyCFunction>(TraceReader_get_view_chunks),
     METH_VARARGS | METH_KEYWORDS,
     "Get candidate chunks for a view after bloom filter pruning.\n"
     "\n"
     "Args:\n"
     "    view (str or dict): View name ('io', 'compute', 'dlio') or\n"
     "                        dict with 'name' and optional 'query'.\n"
     "\n"
     "Returns:\n"
     "    dict: chunks list, total_checkpoints, skipped_checkpoints.\n"},
    {"write_view_chunk",
     reinterpret_cast<PyCFunction>(TraceReader_write_view_chunk),
     METH_VARARGS | METH_KEYWORDS,
     "Write a single chunk to an Arrow IPC file.\n"
     "\n"
     "Args:\n"
     "    output_file (str): Path to output Arrow IPC file.\n"
     "    checkpoint_idx (int): Checkpoint index.\n"
     "    start_byte (int): Start byte offset.\n"
     "    end_byte (int): End byte offset.\n"
     "    view (str or dict): View definition.\n"
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
     "    batch_size (int): Events per batch (default 10000).\n"
     "\n"
     "Returns:\n"
     "    dict: output_file, events_matched, rows_written, bytes_written.\n"},
    {"write_view_chunks",
     reinterpret_cast<PyCFunction>(TraceReader_write_view_chunks),
     METH_VARARGS | METH_KEYWORDS,
     "Write multiple chunks to Arrow IPC files in parallel.\n"
     "\n"
     "All chunks are processed concurrently on the Runtime thread pool.\n"
     "\n"
     "Args:\n"
     "    chunks (list): List of dicts with checkpoint_idx, start_byte, "
     "end_byte.\n"
     "    output_dir (str): Directory for output Arrow IPC files.\n"
     "    view (str or dict): View definition.\n"
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
     "    batch_size (int): Events per batch (default 10000).\n"
     "\n"
     "Returns:\n"
     "    dict: results list, total_rows, total_events_matched.\n"},
#endif
    // ---- Metadata queries --------------------------------------------------
    {"get_max_bytes",
     reinterpret_cast<PyCFunction>(TraceReader_get_max_bytes),
     METH_NOARGS,
     "Get the maximum byte position (0 if unknown for compressed\n"
     "files without index)."},
    {"get_num_lines",
     reinterpret_cast<PyCFunction>(TraceReader_get_num_lines),
     METH_NOARGS,
     "Get the total number of lines (0 if unknown for files without\n"
     "index)."},
    // ---- Context-manager protocol ------------------------------------------
    {"__enter__",
     reinterpret_cast<PyCFunction>(TraceReader_enter),
     METH_NOARGS,
     "Enter the runtime context for the with statement."},
    {"__exit__",
     reinterpret_cast<PyCFunction>(TraceReader_exit),
     METH_VARARGS,
     "Exit the runtime context for the with statement.\n"
     "\n"
     "TraceReader does not own the shared RocksDB instance for an index path;\n"
     "any shared DB lifetime remains manager-owned on the native side."},
    // Sentinel terminating the table.
    {nullptr, nullptr, 0, nullptr}};
3708

3709
// Properties exposed on TraceReader instances.
//
// Each entry is {name, getter, setter, docstring, closure}. Every setter is
// null, so all four properties are read-only. The all-null entry terminates
// the table.
static PyGetSetDef TraceReader_getsetters[] = {
    {"path",
     reinterpret_cast<getter>(TraceReader_get_file_path),
     nullptr,
     "Path to the trace file or directory",
     nullptr},
    {"index_dir",
     reinterpret_cast<getter>(TraceReader_get_index_dir),
     nullptr,
     "Directory for index files",
     nullptr},
    {"has_index",
     reinterpret_cast<getter>(TraceReader_get_has_index),
     nullptr,
     "True if a checkpoint index was found",
     nullptr},
    {"num_lines",
     reinterpret_cast<getter>(TraceReader_get_num_lines_prop),
     nullptr,
     "Total line count (reads all lines if needed)",
     nullptr},
    // Sentinel terminating the table.
    {nullptr, nullptr, nullptr, nullptr, nullptr}};
3719

3720
// Class docstring for TraceReader. The leading "TraceReader(...)\n--\n\n"
// header is the text-signature block CPython exposes as __text_signature__.
static const char kTraceReaderTypeDoc[] =
    "TraceReader(file_path: str, index_dir: str = '',\n"
    "            checkpoint_size: int = 33554432,\n"
    "            auto_build_index: bool = False,\n"
    "            runtime: Runtime | None = None)\n"
    "--\n"
    "\n"
    "Smart trace file reader that auto-selects sequential or indexed\n"
    "reading based on whether a ``.dftindex`` store exists.\n"
    "\n"
    "Args:\n"
    "    file_path (str): Path to the trace file (.pfw.gz or plain "
    "text).\n"
    "    index_dir (str): Directory to search for ``.dftindex`` "
    "stores.\n"
    "        Empty string (default) searches next to the trace file.\n"
    "    checkpoint_size (int): Checkpoint interval in bytes for index\n"
    "        building (default 32 MB).\n"
    "    auto_build_index (bool): If True, automatically build an "
    "index\n"
    "        when none exists.\n"
    "    runtime (Runtime or None): Runtime instance for thread pool "
    "control.\n"
    "        If None, uses the default global Runtime.\n"
    "\n"
    "Raises:\n"
    "    RuntimeError: If *file_path* does not exist or cannot be "
    "opened.\n";

// Static type object for `dftracer_utils_ext.TraceReader`.
//
// Instances are allocated by TraceReader_new, initialized by TraceReader_init
// and destroyed by TraceReader_dealloc. Py_TPFLAGS_BASETYPE allows Python
// subclasses. Slots are listed positionally in PyTypeObject declaration
// order; zero/null slots are left for PyType_Ready to inherit or default.
PyTypeObject TraceReaderType = {
    PyVarObject_HEAD_INIT(nullptr, 0) "dftracer_utils_ext.TraceReader",
    sizeof(TraceReaderObject),                               // tp_basicsize
    0,                                                       // tp_itemsize
    reinterpret_cast<destructor>(TraceReader_dealloc),       // tp_dealloc
    0,                                                       // tp_vectorcall_offset
    nullptr,                                                 // tp_getattr
    nullptr,                                                 // tp_setattr
    nullptr,                                                 // tp_as_async
    nullptr,                                                 // tp_repr
    nullptr,                                                 // tp_as_number
    nullptr,                                                 // tp_as_sequence
    nullptr,                                                 // tp_as_mapping
    nullptr,                                                 // tp_hash
    nullptr,                                                 // tp_call
    nullptr,                                                 // tp_str
    nullptr,                                                 // tp_getattro
    nullptr,                                                 // tp_setattro
    nullptr,                                                 // tp_as_buffer
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,                // tp_flags
    kTraceReaderTypeDoc,                                     // tp_doc
    nullptr,                                                 // tp_traverse
    nullptr,                                                 // tp_clear
    nullptr,                                                 // tp_richcompare
    0,                                                       // tp_weaklistoffset
    nullptr,                                                 // tp_iter
    nullptr,                                                 // tp_iternext
    TraceReader_methods,                                     // tp_methods
    nullptr,                                                 // tp_members
    TraceReader_getsetters,                                  // tp_getset
    nullptr,                                                 // tp_base
    nullptr,                                                 // tp_dict
    nullptr,                                                 // tp_descr_get
    nullptr,                                                 // tp_descr_set
    0,                                                       // tp_dictoffset
    reinterpret_cast<initproc>(TraceReader_init),            // tp_init
    nullptr,                                                 // tp_alloc
    TraceReader_new,                                         // tp_new
};
3785

3786
// Registers the TraceReader type on module `m`.
//
// Returns 0 on success, or -1 with a Python exception set on failure.
// `m` is borrowed: this function never takes a reference to it, so the
// caller keeps ownership and remains responsible for releasing it if module
// initialization is aborted.
int init_trace_reader(PyObject *m) {
    if (PyType_Ready(&TraceReaderType) < 0) return -1;

    // PyModule_AddObject steals a reference only when it succeeds, so take
    // one up front and give it back on failure.
    Py_INCREF(&TraceReaderType);
    if (PyModule_AddObject(m, "TraceReader",
                           reinterpret_cast<PyObject *>(&TraceReaderType)) <
        0) {
        Py_DECREF(&TraceReaderType);
        // Do not Py_DECREF(m) here. This helper does not own a reference to
        // the module, and releasing one would unbalance the caller's count
        // (use-after-free / double-decref if the caller also drops `m`).
        return -1;
    }

    return 0;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc