• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28423703495

30 Jun 2026 05:59AM UTC coverage: 51.998% (-0.3%) from 52.278%
28423703495

Pull #83

github

web-flow
Merge fb542a938 into 2efed6649
Pull Request #83: refactor and improve code QoL

37282 of 93303 branches covered (39.96%)

Branch coverage included in aggregate %.

801 of 1525 new or added lines in 78 files covered. (52.52%)

98 existing lines in 37 files now uncovered.

33674 of 43157 relevant lines covered (78.03%)

20306.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.53
/src/dftracer/utils/python/trace_reader.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <Python.h>
3
#include <dftracer/utils/core/common/config.h>
4
#include <dftracer/utils/core/common/constants.h>
5
#include <dftracer/utils/core/common/filesystem.h>
6
#include <dftracer/utils/core/common/memory_budget.h>
7
#include <dftracer/utils/core/coro/channel.h>
8
#include <dftracer/utils/core/coro/task.h>
9
#include <dftracer/utils/core/coro/when_all.h>
10
#include <dftracer/utils/core/tasks/coro_scope.h>
11
#include <dftracer/utils/core/utils/string.h>
12
#include <dftracer/utils/python/arrow_helpers.h>
13
#include <dftracer/utils/python/batch_byte_size.h>
14
#include <dftracer/utils/python/json.h>
15
#include <dftracer/utils/python/py_dict_helpers.h>
16
#include <dftracer/utils/python/py_type_helpers.h>
17
#include <dftracer/utils/python/runtime.h>
18
#include <dftracer/utils/python/trace_reader.h>
19
#include <dftracer/utils/python/trace_reader_iterator.h>
20
#include <dftracer/utils/utilities/common/query/query.h>
21
#include <dftracer/utils/utilities/composites/dft/indexing/chunk_pruner_utility.h>
22
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
23
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
24
#include <dftracer/utils/utilities/indexer/index_database.h>
25
#include <dftracer/utils/utilities/indexer/internal/helpers.h>
26
#include <dftracer/utils/utilities/reader/trace_reader.h>
27

28
#include <algorithm>
29
#include <cctype>
30
#include <cstddef>
31
#include <cstdio>
32
#include <cstring>
33
#include <exception>
34
#include <memory>
35
#include <string>
36
#include <unordered_map>
37
#include <vector>
38
#ifdef DFTRACER_UTILS_ENABLE_ARROW
39
#include <dftracer/utils/python/arrow_stream_capsule.h>
40
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
41
#include <dftracer/utils/utilities/common/json/parser.h>
42
#endif
43
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
44
#include <dftracer/utils/utilities/common/arrow/ipc_writer.h>
45
#include <dftracer/utils/utilities/common/arrow/partition_writer.h>
46
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
47
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
48
#include <dftracer/utils/utilities/composites/dft/views/view_builder_utility.h>
49
#include <dftracer/utils/utilities/composites/dft/views/view_definition.h>
50
#include <dftracer/utils/utilities/composites/dft/views/view_reader_utility.h>
51
#endif
52

53
namespace {
54

55
using dftracer::utils::CoroScope;
56
using dftracer::utils::Runtime;
57
using dftracer::utils::coro::CoroTask;
58
using dftracer::utils::coro::when_all;
59
using dftracer::utils::utilities::filesystem::PatternDirectoryScannerUtility;
60
using dftracer::utils::utilities::filesystem::
61
    PatternDirectoryScannerUtilityInput;
62
using dftracer::utils::utilities::reader::ReadConfig;
63
using dftracer::utils::utilities::reader::TraceReader;
64
using dftracer::utils::utilities::reader::TraceReaderConfig;
65
#ifdef DFTRACER_UTILS_ENABLE_ARROW
66
using dftracer::utils::utilities::common::arrow::ColumnType;
67
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
68
using dftracer::utils::utilities::common::json::JsonParser;
69
using dftracer::utils::utilities::common::json::JsonValueHelper;
70
#endif
71
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
72
using dftracer::utils::utilities::common::arrow::IpcCompression;
73
using dftracer::utils::utilities::common::arrow::PartitionWriter;
74
using dftracer::utils::utilities::common::arrow::PartitionWriteStats;
75
using dftracer::utils::utilities::composites::dft::MetadataCollectorUtility;
76
using dftracer::utils::utilities::composites::dft::
77
    MetadataCollectorUtilityInput;
78
using dftracer::utils::utilities::composites::dft::views::ViewBuilderInput;
79
using dftracer::utils::utilities::composites::dft::views::ViewBuilderUtility;
80
using dftracer::utils::utilities::composites::dft::views::ViewDefinition;
81
using dftracer::utils::utilities::composites::dft::views::ViewReaderInput;
82
using dftracer::utils::utilities::composites::dft::views::ViewReaderUtility;
83
#endif
84

85
using dftracer::utils::python::MemoryViewBatchData;
86
using dftracer::utils::python::MemoryViewBatchIteratorState;
87

88
CoroTask<void> produce_lines_batched(
4,536!
89
    std::shared_ptr<MemoryViewBatchIteratorState> state,
90
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer,
91
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size) {
56!
92
    auto guard = producer.guard();
56!
93
    try {
94
        TraceReader reader(std::move(cfg));
56!
95
        auto gen = reader.read_lines(rc);
56!
96
        MemoryViewBatchData batch;
56✔
97
        std::size_t count = 0;
56✔
98

99
        while (auto opt = co_await gen.next()) {
4,062!
100
            if (state->cancelled.load(std::memory_order_acquire)) break;
946!
101
            auto sv = opt->content;
946✔
102
            Py_ssize_t offset = static_cast<Py_ssize_t>(batch.buffer.size());
946✔
103
            batch.buffer.insert(batch.buffer.end(), sv.begin(), sv.end());
946!
104
            batch.offsets.push_back(offset);
946!
105
            batch.lengths.push_back(static_cast<Py_ssize_t>(sv.size()));
946!
106
            ++count;
946✔
107

108
            if (count >= batch_size) {
946!
109
                auto batch_bytes = dftracer::utils::python::byte_size(batch);
×
110
                state->bytes_in_queue.fetch_add(batch_bytes,
111
                                                std::memory_order_acq_rel);
112
                if (!co_await producer.send(std::move(batch))) break;
×
113
                batch = MemoryViewBatchData{};
114
                count = 0;
115
            }
×
116
        }
1,002!
117
        if (count > 0 && !state->cancelled.load(std::memory_order_acquire)) {
153✔
118
            auto batch_bytes = dftracer::utils::python::byte_size(batch);
156!
119
            state->bytes_in_queue.fetch_add(batch_bytes,
156✔
120
                                            std::memory_order_acq_rel);
121
            co_await producer.send(std::move(batch));
208!
122
        }
52!
123
    } catch (...) {
2,358✔
124
        state->set_error(std::current_exception());
1!
125
    }
1!
126
}
6,481!
127

128
CoroTask<void> produce_raw_batched(
100!
129
    std::shared_ptr<MemoryViewBatchIteratorState> state,
130
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer,
131
    TraceReaderConfig cfg, ReadConfig rc) {
10!
132
    auto guard = producer.guard();
10!
133
    try {
134
        TraceReader reader(std::move(cfg));
10!
135
        auto gen = reader.read_raw(rc);
10!
136
        while (auto opt = co_await gen.next()) {
1,030!
137
            if (state->cancelled.load(std::memory_order_acquire)) break;
735!
138
            MemoryViewBatchData batch;
735✔
139
            batch.buffer.assign(opt->data(), opt->data() + opt->size());
735!
140
            batch.offsets.push_back(0);
735!
141
            batch.lengths.push_back(static_cast<Py_ssize_t>(opt->size()));
735!
142
            auto batch_bytes = dftracer::utils::python::byte_size(batch);
735✔
143
            state->bytes_in_queue.fetch_add(batch_bytes,
735✔
144
                                            std::memory_order_acq_rel);
145
            if (!co_await producer.send(std::move(batch))) break;
980!
146
        }
745!
147
    } catch (...) {
30✔
148
        state->set_error(std::current_exception());
×
149
    }
×
150
}
2,050!
151

152
using dftracer::utils::utilities::common::json::JsonParser;
153
using dftracer::utils::utilities::common::json::JsonValueHelper;
154

155
static constexpr std::size_t ESTIMATED_BYTES_PER_LINE = 256;
156
static constexpr std::size_t ESTIMATED_BYTES_PER_RAW_CHUNK = 4 * 1024 * 1024;
157
static constexpr std::size_t ESTIMATED_BYTES_PER_JSON_EVENT = 512;
158
static constexpr std::size_t ESTIMATED_BYTES_PER_ARROW_ROW = 1024;
159

160
static void insert_simdjson_value(ArgsMap &map, std::string_view key,
4,303✔
161
                                  simdjson::ondemand::value val) {
162
    auto type = val.type();
4,303✔
163
    if (type.error()) return;
4,319!
164
    switch (type.value_unsafe()) {
4,318!
165
        case simdjson::ondemand::json_type::string: {
957✔
166
            auto r = val.get_string();
1,919✔
167
            if (!r.error()) map.insert(key, std::string(r.value_unsafe()));
1,921!
168
            break;
1,922✔
169
        }
170
        case simdjson::ondemand::json_type::number: {
1,197✔
171
            auto ri = val.get_int64();
2,396✔
172
            if (!ri.error()) {
2,394✔
173
                auto v = ri.value_unsafe();
2,394✔
174
                if (v >= 0)
2,388!
175
                    map.insert(key, static_cast<std::uint64_t>(v));
2,389!
176
                else
177
                    map.insert(key, v);
×
178
            } else {
1,199✔
179
                auto rd = val.get_double();
×
180
                if (!rd.error()) map.insert(key, rd.value_unsafe());
×
181
            }
182
            break;
2,397✔
183
        }
184
        case simdjson::ondemand::json_type::boolean: {
185
            auto r = val.get_bool();
×
186
            if (!r.error()) map.insert(key, r.value_unsafe());
×
187
            break;
×
188
        }
189
        default:
190
            break;
×
191
    }
192
}
2,157✔
193

194
static void parse_json_to_event(JsonParser &parser, JsonDictEvent &ev) {
474✔
195
    ev.top.set_valid(true);
474✔
196
    parser.for_each_field(
713!
197
        [&](std::string_view key, simdjson::ondemand::value val) {
4,057✔
198
            if (key == "args") {
3,819✔
199
                auto obj = val.get_object();
480✔
200
                if (!obj.error()) {
478!
201
                    ev.args.set_valid(true);
479✔
202
                    for (auto field : obj.value_unsafe()) {
1,432✔
203
                        if (field.error()) continue;
957!
204
                        auto fkey = field.unescaped_key();
957✔
205
                        if (fkey.error()) continue;
957!
206
                        auto fval = field.value();
955✔
207
                        if (fval.error()) continue;
957!
208
                        insert_simdjson_value(ev.args, fkey.value_unsafe(),
1,433!
209
                                              fval.value_unsafe());
957✔
210
                    }
211
                }
240✔
212
            } else {
240✔
213
                insert_simdjson_value(ev.top, key, val);
3,338✔
214
            }
215
        });
3,826✔
216
}
477✔
217

218
CoroTask<void> produce_json_dicts(
134!
219
    std::shared_ptr<JsonDictIteratorState> state,
220
    dftracer::utils::coro::ChannelProducer<JsonDictBatch> producer,
221
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size) {
1!
222
    auto guard = producer.guard();
1!
223
    try {
224
        TraceReader reader(std::move(cfg));
1!
225
        auto gen = reader.read_json(rc);
1!
226
        JsonDictBatch batch;
1✔
227
        batch.events.reserve(batch_size);
1!
228

229
        while (auto opt = co_await gen.next()) {
125!
230
            if (state->cancelled.load(std::memory_order_acquire)) break;
30!
231

232
            JsonDictEvent ev;
30!
233
            parse_json_to_event(*opt->parser, ev);
30!
234
            batch.events.push_back(std::move(ev));
30!
235

236
            if (batch.events.size() >= batch_size) {
30!
237
                auto batch_bytes = dftracer::utils::python::byte_size(batch);
×
238
                state->bytes_in_queue.fetch_add(batch_bytes,
239
                                                std::memory_order_acq_rel);
240
                if (!co_await producer.send(std::move(batch))) break;
×
241
                batch = JsonDictBatch{};
242
                batch.events.reserve(batch_size);
×
243
            }
×
244
        }
31!
245
        if (!batch.events.empty() &&
3✔
246
            !state->cancelled.load(std::memory_order_acquire)) {
1✔
247
            auto batch_bytes = dftracer::utils::python::byte_size(batch);
3!
248
            state->bytes_in_queue.fetch_add(batch_bytes,
3✔
249
                                            std::memory_order_acq_rel);
250
            co_await producer.send(std::move(batch));
4!
251
        }
1!
252
    } catch (...) {
69✔
253
        state->set_error(std::current_exception());
×
254
    }
×
255
}
195!
256

257
static CoroTask<void> send_files_to_channel(
144!
258
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
259
    const std::vector<std::string> *files, std::atomic<bool> *cancelled) {
8!
260
    for (const auto &fp : *files) {
128✔
261
        if (cancelled->load(std::memory_order_acquire)) break;
72!
262
        if (!co_await file_chan->send(fp)) break;
104!
263
    }
24✔
264
    file_chan->close();
8!
265
    co_return;
8✔
266
}
112!
267

268
static CoroTask<void> json_dict_file_worker(
966!
269
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
270
    dftracer::utils::coro::Channel<JsonDictBatch> *out_chan,
271
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
272
    ReadConfig rc, std::size_t batch_size, std::atomic<bool> *cancelled) {
9!
273
    dftracer::utils::coro::ChannelProducer<JsonDictBatch> producer(out_chan);
9!
274
    auto guard = producer.guard();
9!
275

276
    while (auto file_path = co_await file_chan->receive()) {
56!
277
        if (cancelled->load(std::memory_order_acquire)) co_return;
14!
278
        TraceReaderConfig cfg;
14✔
279
        cfg.file_path = std::move(*file_path);
14✔
280
        cfg.index_dir = index_dir;
14!
281
        cfg.checkpoint_size = checkpoint_size;
14✔
282
        cfg.auto_build_index = auto_build_index;
14✔
283

284
        TraceReader reader(std::move(cfg));
14!
285
        auto gen = reader.read_json(rc);
14!
286
        JsonDictBatch batch;
14✔
287
        batch.events.reserve(batch_size);
14!
288

289
        while (auto opt = co_await gen.next()) {
894!
290
            if (cancelled->load(std::memory_order_acquire)) co_return;
212!
291
            JsonDictEvent ev;
212!
292
            parse_json_to_event(*opt->parser, ev);
212✔
293
            batch.events.push_back(std::move(ev));
210✔
294
            if (batch.events.size() >= batch_size) {
208!
295
                if (!co_await producer.send(std::move(batch))) co_return;
×
296
                batch = JsonDictBatch{};
297
                batch.events.reserve(batch_size);
×
298
            }
299
        }
224✔
300
        if (!batch.events.empty()) {
42!
301
            if (!co_await producer.send(std::move(batch))) co_return;
56!
302
        }
14✔
303
    }
495✔
304
    co_return;
9✔
305
}
1,467!
306

307
static CoroTask<void> spawn_json_dict_producers(
30!
308
    CoroScope &child, dftracer::utils::coro::Channel<JsonDictBatch> *out_chan,
309
    const std::vector<std::string> *files, const std::string *index_dir,
310
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
311
    std::size_t batch_size, std::atomic<bool> *cancelled_ptr,
312
    std::size_t max_workers) {
5!
313
    std::size_t num_workers = std::min(files->size(), max_workers);
5!
314
    auto file_chan =
5✔
315
        dftracer::utils::coro::make_channel<std::string>(num_workers);
5!
316

317
    for (std::size_t i = 0; i < num_workers; ++i) {
14✔
318
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
27!
319
                     checkpoint_size, auto_build_index, r = *rc, batch_size,
27!
320
                     cancelled_ptr](CoroScope &) {
36✔
321
            return json_dict_file_worker(fc, out_chan, idx, checkpoint_size,
27!
322
                                         auto_build_index, r, batch_size,
18!
323
                                         cancelled_ptr);
18!
324
        });
325
    }
9✔
326

327
    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
15!
328
        return send_files_to_channel(fc, files, cancelled_ptr);
10!
329
    });
330
    co_return;
10✔
331
}
15!
332

333
static CoroTask<void> produce_json_dicts_parallel(
68!
334
    CoroScope &scope, JsonDictIteratorState *sp, std::string dir_path,
335
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
336
    ReadConfig rc, std::size_t batch_size, std::size_t max_workers) {
6!
337
    try {
338
        PatternDirectoryScannerUtility scanner;
18!
339
        auto scan_input = PatternDirectoryScannerUtilityInput(
36!
340
            dir_path, {".pfw", ".pfw.gz"}, true, false);
18!
341
        auto entries = co_await scope.spawn(scanner, scan_input);
30!
342

343
        std::vector<std::string> files;
16✔
344
        files.reserve(entries.size());
16✔
345
        for (auto &e : entries) files.push_back(e.path.string());
20!
346
        std::sort(files.begin(), files.end());
6✔
347

348
        if (files.empty()) {
16✔
349
            sp->channel->close();
1!
350
            co_return;
1✔
351
        }
352

353
        auto *chan_ptr = sp->channel.get();
15✔
354
        auto *cancelled_ptr = &sp->cancelled;
15✔
355

356
        co_await scope.scope([chan_ptr, &files, &index_dir, checkpoint_size,
120!
357
                              auto_build_index, &rc, batch_size, cancelled_ptr,
45✔
358
                              max_workers](CoroScope &child) -> CoroTask<void> {
20!
359
            co_await spawn_json_dict_producers(
40!
360
                child, chan_ptr, &files, &index_dir, checkpoint_size,
15✔
361
                auto_build_index, &rc, batch_size, cancelled_ptr, max_workers);
15✔
362
        });
20!
363
    } catch (...) {
16✔
364
        sp->set_error(std::current_exception());
×
365
    }
×
366
}
78!
367

368
static CoroTask<void> lines_file_worker(
544!
369
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
370
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
371
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
372
    ReadConfig rc, std::size_t batch_size, std::atomic<bool> *cancelled) {
4!
373
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer(
8!
374
        out_chan);
4✔
375
    auto guard = producer.guard();
4!
376

377
    while (auto file_path = co_await file_chan->receive()) {
24!
378
        if (cancelled->load(std::memory_order_acquire)) co_return;
7!
379
        TraceReaderConfig cfg;
7✔
380
        cfg.file_path = std::move(*file_path);
7✔
381
        cfg.index_dir = index_dir;
7!
382
        cfg.checkpoint_size = checkpoint_size;
7✔
383
        cfg.auto_build_index = auto_build_index;
7✔
384

385
        TraceReader reader(std::move(cfg));
7!
386
        auto gen = reader.read_lines(rc);
7!
387
        MemoryViewBatchData batch;
7✔
388
        std::size_t count = 0;
7✔
389

390
        while (auto opt = co_await gen.next()) {
502!
391
            if (cancelled->load(std::memory_order_acquire)) co_return;
118!
392
            auto sv = opt->content;
118✔
393
            Py_ssize_t offset = static_cast<Py_ssize_t>(batch.buffer.size());
118✔
394
            batch.buffer.insert(batch.buffer.end(), sv.begin(), sv.end());
118!
395
            batch.offsets.push_back(offset);
118!
396
            batch.lengths.push_back(static_cast<Py_ssize_t>(sv.size()));
118!
397
            ++count;
118✔
398
            if (count >= batch_size) {
118!
399
                if (!co_await producer.send(std::move(batch))) co_return;
×
400
                batch = MemoryViewBatchData{};
401
                count = 0;
402
            }
403
        }
125!
404
        if (count > 0) {
21!
405
            if (!co_await producer.send(std::move(batch))) co_return;
28!
406
        }
7✔
407
    }
277✔
408
    co_return;
4✔
409
}
813!
410

411
static CoroTask<void> spawn_lines_producers(
12!
412
    CoroScope &child,
413
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
414
    const std::vector<std::string> *files, const std::string *index_dir,
415
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
416
    std::size_t batch_size, std::atomic<bool> *cancelled_ptr,
417
    std::size_t max_workers) {
2!
418
    std::size_t num_workers = std::min(files->size(), max_workers);
2!
419
    auto file_chan =
2✔
420
        dftracer::utils::coro::make_channel<std::string>(num_workers);
2!
421

422
    for (std::size_t i = 0; i < num_workers; ++i) {
6✔
423
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
11!
424
                     checkpoint_size, auto_build_index, r = *rc, batch_size,
12!
425
                     cancelled_ptr](CoroScope &) {
15✔
426
            return lines_file_worker(fc, out_chan, idx, checkpoint_size,
12!
427
                                     auto_build_index, r, batch_size,
7!
428
                                     cancelled_ptr);
7!
429
        });
430
    }
4✔
431

432
    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
6!
433
        return send_files_to_channel(fc, files, cancelled_ptr);
4!
434
    });
435
    co_return;
4✔
436
}
6!
437

438
static CoroTask<void> produce_lines_parallel(
32!
439
    CoroScope &scope, MemoryViewBatchIteratorState *sp, std::string dir_path,
440
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
441
    ReadConfig rc, std::size_t batch_size, std::size_t max_workers) {
3!
442
    try {
443
        PatternDirectoryScannerUtility scanner;
9!
444
        auto scan_input = PatternDirectoryScannerUtilityInput(
18!
445
            dir_path, {".pfw", ".pfw.gz"}, true, false);
9!
446
        auto entries = co_await scope.spawn(scanner, scan_input);
15!
447

448
        std::vector<std::string> files;
7✔
449
        files.reserve(entries.size());
7✔
450
        for (auto &e : entries) files.push_back(e.path.string());
10!
451
        std::sort(files.begin(), files.end());
3✔
452

453
        if (files.empty()) {
7✔
454
            sp->channel->close();
1!
455
            co_return;
1✔
456
        }
457

458
        auto *chan_ptr = sp->channel.get();
6✔
459
        auto *cancelled_ptr = &sp->cancelled;
6✔
460

461
        co_await scope.scope([chan_ptr, &files, &index_dir, checkpoint_size,
48!
462
                              auto_build_index, &rc, batch_size, cancelled_ptr,
18✔
463
                              max_workers](CoroScope &child) -> CoroTask<void> {
8!
464
            co_await spawn_lines_producers(
16!
465
                child, chan_ptr, &files, &index_dir, checkpoint_size,
6✔
466
                auto_build_index, &rc, batch_size, cancelled_ptr, max_workers);
6✔
467
        });
8!
468
    } catch (...) {
7✔
469
        sp->set_error(std::current_exception());
×
470
    }
×
471
}
36!
472

473
static CoroTask<void> raw_file_worker(
16!
474
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
475
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
476
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
477
    ReadConfig rc, std::atomic<bool> *cancelled) {
2!
478
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer(
4!
479
        out_chan);
2✔
480
    auto guard = producer.guard();
2!
481

482
    while (auto file_path = co_await file_chan->receive()) {
13!
483
        if (cancelled->load(std::memory_order_acquire)) co_return;
3!
484
        TraceReaderConfig cfg;
3✔
485
        cfg.file_path = std::move(*file_path);
3✔
486
        cfg.index_dir = index_dir;
3!
487
        cfg.checkpoint_size = checkpoint_size;
3✔
488
        cfg.auto_build_index = auto_build_index;
3✔
489

490
        TraceReader reader(std::move(cfg));
3!
491
        auto gen = reader.read_raw(rc);
3!
492
        while (auto opt = co_await gen.next()) {
252!
493
            if (cancelled->load(std::memory_order_acquire)) co_return;
180!
494
            MemoryViewBatchData batch;
180✔
495
            batch.buffer.assign(opt->data(), opt->data() + opt->size());
180!
496
            batch.offsets.push_back(0);
180!
497
            batch.lengths.push_back(static_cast<Py_ssize_t>(opt->size()));
180!
498
            if (!co_await producer.send(std::move(batch))) co_return;
240!
499
        }
183!
500
    }
11✔
501
    co_return;
2✔
502
}
508!
503

504
static CoroTask<void> spawn_raw_producers(
6!
505
    CoroScope &child,
506
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
507
    const std::vector<std::string> *files, const std::string *index_dir,
508
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
509
    std::atomic<bool> *cancelled_ptr, std::size_t max_workers) {
1!
510
    std::size_t num_workers = std::min(files->size(), max_workers);
1!
511
    auto file_chan =
1✔
512
        dftracer::utils::coro::make_channel<std::string>(num_workers);
1!
513

514
    for (std::size_t i = 0; i < num_workers; ++i) {
3✔
515
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
6!
516
                     checkpoint_size, auto_build_index, r = *rc,
6!
517
                     cancelled_ptr](CoroScope &) {
8✔
518
            return raw_file_worker(fc, out_chan, idx, checkpoint_size,
6!
519
                                   auto_build_index, r, cancelled_ptr);
4!
520
        });
521
    }
2✔
522

523
    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
3!
524
        return send_files_to_channel(fc, files, cancelled_ptr);
2!
525
    });
526
    co_return;
2✔
527
}
3!
528

529
static CoroTask<void> produce_raw_parallel(
20!
530
    CoroScope &scope, MemoryViewBatchIteratorState *sp, std::string dir_path,
531
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
532
    ReadConfig rc, std::size_t max_workers) {
2!
533
    try {
534
        PatternDirectoryScannerUtility scanner;
6!
535
        auto scan_input = PatternDirectoryScannerUtilityInput(
12!
536
            dir_path, {".pfw", ".pfw.gz"}, true, false);
6!
537
        auto entries = co_await scope.spawn(scanner, scan_input);
10!
538

539
        std::vector<std::string> files;
4✔
540
        files.reserve(entries.size());
4✔
541
        for (auto &e : entries) files.push_back(e.path.string());
5!
542
        std::sort(files.begin(), files.end());
2✔
543

544
        if (files.empty()) {
4✔
545
            sp->channel->close();
1!
546
            co_return;
1✔
547
        }
548

549
        auto *chan_ptr = sp->channel.get();
3✔
550
        auto *cancelled_ptr = &sp->cancelled;
3✔
551

552
        co_await scope.scope([chan_ptr, &files, &index_dir, checkpoint_size,
21!
553
                              auto_build_index, &rc, cancelled_ptr,
6✔
554
                              max_workers](CoroScope &child) -> CoroTask<void> {
4!
555
            co_await spawn_raw_producers(child, chan_ptr, &files, &index_dir,
8!
556
                                         checkpoint_size, auto_build_index, &rc,
3✔
557
                                         cancelled_ptr, max_workers);
3✔
558
        });
4!
559
    } catch (...) {
4✔
560
        sp->set_error(std::current_exception());
×
561
    }
×
562
}
22!
563

564
#ifdef DFTRACER_UTILS_ENABLE_ARROW
565

566
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
567
using dftracer::utils::utilities::common::arrow::ColumnType;
568
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
569

570
// Bump arena for string_views that must survive until builder.finish().
571
struct StringArena {
572
    static constexpr std::size_t BLOCK_SIZE = 64 * 1024;
573
    std::vector<std::vector<char>> blocks;
574
    std::size_t pos = 0;
575

576
    StringArena() { blocks.emplace_back(BLOCK_SIZE); }
×
577

578
    std::string_view push(const char *data, std::size_t len) {
×
579
        if (pos + len > blocks.back().size()) {
×
580
            blocks.emplace_back(std::max(BLOCK_SIZE, len));
×
581
            pos = 0;
×
582
        }
583
        char *dst = blocks.back().data() + pos;
×
584
        std::memcpy(dst, data, len);
×
585
        pos += len;
×
586
        return {dst, len};
×
587
    }
588

589
    void clear() {
×
590
        if (blocks.size() > 1) blocks.resize(1);
×
591
        pos = 0;
×
592
    }
×
593
};
594

595
// --- Row type constants (must match Python TYPE_* constants) ---
596
enum RowType : int8_t {
597
    ROW_EVENT = 0,
598
    ROW_FILE_HASH = 1,
599
    ROW_HOST_HASH = 2,
600
    ROW_STRING_HASH = 3,
601
    ROW_METADATA = 4,
602
    ROW_PROC_METADATA = 5,
603
    ROW_PROFILE = 6,
604
    ROW_SYSTEM = 7,
605
};
606

607
// --- IO category constants (must match Python IOCategory values) ---
608
enum IOCat : int8_t {
609
    IO_READ = 1,
610
    IO_WRITE = 2,
611
    IO_METADATA = 3,
612
    IO_PCTL = 4,
613
    IO_IPC = 5,
614
    IO_OTHER = 6,
615
    IO_SYNC = 7,
616
};
617

618
static int8_t get_io_cat(std::string_view func) {
×
619
    using namespace dftracer::utils::utilities::composites::dft::internal;
620
    for (auto op : posix_ops::READ)
×
621
        if (op == func) return IO_READ;
×
622
    for (auto op : posix_ops::WRITE)
×
623
        if (op == func) return IO_WRITE;
×
624
    for (auto op : posix_ops::SYNC)
×
625
        if (op == func) return IO_SYNC;
×
626
    for (auto op : posix_ops::PCTL)
×
627
        if (op == func) return IO_PCTL;
×
628
    for (auto op : posix_ops::IPC)
×
629
        if (op == func) return IO_IPC;
×
630
    for (auto op : posix_ops::METADATA)
×
631
        if (op == func) return IO_METADATA;
×
632
    return IO_OTHER;
×
633
}
634

635
static bool str_iequal(std::string_view a, const char *b) {
×
636
    std::size_t len = std::strlen(b);
×
637
    if (a.size() != len) return false;
×
638
    for (std::size_t i = 0; i < len; ++i) {
×
639
        if (std::tolower(static_cast<unsigned char>(a[i])) !=
×
640
            static_cast<unsigned char>(b[i]))
×
641
            return false;
×
642
    }
643
    return true;
×
644
}
645

646
static bool str_contains_lower(std::string_view s, const char *needle) {
×
647
    std::size_t nlen = std::strlen(needle);
×
648
    if (s.size() < nlen) return false;
×
649
    for (std::size_t i = 0; i <= s.size() - nlen; ++i) {
×
650
        bool match = true;
×
651
        for (std::size_t j = 0; j < nlen; ++j) {
×
652
            if (std::tolower(static_cast<unsigned char>(s[i + j])) !=
×
653
                static_cast<unsigned char>(needle[j])) {
×
654
                match = false;
×
655
                break;
×
656
            }
657
        }
658
        if (match) return true;
×
659
    }
660
    return false;
×
661
}
662

663
// Fields extracted from a row's "args" object in a single pass.
664
struct ParsedArgs {
665
    std::optional<std::string_view> name, value, hhash, fhash;
666
    std::optional<std::int64_t> epoch, step, size_sum, ret;
667
    std::optional<std::int64_t> offset, image_idx, image_size;
668
    // Other int/float args, kept for profile/sys columns.
669
    std::unordered_map<std::string, std::int64_t> int_map;
670
    std::unordered_map<std::string, double> float_map;
671
};
672

NEW
673
static ParsedArgs parse_row_args(JsonParser &parser) {
×
674
    using SVH = JsonValueHelper;
NEW
675
    ParsedArgs a;
×
676
    parser.rewind();
×
677
    parser.for_each_field(
×
678
        "args", [&](std::string_view key, simdjson::ondemand::value val) {
×
679
            if (key == "name") {
×
NEW
680
                if (auto s = SVH::get_string(val)) a.name = s;
×
681
            } else if (key == "value") {
×
NEW
682
                if (auto s = SVH::get_string(val)) a.value = s;
×
683
            } else if (key == "hhash") {
×
NEW
684
                if (auto s = SVH::get_string(val)) a.hhash = s;
×
685
            } else if (key == "fhash") {
×
NEW
686
                if (auto s = SVH::get_string(val)) a.fhash = s;
×
687
            } else if (key == "epoch") {
×
NEW
688
                if (auto i = SVH::get_int64(val)) a.epoch = i;
×
689
            } else if (key == "step") {
×
NEW
690
                if (auto i = SVH::get_int64(val)) a.step = i;
×
691
            } else if (key == "size_sum") {
×
NEW
692
                if (auto i = SVH::get_int64(val)) a.size_sum = i;
×
693
            } else if (key == "ret") {
×
NEW
694
                if (auto i = SVH::get_int64(val)) a.ret = i;
×
695
            } else if (key == "offset") {
×
NEW
696
                if (auto i = SVH::get_int64(val)) a.offset = i;
×
697
            } else if (key == "image_idx") {
×
NEW
698
                if (auto i = SVH::get_int64(val)) a.image_idx = i;
×
699
            } else if (key == "image_size") {
×
NEW
700
                if (auto i = SVH::get_int64(val)) a.image_size = i;
×
701
            } else {
UNCOV
702
                if (auto i = SVH::get_int64(val)) {
×
NEW
703
                    a.int_map[std::string(key)] = *i;
×
704
                } else if (auto d = SVH::get_double(val)) {
×
NEW
705
                    a.float_map[std::string(key)] = *d;
×
706
                }
707
            }
708
        });
×
NEW
709
    return a;
×
NEW
710
}
×
711

NEW
712
static void append_profile_columns(
×
713
    RecordBatchBuilder &builder,
714
    const std::unordered_map<std::string, std::int64_t> &int_map) {
715
    static const char *profile_keys[] = {
716
        "count",      "count_max",  "count_min",  "count_sum",  "dft_cnt",
717
        "dur",        "dur_max",    "dur_min",    "dur_sum",    "epoch",
718
        "flags",      "offset",     "offset_max", "offset_min", "offset_sum",
719
        "ret",        "ret_max",    "ret_min",    "ret_sum",    "whence",
720
        "whence_max", "whence_min", "whence_sum", nullptr};
NEW
721
    for (const char **pk = profile_keys; *pk; ++pk) {
×
NEW
722
        auto it = int_map.find(*pk);
×
NEW
723
        if (it != int_map.end()) {
×
NEW
724
            auto idx = builder.add_or_get_column(*pk, ColumnType::INT64);
×
NEW
725
            builder.append_int64(idx, it->second);
×
726
        }
727
    }
NEW
728
}
×
729

NEW
730
static void append_system_columns(
×
731
    RecordBatchBuilder &builder,
732
    const std::unordered_map<std::string, double> &float_map) {
733
    static const char *sys_keys[] = {
734
        "user_pct", "system_pct",  "iowait_pct",   "idle_pct",
735
        "irq_pct",  "softirq_pct", "MemAvailable", "MemFree",
736
        "Cached",   "Dirty",       "Active",       nullptr};
NEW
737
    for (const char **sk = sys_keys; *sk; ++sk) {
×
NEW
738
        auto it = float_map.find(*sk);
×
NEW
739
        if (it != float_map.end()) {
×
NEW
740
            auto idx = builder.add_or_get_column(*sk, ColumnType::DOUBLE);
×
NEW
741
            builder.append_double(idx, it->second);
×
742
        }
743
    }
NEW
744
}
×
745

746
// Normalize a raw JSON row (parsed with simdjson) into the semantic
747
// output schema.  Appends one row to `builder` with the full set of output
748
// columns.  Returns false if the row should be skipped (no valid name).
NEW
749
static bool normalize_row(RecordBatchBuilder &builder, StringArena &arena,
×
750
                          JsonParser &parser) {
751
    // --- Extract top-level fields ---
NEW
752
    auto ph = parser.get_string("ph").value_or(std::string_view{});
×
NEW
753
    auto name_sv = parser.get_string("name").value_or(std::string_view{});
×
NEW
754
    auto cat_sv = parser.get_string("cat").value_or(std::string_view{});
×
NEW
755
    auto pid_opt = parser.get_int64("pid");
×
NEW
756
    auto tid_opt = parser.get_int64("tid");
×
NEW
757
    auto ts_opt = parser.get_int64("ts");
×
NEW
758
    auto dur_opt = parser.get_int64("dur");
×
759

NEW
760
    ParsedArgs args = parse_row_args(parser);
×
761

762
    // --- Type classification ---
763
    bool is_M = (ph == "M");
×
764
    bool is_C = (ph == "C");
×
765
    bool is_event = !is_M && !is_C;
×
766

767
    int8_t row_type = ROW_EVENT;
×
768
    if (is_M) {
×
769
        if (name_sv == "FH")
×
770
            row_type = ROW_FILE_HASH;
×
771
        else if (name_sv == "HH")
×
772
            row_type = ROW_HOST_HASH;
×
773
        else if (name_sv == "SH")
×
774
            row_type = ROW_STRING_HASH;
×
775
        else if (name_sv == "PR")
×
776
            row_type = ROW_PROC_METADATA;
×
777
        else
778
            row_type = ROW_METADATA;
×
779
    } else if (is_C) {
×
780
        row_type = str_iequal(cat_sv, "sys") ? ROW_SYSTEM : ROW_PROFILE;
×
781
    }
782
    bool is_hash = (row_type >= ROW_FILE_HASH && row_type <= ROW_STRING_HASH) ||
×
783
                   row_type == ROW_PROC_METADATA;
784
    bool is_profile = (row_type == ROW_PROFILE);
×
785
    bool is_sys = (row_type == ROW_SYSTEM);
×
786

787
    // Name: metadata rows use args.name if available
788
    std::string_view out_name = name_sv;
×
NEW
789
    if (is_M && args.name && !args.name->empty()) {
×
NEW
790
        out_name = *args.name;
×
791
    }
792
    if (out_name.empty()) return false;  // skip rows without name
×
793

794
    // --- Declare all output columns ---
795
    auto ci_type = builder.add_or_get_column("type", ColumnType::INT64);
×
796
    auto ci_cat = builder.add_or_get_column("cat", ColumnType::STRING);
×
797
    auto ci_name = builder.add_or_get_column("name", ColumnType::STRING);
×
798
    auto ci_pid = builder.add_or_get_column("pid", ColumnType::INT64);
×
799
    auto ci_tid = builder.add_or_get_column("tid", ColumnType::INT64);
×
800
    auto ci_hash = builder.add_or_get_column("hash", ColumnType::STRING);
×
801
    auto ci_value = builder.add_or_get_column("value", ColumnType::STRING);
×
802
    auto ci_host_hash =
803
        builder.add_or_get_column("host_hash", ColumnType::STRING);
×
804
    auto ci_file_hash =
805
        builder.add_or_get_column("file_hash", ColumnType::STRING);
×
806
    auto ci_epoch = builder.add_or_get_column("epoch", ColumnType::INT64);
×
807
    auto ci_step = builder.add_or_get_column("step", ColumnType::INT64);
×
808
    auto ci_ts = builder.add_or_get_column("ts", ColumnType::INT64);
×
809
    auto ci_dur = builder.add_or_get_column("dur", ColumnType::INT64);
×
810
    auto ci_te = builder.add_or_get_column("te", ColumnType::INT64);
×
811
    [[maybe_unused]] auto ci_trange =
812
        builder.add_or_get_column("trange", ColumnType::INT64);
×
813
    auto ci_io_cat = builder.add_or_get_column("io_cat", ColumnType::INT64);
×
814
    auto ci_size = builder.add_or_get_column("size", ColumnType::INT64);
×
815
    auto ci_offset = builder.add_or_get_column("offset", ColumnType::INT64);
×
816
    auto ci_image_id = builder.add_or_get_column("image_id", ColumnType::INT64);
×
817

818
    // --- Populate core columns ---
819
    builder.append_int64(ci_type, row_type);
×
820

821
    // cat (lowercased) - write into arena
822
    if (!cat_sv.empty()) {
×
823
        char lbuf[256];
824
        std::size_t clen = std::min(cat_sv.size(), sizeof(lbuf));
×
825
        for (std::size_t i = 0; i < clen; ++i)
×
826
            lbuf[i] = static_cast<char>(
×
827
                std::tolower(static_cast<unsigned char>(cat_sv[i])));
×
828
        builder.append_string(ci_cat, arena.push(lbuf, clen));
×
829
    } else {
830
        builder.append_null(ci_cat);
×
831
    }
832

833
    builder.append_string(ci_name, out_name);
×
834

835
    if (pid_opt) builder.append_int64(ci_pid, *pid_opt);
×
836
    if (tid_opt) builder.append_int64(ci_tid, *tid_opt);
×
837

838
    // hash / value
NEW
839
    if (is_hash && args.value && !args.value->empty())
×
NEW
840
        builder.append_string(ci_hash, *args.value);
×
NEW
841
    if (row_type == ROW_METADATA && args.value && !args.value->empty())
×
NEW
842
        builder.append_string(ci_value, *args.value);
×
843

844
    // host_hash / file_hash
NEW
845
    if (args.hhash && !args.hhash->empty())
×
NEW
846
        builder.append_string(ci_host_hash, *args.hhash);
×
NEW
847
    if (args.fhash && !args.fhash->empty())
×
NEW
848
        builder.append_string(ci_file_hash, *args.fhash);
×
849

850
    // epoch / step
NEW
851
    if (args.epoch && *args.epoch >= 0)
×
NEW
852
        builder.append_int64(ci_epoch, *args.epoch);
×
NEW
853
    if (args.step && *args.step >= 0) builder.append_int64(ci_step, *args.step);
×
854

855
    // --- Temporal ---
856
    bool has_ts = (is_event || is_C) && ts_opt.has_value();
×
857
    bool has_dur = dur_opt.has_value();
×
858
    int64_t ts_val = 0, dur_val = 0;
×
859
    if (has_ts) {
×
860
        ts_val = *ts_opt;
×
861
        builder.append_int64(ci_ts, ts_val);
×
862
    }
863
    if (is_event && has_ts && has_dur) {
×
864
        dur_val = *dur_opt;
×
865
        builder.append_int64(ci_dur, dur_val);
×
866
        builder.append_int64(ci_te, ts_val + dur_val);
×
867
    }
868

869
    // --- IO columns (events only) ---
870
    if (is_event) {
×
871
        bool is_posix_stdio =
872
            str_iequal(cat_sv, "posix") || str_iequal(cat_sv, "stdio");
×
873
        int8_t io_cat = IO_OTHER;
×
874

875
        // size priority: size_sum > POSIX ret > image_size
NEW
876
        if (args.size_sum) {
×
NEW
877
            builder.append_int64(ci_size, *args.size_sum);
×
878
            if (is_posix_stdio) io_cat = get_io_cat(out_name);
×
879
        } else if (is_posix_stdio) {
×
880
            io_cat = get_io_cat(out_name);
×
NEW
881
            if (args.ret && *args.ret > 0 &&
×
882
                (io_cat == IO_READ || io_cat == IO_WRITE))
×
NEW
883
                builder.append_int64(ci_size, *args.ret);
×
NEW
884
            if (args.offset && *args.offset >= 0)
×
NEW
885
                builder.append_int64(ci_offset, *args.offset);
×
886
        } else {
NEW
887
            if (args.image_idx && *args.image_idx > 0)
×
NEW
888
                builder.append_int64(ci_image_id, *args.image_idx);
×
NEW
889
            if (args.image_size && *args.image_size > 0 &&
×
890
                !str_contains_lower(out_name, "open"))
×
NEW
891
                builder.append_int64(ci_size, *args.image_size);
×
892
        }
893
        builder.append_int64(ci_io_cat, io_cat);
×
894
    }
895

896
    // --- Profile columns ---
897
    if (is_profile) {
×
898
        bool is_posix_stdio =
899
            str_iequal(cat_sv, "posix") || str_iequal(cat_sv, "stdio");
×
900
        int8_t io_cat = is_posix_stdio ? get_io_cat(out_name) : IO_OTHER;
×
901
        builder.append_int64(ci_io_cat, io_cat);
×
NEW
902
        append_profile_columns(builder, args.int_map);
×
903
    }
904

905
    // --- System columns ---
906
    if (is_sys) {
×
NEW
907
        append_system_columns(builder, args.float_map);
×
908
    }
909

910
    builder.end_row();
×
911
    return true;
×
912
}
×
913

914
// Flatten a simdjson object into "prefix.key" columns using native types.
915
// On type mismatch (same key, different type across rows), appends null.
916
static void flatten_object_into(RecordBatchBuilder &builder, StringArena &arena,
917
                                std::string_view prefix,
918
                                simdjson::ondemand::object obj) {
919
    using SVH = JsonValueHelper;
920
    char key_buf[512];
921

922
    for (auto field : obj) {
×
923
        if (field.error()) continue;
×
924

925
        auto key_result = field.unescaped_key();
926
        if (key_result.error()) continue;
×
927
        std::string_view sk = key_result.value_unsafe();
928

929
        auto val_result = field.value();
930
        if (val_result.error()) continue;
×
931
        auto sub_val = val_result.value_unsafe();
932

933
        std::size_t needed = prefix.size() + 1 + sk.size();
934
        if (needed >= sizeof(key_buf)) continue;
×
935
        std::memcpy(key_buf, prefix.data(), prefix.size());
936
        key_buf[prefix.size()] = '.';
937
        std::memcpy(key_buf + prefix.size() + 1, sk.data(), sk.size());
938
        std::string_view full_key(key_buf, needed);
939

940
        auto type_result = sub_val.type();
941
        if (type_result.error()) continue;
×
942
        auto json_type = type_result.value_unsafe();
943

944
        switch (json_type) {
×
945
            case simdjson::ondemand::json_type::number: {
946
                auto num_result = sub_val.get_number();
947
                if (num_result.error()) break;
×
948
                auto num = num_result.value_unsafe();
949
                if (num.is_int64()) {
×
950
                    auto idx =
951
                        builder.add_or_get_column(full_key, ColumnType::INT64);
×
952
                    if (builder.column_type(idx) == ColumnType::INT64)
×
953
                        builder.append_int64(idx, num.get_int64());
×
954
                    else
955
                        builder.append_null(idx);
×
956
                } else if (num.is_uint64()) {
×
957
                    auto idx =
958
                        builder.add_or_get_column(full_key, ColumnType::UINT64);
×
959
                    if (builder.column_type(idx) == ColumnType::UINT64)
×
960
                        builder.append_uint64(idx, num.get_uint64());
×
961
                    else
962
                        builder.append_null(idx);
×
963
                } else {
964
                    auto idx =
965
                        builder.add_or_get_column(full_key, ColumnType::DOUBLE);
×
966
                    if (builder.column_type(idx) == ColumnType::DOUBLE)
×
967
                        builder.append_double(idx, num.get_double());
×
968
                    else
969
                        builder.append_null(idx);
×
970
                }
971
                break;
972
            }
973
            case simdjson::ondemand::json_type::string: {
974
                auto str_result = sub_val.get_string();
975
                if (str_result.error()) break;
×
976
                auto str = str_result.value_unsafe();
977
                auto idx =
978
                    builder.add_or_get_column(full_key, ColumnType::STRING);
×
979
                if (builder.column_type(idx) == ColumnType::STRING)
×
980
                    builder.append_string(idx, str);
×
981
                else
982
                    builder.append_null(idx);
×
983
                break;
984
            }
985
            case simdjson::ondemand::json_type::boolean: {
986
                auto bool_result = sub_val.get_bool();
987
                if (bool_result.error()) break;
×
988
                auto b = bool_result.value_unsafe();
989
                auto idx =
990
                    builder.add_or_get_column(full_key, ColumnType::BOOL);
×
991
                if (builder.column_type(idx) == ColumnType::BOOL)
×
992
                    builder.append_bool(idx, b);
×
993
                else
994
                    builder.append_null(idx);
×
995
                break;
996
            }
997
            case simdjson::ondemand::json_type::null: {
998
                auto existing = builder.find_column(full_key);
×
999
                if (existing) builder.append_null(*existing);
×
1000
                break;
1001
            }
1002
            case simdjson::ondemand::json_type::object:
1003
            case simdjson::ondemand::json_type::array: {
1004
                // Serialize nested object/array to JSON string
1005
                auto json_str = SVH::to_json_string(sub_val);
×
1006
                auto idx =
1007
                    builder.add_or_get_column(full_key, ColumnType::STRING);
×
1008
                if (json_str) {
×
1009
                    builder.append_string(
×
1010
                        idx, arena.push(json_str->data(), json_str->size()));
1011
                } else {
1012
                    builder.append_null(idx);
×
1013
                }
1014
                break;
1015
            }
1016
            default:
1017
                break;
1018
        }
1019
    }
1020
}
1021

1022
static bool build_arrow_row(RecordBatchBuilder &builder, JsonParser &parser,
×
1023
                            StringArena &arena, bool normalize) {
1024
    if (normalize) return normalize_row(builder, arena, parser);
×
1025

1026
    using SVH = JsonValueHelper;
1027
    parser.for_each_field([&](std::string_view key_sv,
×
1028
                              simdjson::ondemand::value val) {
1029
        auto type_result = val.type();
×
1030
        if (type_result.error()) return;
×
1031
        auto json_type = type_result.value_unsafe();
×
1032
        switch (json_type) {
×
1033
            case simdjson::ondemand::json_type::number: {
1034
                auto num_result = val.get_number();
×
1035
                if (num_result.error()) break;
×
1036
                auto num = num_result.value_unsafe();
×
1037
                if (num.is_int64()) {
×
1038
                    std::size_t idx =
1039
                        builder.add_or_get_column(key_sv, ColumnType::INT64);
×
1040
                    builder.append_int64(idx, num.get_int64());
×
1041
                } else if (num.is_uint64()) {
×
1042
                    std::size_t idx =
1043
                        builder.add_or_get_column(key_sv, ColumnType::UINT64);
×
1044
                    builder.append_uint64(idx, num.get_uint64());
×
1045
                } else {
1046
                    std::size_t idx =
1047
                        builder.add_or_get_column(key_sv, ColumnType::DOUBLE);
×
1048
                    builder.append_double(idx, num.get_double());
×
1049
                }
1050
                break;
×
1051
            }
1052
            case simdjson::ondemand::json_type::string: {
1053
                auto str_result = val.get_string();
×
1054
                if (str_result.error()) break;
×
1055
                auto str = str_result.value_unsafe();
×
1056
                std::size_t idx =
1057
                    builder.add_or_get_column(key_sv, ColumnType::STRING);
×
1058
                builder.append_string(idx, str);
×
1059
                break;
×
1060
            }
1061
            case simdjson::ondemand::json_type::boolean: {
1062
                auto bool_result = val.get_bool();
×
1063
                if (bool_result.error()) break;
×
1064
                auto b = bool_result.value_unsafe();
×
1065
                std::size_t idx =
1066
                    builder.add_or_get_column(key_sv, ColumnType::BOOL);
×
1067
                builder.append_bool(idx, b);
×
1068
                break;
×
1069
            }
1070
            case simdjson::ondemand::json_type::null: {
1071
                auto existing = builder.find_column(key_sv);
×
1072
                if (existing) builder.append_null(*existing);
×
1073
                break;
×
1074
            }
1075
            case simdjson::ondemand::json_type::object:
1076
            case simdjson::ondemand::json_type::array: {
1077
                auto json_str = SVH::to_json_string(val);
×
1078
                std::size_t idx =
1079
                    builder.add_or_get_column(key_sv, ColumnType::STRING);
×
1080
                if (json_str) {
×
1081
                    builder.append_string(
×
1082
                        idx, arena.push(json_str->data(), json_str->size()));
×
1083
                } else {
1084
                    builder.append_null(idx);
×
1085
                }
1086
                break;
1087
            }
×
1088
            default:
1089
                break;
×
1090
        }
1091
    });
1092
    builder.end_row();
×
1093
    return true;
×
1094
}
1095

1096
static bool process_json_line(RecordBatchBuilder &builder, JsonParser &parser,
1097
                              StringArena &arena, std::string_view content,
1098
                              bool normalize) {
1099
    const char *trimmed;
1100
    std::size_t trimmed_length;
1101
    if (!dftracer::utils::json_trim_and_validate_with_comma(
×
1102
            content.data(), content.size(), trimmed, trimmed_length))
1103
        return false;
1104
    if (!parser.parse(std::string_view(trimmed, trimmed_length))) return false;
×
1105
    return build_arrow_row(builder, parser, arena, normalize);
×
1106
}
1107

1108
static CoroTask<void> produce_arrow_for_file(
×
1109
    dftracer::utils::coro::Channel<ArrowExportResult> *chan,
1110
    std::string file_path, std::string index_dir, std::size_t checkpoint_size,
1111
    bool auto_build_index, ReadConfig rc, std::size_t batch_size,
1112
    bool normalize, std::atomic<bool> *cancelled) {
1113
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer(chan);
1114
    auto guard = producer.guard();
1115

1116
    TraceReaderConfig cfg;
1117
    cfg.file_path = std::move(file_path);
1118
    cfg.index_dir = std::move(index_dir);
1119
    cfg.checkpoint_size = checkpoint_size;
1120
    cfg.auto_build_index = auto_build_index;
1121

1122
    TraceReader reader(std::move(cfg));
1123

1124
    // Fast path: non-normalized Arrow build happens inside TraceReader.
1125
    // Normalize still goes through read_json + build_arrow_row for the
1126
    // richer schema derivation.
1127
    if (!normalize) {
1128
        auto batch_gen = reader.read_arrow(rc, batch_size);
1129
        while (auto batch_opt = co_await batch_gen.next()) {
1130
            if (cancelled->load(std::memory_order_acquire)) co_return;
1131
            if (!co_await producer.send(std::move(*batch_opt))) co_return;
1132
        }
1133
        co_return;
1134
    }
1135

1136
    auto gen = reader.read_json(rc);
1137
    RecordBatchBuilder builder;
1138
    builder.reserve(batch_size);
1139
    StringArena arena;
1140

1141
    while (auto opt = co_await gen.next()) {
1142
        if (cancelled->load(std::memory_order_acquire)) co_return;
1143
        if (!build_arrow_row(builder, *opt->parser, arena, normalize)) continue;
1144
        if (builder.num_rows() >= batch_size) {
1145
            auto result = builder.finish();
1146
            arena.clear();
1147
            if (!co_await producer.send(std::move(result))) co_return;
1148
            if (!builder.is_schema_locked()) builder.lock_schema();
1149
            builder.reset(true);
1150
            builder.reserve(batch_size);
1151
        }
1152
    }
1153
    if (builder.num_rows() > 0) {
1154
        co_await producer.send(builder.finish());
1155
    }
1156
    co_return;
1157
}
×
1158

1159
static CoroTask<void> file_worker(
×
1160
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
1161
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
1162
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
1163
    ReadConfig rc, std::size_t batch_size, bool normalize,
1164
    std::atomic<bool> *cancelled) {
1165
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer(
1166
        out_chan);
1167
    auto guard = producer.guard();
1168

1169
    while (auto file_path = co_await file_chan->receive()) {
1170
        if (cancelled->load(std::memory_order_acquire)) co_return;
1171
        TraceReaderConfig cfg;
1172
        cfg.file_path = std::move(*file_path);
1173
        cfg.index_dir = index_dir;
1174
        cfg.checkpoint_size = checkpoint_size;
1175
        cfg.auto_build_index = auto_build_index;
1176

1177
        TraceReader reader(std::move(cfg));
1178

1179
        if (!normalize) {
1180
            auto batch_gen = reader.read_arrow(rc, batch_size);
1181
            while (auto batch_opt = co_await batch_gen.next()) {
1182
                if (cancelled->load(std::memory_order_acquire)) co_return;
1183
                if (!co_await producer.send(std::move(*batch_opt))) co_return;
1184
            }
1185
            continue;
1186
        }
1187

1188
        auto gen = reader.read_json(rc);
1189
        RecordBatchBuilder builder;
1190
        builder.reserve(batch_size);
1191
        StringArena arena;
1192

1193
        while (auto opt = co_await gen.next()) {
1194
            if (cancelled->load(std::memory_order_acquire)) co_return;
1195
            if (!build_arrow_row(builder, *opt->parser, arena, normalize))
1196
                continue;
1197
            if (builder.num_rows() >= batch_size) {
1198
                auto result = builder.finish();
1199
                arena.clear();
1200
                if (!co_await producer.send(std::move(result))) co_return;
1201
                if (!builder.is_schema_locked()) builder.lock_schema();
1202
                builder.reset(true);
1203
                builder.reserve(batch_size);
1204
            }
1205
        }
1206
        if (builder.num_rows() > 0) {
1207
            if (!co_await producer.send(builder.finish())) co_return;
1208
        }
1209
    }
1210
    co_return;
1211
}
×
1212

1213
// Extract AND-of-EQ leaves from a Query AST. Returns nullopt if the predicate
1214
// shape is anything else (NE, range ops, IN, NOT, OR), in which case the
1215
// uniform-match shortcut does not apply.
1216
static std::optional<std::vector<std::pair<std::string, std::string>>>
1217
extract_eq_leaves(
×
1218
    const dftracer::utils::utilities::common::query::QueryNode &node) {
1219
    namespace q_ns = dftracer::utils::utilities::common::query;
1220
    using LeafVec = std::vector<std::pair<std::string, std::string>>;
1221

1222
    auto literal_to_string = [](const q_ns::LiteralNode &lit) -> std::string {
×
1223
        return std::visit(
1224
            [](auto &&v) -> std::string {
×
1225
                using T = std::decay_t<decltype(v)>;
1226
                if constexpr (std::is_same_v<T, std::string>)
1227
                    return v;
×
1228
                else if constexpr (std::is_same_v<T, bool>)
1229
                    return v ? "true" : "false";
×
1230
                else if constexpr (std::is_same_v<T, int64_t>)
1231
                    return std::to_string(v);
×
1232
                else if constexpr (std::is_same_v<T, uint64_t>)
1233
                    return std::to_string(v);
×
1234
                else if constexpr (std::is_same_v<T, double>)
1235
                    return std::to_string(v);
×
1236
                else
1237
                    return {};
1238
            },
1239
            lit.value);
×
1240
    };
1241

1242
    return std::visit(
1243
        [&](const auto &n) -> std::optional<LeafVec> {
×
1244
            using T = std::decay_t<decltype(n)>;
1245
            if constexpr (std::is_same_v<T, q_ns::CompareNode>) {
1246
                if (n.op != q_ns::CompareOp::EQ) return std::nullopt;
×
1247
                return LeafVec{{n.field.path, literal_to_string(n.value)}};
×
1248
            } else if constexpr (std::is_same_v<T, q_ns::AndNode>) {
1249
                auto l = extract_eq_leaves(*n.left);
×
1250
                if (!l) return std::nullopt;
×
1251
                auto r = extract_eq_leaves(*n.right);
×
1252
                if (!r) return std::nullopt;
×
1253
                l->insert(l->end(), r->begin(), r->end());
×
1254
                return l;
×
1255
            } else {
×
1256
                return std::nullopt;
×
1257
            }
1258
        },
1259
        node.data);
×
1260
}
1261

1262
// True iff every checkpoint in `chunk_idxs` has dim_stats min == max == literal
1263
// for every leaf. Empty leaves -> false (no shortcut). Missing dim_stats for
1264
// any (chunk, leaf) -> false (we don't know, play safe).
1265
static bool all_chunks_uniform_match(
×
1266
    const dftracer::utils::utilities::indexer::IndexDatabase &db, int fid,
1267
    const std::vector<std::pair<std::string, std::string>> &leaves,
1268
    const std::vector<std::uint64_t> &chunk_idxs) {
1269
    if (leaves.empty() || chunk_idxs.empty()) return false;
×
1270
    namespace indexing = dftracer::utils::utilities::composites::dft::indexing;
1271

1272
    for (const auto &[dim, val] : leaves) {
×
1273
        auto rows = db.query_chunk_dimension_stats_for_dimension(fid, dim);
×
1274
        if (rows.empty()) return false;
×
1275
        std::unordered_map<std::uint64_t,
1276
                           const indexing::ChunkDimensionStatsResult *>
1277
            by_ckpt;
×
1278
        by_ckpt.reserve(rows.size());
×
1279
        for (const auto &r : rows) by_ckpt.emplace(r.checkpoint_idx, &r);
×
1280
        for (auto cidx : chunk_idxs) {
×
1281
            auto it = by_ckpt.find(cidx);
×
1282
            if (it == by_ckpt.end()) return false;
×
1283
            const auto &ds = *it->second;
×
1284
            if (ds.min_value != val || ds.max_value != val) return false;
×
1285
        }
1286
    }
×
1287
    return true;
×
1288
}
1289

1290
// Byte-range work unit for checkpoint-level parallelism. Each unit covers
1291
// one or more consecutive checkpoints from a single file. Decompression of
1292
// a single gz file is sequential per gzip stream, so splitting at
1293
// checkpoint-aligned byte offsets is what lets multiple workers share the
1294
// decode work for one file.
1295
struct ArrowWorkItem {
221✔
1296
    std::string file_path;
1297
    std::size_t start_byte = 0;
221✔
1298
    std::size_t end_byte = 0;
221✔
1299
    bool start_at_checkpoint = false;
221✔
1300
    bool end_at_checkpoint = false;
221✔
1301
    // When true, every kept chunk for this byte range is uniform-matching
1302
    // (dim_stats min == max == predicate literal for every AND-of-EQ leaf),
1303
    // so per-event predicate eval is skippable.
1304
    bool chunk_prune_only = false;
221✔
1305
    // Line-range work items override byte ranges: the worker passes these
1306
    // down as LINE_RANGE on the read, and the gzip stream resolves them to
1307
    // byte offsets via the checkpoint index. 0 = no line constraint.
1308
    std::size_t start_line = 0;
221✔
1309
    std::size_t end_line = 0;
221✔
1310
};
1311

1312
// User-supplied byte/line clip applied to every emitted work item.
1313
struct ClipRange {
1314
    std::size_t start_byte = 0, end_byte = 0;
1315
    std::size_t start_line = 0, end_line = 0;
1316
    bool has_line_clip() const { return start_line > 0 || end_line > 0; }
20!
1317
    bool has_byte_clip() const { return end_byte > start_byte; }
8✔
1318
};
1319

1320
// Geometry of the pruner chunks for one file, derived from its gzip recovery
1321
// points. Pruner chunk_idx is 0-indexed over uncompressed slices: recovery
1322
// point ckpts[k] sits at the START of chunk (k+1); chunk 0 has no recovery
1323
// point at its start (decoded from gzip stream start). Total chunks =
1324
// ckpts.size()+1.
1325
struct ChunkGeometry {
1326
    const std::vector<dftracer::utils::utilities::indexer::IndexerCheckpoint>
1327
        &ckpts;
1328

1329
    std::size_t total_chunks() const { return ckpts.size() + 1; }
8✔
1330
    std::size_t start_byte(std::uint64_t cidx) const {
12✔
1331
        if (cidx == 0) return 0;
12✔
1332
        return ckpts[cidx - 1].uc_offset;
8✔
1333
    }
6✔
1334
    std::size_t end_byte(std::uint64_t cidx) const {
12✔
1335
        if (cidx == 0) return ckpts.empty() ? 0 : ckpts[0].uc_offset;
12!
1336
        std::size_t k = cidx - 1;
8✔
1337
        return ckpts[k].uc_offset + ckpts[k].uc_size;
8✔
1338
    }
6✔
1339
    // Chunk 0 covers everything before the first recovery point; chunk k>=1
1340
    // spans recovery point (k-1).
1341
    std::size_t first_line(std::uint64_t cidx) const {
12✔
1342
        if (cidx == 0) return 1;
12✔
1343
        return ckpts[cidx - 1].first_line_num;
8✔
1344
    }
6✔
1345
    std::size_t last_line(std::uint64_t cidx) const {
12✔
1346
        if (cidx == 0) {
12✔
1347
            if (ckpts.empty()) return SIZE_MAX;
4!
1348
            return ckpts[0].first_line_num > 0 ? ckpts[0].first_line_num - 1
4!
1349
                                               : 0;
2✔
1350
        }
1351
        return ckpts[cidx - 1].last_line_num;
8✔
1352
    }
6✔
1353
};
1354

1355
// Select the chunk indices to scan for one file. Returns empty when the file
1356
// is fully pruned or no chunk overlaps the line clip (caller skips the file).
1357
static std::vector<std::uint64_t> select_kept_chunks(
8✔
1358
    const ChunkGeometry &geo, bool has_query,
1359
    const dftracer::utils::utilities::composites::dft::indexing::
1360
        ChunkPrunerOutput &pr,
1361
    const ClipRange &clip) {
1362
    const std::size_t total_chunks = geo.total_chunks();
8✔
1363
    std::vector<std::uint64_t> keep_chunks;
8✔
1364
    keep_chunks.reserve(total_chunks);
8!
1365
    if (has_query) {
8!
NEW
1366
        if (pr.success && !pr.file_may_match) {
×
NEW
1367
            return keep_chunks;  // whole file pruned
×
1368
        }
NEW
1369
        if (pr.success && !pr.candidate_checkpoints.empty() &&
×
NEW
1370
            pr.candidate_checkpoints.size() < pr.total_checkpoints) {
×
NEW
1371
            for (auto cidx : pr.candidate_checkpoints) {
×
NEW
1372
                if (cidx < total_chunks) keep_chunks.push_back(cidx);
×
1373
            }
NEW
1374
            std::sort(keep_chunks.begin(), keep_chunks.end());
×
NEW
1375
            keep_chunks.erase(
×
NEW
1376
                std::unique(keep_chunks.begin(), keep_chunks.end()),
×
NEW
1377
                keep_chunks.end());
×
1378
        } else {
NEW
1379
            for (std::uint64_t c = 0; c < total_chunks; ++c)
×
NEW
1380
                keep_chunks.push_back(c);
×
1381
        }
1382
    } else {
1383
        for (std::uint64_t c = 0; c < total_chunks; ++c)
24✔
1384
            keep_chunks.push_back(c);
16!
1385
    }
1386

1387
    // Intersect with the user's line range so workers only touch chunks that
1388
    // actually overlap it. Each work item carries the sub-line-range;
1389
    // LINE_RANGE on the read maps it back to bytes via the same checkpoint
1390
    // table the gzip stream uses.
1391
    if (clip.has_line_clip()) {
8✔
1392
        std::size_t lo = clip.start_line > 0 ? clip.start_line : 1;
4!
1393
        std::size_t hi = clip.end_line > 0 ? clip.end_line : SIZE_MAX;
4!
1394
        std::vector<std::uint64_t> filtered;
4✔
1395
        filtered.reserve(keep_chunks.size());
4!
1396
        for (auto c : keep_chunks) {
12✔
1397
            std::size_t cf = geo.first_line(c);
8!
1398
            std::size_t cl = geo.last_line(c);
8!
1399
            if (cl < lo || cf > hi) continue;
8✔
1400
            filtered.push_back(c);
4!
1401
        }
1402
        keep_chunks = std::move(filtered);
4✔
1403
    }
4✔
1404
    return keep_chunks;
8✔
1405
}
4!
1406

1407
// Group the kept chunks into contiguous work ranges (one per worker slot) and
1408
// append the resulting work items.
1409
static void emit_work_items(std::vector<ArrowWorkItem> &items,
8✔
1410
                            const std::string &fp, const ChunkGeometry &geo,
1411
                            const std::vector<std::uint64_t> &keep_chunks,
1412
                            bool file_pure_match, std::size_t max_workers,
1413
                            const ClipRange &clip) {
1414
    std::size_t target_ranges = std::max<std::size_t>(1, max_workers);
8✔
1415
    std::size_t per_range = std::max<std::size_t>(
8✔
1416
        1, (keep_chunks.size() + target_ranges - 1) / target_ranges);
8✔
1417

1418
    std::size_t group_start = 0;
8✔
1419
    while (group_start < keep_chunks.size()) {
20✔
1420
        std::size_t group_end = group_start;
12✔
1421
        std::size_t emitted = 0;
12✔
1422
        while (group_end < keep_chunks.size() && emitted < per_range) {
24✔
1423
            if (group_end > group_start &&
12!
NEW
1424
                keep_chunks[group_end] != keep_chunks[group_end - 1] + 1) {
×
NEW
1425
                break;
×
1426
            }
1427
            ++group_end;
12✔
1428
            ++emitted;
12✔
1429
        }
1430
        std::uint64_t scidx = keep_chunks[group_start];
12✔
1431
        std::uint64_t ecidx = keep_chunks[group_end - 1];
12✔
1432
        std::size_t start_byte = geo.start_byte(scidx);
12✔
1433
        std::size_t end_byte = geo.end_byte(ecidx);
12✔
1434
        // start_at_checkpoint: a gzip recovery point sits at start_byte (true
1435
        // for any cidx>=1; false for the implicit chunk 0 which decodes from
1436
        // stream start).
1437
        bool start_at_checkpoint = (scidx >= 1);
12✔
1438
        bool end_at_checkpoint = (group_end < keep_chunks.size());
12✔
1439
        if (clip.has_line_clip()) {
12✔
1440
            std::size_t lo = clip.start_line > 0 ? clip.start_line : 1;
4!
1441
            std::size_t hi = clip.end_line > 0 ? clip.end_line : SIZE_MAX;
4!
1442
            std::size_t cluster_first = geo.first_line(scidx);
4✔
1443
            std::size_t cluster_last = geo.last_line(ecidx);
4✔
1444
            std::size_t item_start = std::max<std::size_t>(lo, cluster_first);
4✔
1445
            std::size_t item_end = std::min<std::size_t>(hi, cluster_last);
4✔
1446
            if (item_start > item_end) {
4!
NEW
1447
                group_start = group_end;
×
NEW
1448
                continue;
×
1449
            }
1450
            ArrowWorkItem item;
4✔
1451
            item.file_path = fp;
4!
1452
            item.chunk_prune_only = file_pure_match;
4✔
1453
            item.start_line = item_start;
4✔
1454
            item.end_line = item_end;
4✔
1455
            items.push_back(std::move(item));
4!
1456
            group_start = group_end;
4✔
1457
            continue;
2✔
1458
        }
4✔
1459
        if (clip.has_byte_clip()) {
8✔
1460
            if (start_byte < clip.start_byte) {
4✔
NEW
1461
                start_byte = clip.start_byte;
×
NEW
1462
                start_at_checkpoint = false;
×
1463
            }
1464
            if (end_byte > clip.end_byte) {
4✔
1465
                end_byte = clip.end_byte;
4✔
1466
                end_at_checkpoint = false;
4✔
1467
            }
2✔
1468
            if (start_byte >= end_byte) {
4✔
1469
                group_start = group_end;
2✔
1470
                continue;
2✔
1471
            }
1472
        }
1✔
1473
        items.push_back({fp, start_byte, end_byte, start_at_checkpoint,
12!
1474
                         end_at_checkpoint, file_pure_match});
6✔
1475
        group_start = group_end;
6✔
1476
    }
1477
}
8✔
1478

1479
static std::vector<ArrowWorkItem> enumerate_work_items(
72✔
1480
    const std::vector<std::string> &files, const std::string &index_dir,
1481
    const std::string &query_str, std::size_t max_workers,
1482
    std::size_t clip_start_byte = 0, std::size_t clip_end_byte = 0,
1483
    std::size_t clip_start_line = 0, std::size_t clip_end_line = 0) {
1484
    namespace dft_internal =
1485
        dftracer::utils::utilities::composites::dft::internal;
1486
    namespace indexer_ns = dftracer::utils::utilities::indexer;
1487
    namespace indexing = dftracer::utils::utilities::composites::dft::indexing;
1488

1489
    const ClipRange clip{clip_start_byte, clip_end_byte, clip_start_line,
72✔
1490
                         clip_end_line};
72✔
1491

1492
    std::vector<ArrowWorkItem> items;
72✔
1493
    items.reserve(files.size() * 4);
72!
1494

1495
    auto push_unsplit = [&](const std::string &fp) {
150✔
1496
        ArrowWorkItem item;
114✔
1497
        item.file_path = fp;
114!
1498
        item.start_line = clip.start_line;
114✔
1499
        item.end_line = clip.end_line;
114✔
1500
        items.push_back(std::move(item));
114!
1501
    };
114✔
1502

1503
    // Parse the query once. Pruner input copies a Query, so we keep the
1504
    // parsed form around to feed each ChunkPrunerInput without re-parsing.
1505
    std::optional<dftracer::utils::utilities::common::query::Query> parsed;
72✔
1506
    if (!query_str.empty()) {
72✔
1507
        auto r = dftracer::utils::utilities::common::query::Query::from_string(
3!
1508
            query_str);
6!
1509
        if (r) parsed = std::move(*r);
6!
1510
    }
6✔
1511

1512
    // All files in a directory-mode scan share the same `.dftindex` root.
1513
    // Group files by their resolved index path so we can open the RocksDB
1514
    // once per index and reuse it to prune every file against that handle.
1515
    std::unordered_map<std::string, std::vector<std::size_t>> by_index;
72✔
1516
    for (std::size_t i = 0; i < files.size(); ++i) {
194✔
1517
        std::string index_path =
1518
            dft_internal::determine_index_path(files[i], index_dir);
122!
1519
        by_index[index_path].push_back(i);
122!
1520
    }
122✔
1521

1522
    for (auto &entry : by_index) {
150!
1523
        const auto &index_path = entry.first;
78✔
1524
        const auto &file_idxs = entry.second;
78✔
1525
        if (!fs::exists(index_path)) {
78!
1526
            for (auto i : file_idxs) push_unsplit(files[i]);
156✔
1527
            continue;
62✔
1528
        }
31✔
1529
        std::unique_ptr<indexer_ns::IndexDatabase> idx_db;
16✔
1530
        try {
1531
            idx_db = std::make_unique<indexer_ns::IndexDatabase>(
16!
1532
                index_path,
8✔
1533
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
16!
1534
        } catch (...) {
8✔
1535
            for (auto i : file_idxs) push_unsplit(files[i]);
×
1536
            continue;
1537
        }
×
1538

1539
        // Resolve fid + checkpoints per file (cheap queries).
1540
        struct FileCtx {
1541
            std::size_t file_idx;
1542
            int fid;
1543
            std::vector<indexer_ns::IndexerCheckpoint> ckpts;
1544
        };
1545
        std::vector<FileCtx> file_ctxs;
16✔
1546
        file_ctxs.reserve(file_idxs.size());
16!
1547
        for (auto i : file_idxs) {
44✔
1548
            FileCtx fc;
28✔
1549
            fc.file_idx = i;
28✔
1550
            fc.fid = idx_db->get_file_info_id(
42✔
1551
                indexer_ns::internal::get_logical_path(files[i]));
42!
1552
            if (fc.fid < 0) {
28!
1553
                push_unsplit(files[i]);
×
1554
                continue;
×
1555
            }
1556
            fc.ckpts = idx_db->query_checkpoints(fc.fid);
28!
1557
            if (fc.ckpts.empty()) {
28✔
1558
                push_unsplit(files[i]);
20!
1559
                continue;
20✔
1560
            }
1561
            std::sort(fc.ckpts.begin(), fc.ckpts.end(),
8!
1562
                      [](const auto &a, const auto &b) {
×
1563
                          return a.first_line_num < b.first_line_num;
×
1564
                      });
1565
            file_ctxs.push_back(std::move(fc));
8!
1566
        }
28✔
1567

1568
        // Batch-prune all files against the shared index: dim_stats and
1569
        // chunk_statistics are loaded in one RocksDB scan each instead of
1570
        // one scan per file.
1571
        std::vector<indexing::ChunkPrunerOutput> pruner_outs(file_ctxs.size());
16!
1572
        if (parsed && !file_ctxs.empty()) {
16!
1573
            indexing::ChunkPrunerBatchInput batch_in;
×
1574
            batch_in.index_path = index_path;
×
1575
            batch_in.external_db = idx_db.get();
×
1576
            batch_in.items.reserve(file_ctxs.size());
×
1577
            for (auto &fc : file_ctxs) {
×
1578
                batch_in.items.push_back({files[fc.file_idx], *parsed});
×
1579
            }
1580
            indexing::ChunkPrunerUtility pruner;
×
1581
            auto batch_out = pruner.process_batch(batch_in);
×
1582
            if (batch_out.success) {
×
1583
                pruner_outs = std::move(batch_out.outputs);
×
1584
            }
1585
        }
×
1586

1587
        // For AND-of-EQ predicates, precompute uniform-match leaves once.
1588
        // Per-file pure_match is checked inline below and lets workers skip
1589
        // per-event predicate eval on chunks where dim_stats min == max ==
1590
        // literal for every leaf.
1591
        std::optional<std::vector<std::pair<std::string, std::string>>>
1592
            eq_leaves;
16✔
1593
        if (parsed) eq_leaves = extract_eq_leaves(parsed->root());
16!
1594

1595
        for (std::size_t fc_idx = 0; fc_idx < file_ctxs.size(); ++fc_idx) {
24✔
1596
            auto &fc = file_ctxs[fc_idx];
8✔
1597
            const auto &fp = files[fc.file_idx];
8✔
1598

1599
            ChunkGeometry geo{fc.ckpts};
8✔
1600
            std::vector<std::uint64_t> keep_chunks = select_kept_chunks(
4!
1601
                geo, parsed.has_value(), pruner_outs[fc_idx], clip);
8!
1602
            if (keep_chunks.empty()) continue;
8!
1603

1604
            // All-or-nothing per file: if every kept chunk is uniform-matching
1605
            // for every leaf, every work item from this file gets the
1606
            // chunk_prune_only fast path. Mixed files fall back to per-event
1607
            // eval to stay safe.
1608
            bool file_pure_match = false;
8✔
1609
            if (eq_leaves && !eq_leaves->empty() && idx_db) {
8!
1610
                file_pure_match = all_chunks_uniform_match(
×
1611
                    *idx_db, fc.fid, *eq_leaves, keep_chunks);
×
1612
            }
1613

1614
            emit_work_items(items, fp, geo, keep_chunks, file_pure_match,
12!
1615
                            max_workers, clip);
4✔
1616
        }
8✔
1617
    }
16✔
1618
    return items;
108✔
1619
}
72!
1620

1621
static CoroTask<void> send_work_items_to_channel(
464!
1622
    std::shared_ptr<dftracer::utils::coro::Channel<ArrowWorkItem>> chan,
1623
    const std::vector<ArrowWorkItem> *items, std::atomic<bool> *cancelled) {
36!
1624
    for (const auto &it : *items) {
346✔
1625
        if (cancelled->load(std::memory_order_acquire)) break;
186!
1626
        if (!co_await chan->send(it)) break;
284!
1627
    }
62✔
1628
    chan->close();
36!
1629
    co_return;
36✔
1630
}
320!
1631

1632
static CoroTask<void> checkpoint_worker(
311!
1633
    std::shared_ptr<dftracer::utils::coro::Channel<ArrowWorkItem>> work_chan,
1634
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
1635
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
1636
    ReadConfig rc, std::size_t batch_size, bool normalize,
1637
    std::atomic<bool> *cancelled) {
52!
1638
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer(
104!
1639
        out_chan);
52✔
1640
    auto guard = producer.guard();
52!
1641

1642
    // Cache readers keyed by file path so we don't re-probe the same file
1643
    // when successive work items land on it.
1644
    std::unordered_map<std::string, std::shared_ptr<TraceReader>> readers;
52✔
1645

1646
    while (auto item = co_await work_chan->receive()) {
352!
1647
        if (cancelled->load(std::memory_order_acquire)) co_return;
186!
1648

1649
        auto &reader_ptr = readers[item->file_path];
186!
1650
        if (!reader_ptr) {
186✔
1651
            TraceReaderConfig cfg;
62✔
1652
            cfg.file_path = item->file_path;
62!
1653
            cfg.index_dir = index_dir;
62!
1654
            cfg.checkpoint_size = checkpoint_size;
62✔
1655
            cfg.auto_build_index = auto_build_index;
62✔
1656
            reader_ptr = std::make_shared<TraceReader>(std::move(cfg));
62!
1657
        }
62✔
1658

1659
        ReadConfig local_rc = rc;
186!
1660
        if (item->start_line > 0 || item->end_line > 0) {
186!
1661
            // Line-range work items: the read drives off LINE_RANGE; the
1662
            // gzip stream resolves it back to byte offsets via checkpoints.
1663
            local_rc.start_line = item->start_line;
127✔
1664
            local_rc.end_line = item->end_line;
127✔
1665
            local_rc.start_byte = 0;
127✔
1666
            local_rc.end_byte = 0;
127✔
1667
            local_rc.start_at_checkpoint = false;
127✔
1668
            local_rc.end_at_checkpoint = false;
127✔
1669
        } else {
127✔
1670
            local_rc.start_byte = item->start_byte;
59✔
1671
            local_rc.end_byte = item->end_byte;
59✔
1672
            local_rc.start_at_checkpoint = item->start_at_checkpoint;
59✔
1673
            local_rc.end_at_checkpoint = item->end_at_checkpoint;
59✔
1674
        }
1675
        // Pruning already happened at enumeration time; avoid the per-
1676
        // work-item RocksDB opens that would otherwise dwarf the actual
1677
        // read cost at directory scale (256 files * N ranges).
1678
        local_rc.skip_pruning = true;
186✔
1679
        // chunks pre-classified as uniform-matching skip per-event eval.
1680
        if (item->chunk_prune_only) local_rc.chunk_prune_only = true;
186!
1681

1682
        if (!normalize) {
186!
1683
            auto batch_gen = reader_ptr->read_arrow(local_rc, batch_size);
186!
1684
            while (auto batch_opt = co_await batch_gen.next()) {
569!
1685
                if (cancelled->load(std::memory_order_acquire)) co_return;
243!
1686
                if (!co_await producer.send(std::move(*batch_opt))) co_return;
324!
1687
            }
305!
1688
            continue;
62✔
1689
        }
62✔
1690

1691
        auto gen = reader_ptr->read_json(local_rc);
×
1692
        RecordBatchBuilder builder;
×
1693
        builder.reserve(batch_size);
×
1694
        StringArena arena;
×
1695

1696
        while (auto opt = co_await gen.next()) {
×
1697
            if (cancelled->load(std::memory_order_acquire)) co_return;
×
1698
            if (!build_arrow_row(builder, *opt->parser, arena, normalize))
×
1699
                continue;
1700
            if (builder.num_rows() >= batch_size) {
×
1701
                auto result = builder.finish();
×
1702
                arena.clear();
×
1703
                if (!co_await producer.send(std::move(result))) co_return;
×
1704
                if (!builder.is_schema_locked()) builder.lock_schema();
×
1705
                builder.reset(true);
×
1706
                builder.reserve(batch_size);
×
1707
            }
×
1708
        }
×
1709
        if (builder.num_rows() > 0) {
×
1710
            if (!co_await producer.send(builder.finish())) co_return;
×
1711
        }
1712
    }
238!
1713
    co_return;
52✔
1714
}
1,294!
1715

1716
static CoroTask<void> spawn_arrow_producers(
216!
1717
    CoroScope &child,
1718
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
1719
    const std::vector<ArrowWorkItem> *work_items, const std::string *index_dir,
1720
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
1721
    std::size_t batch_size, bool normalize, std::atomic<bool> *cancelled_ptr,
1722
    std::size_t max_workers) {
36!
1723
    std::size_t num_workers = std::min(work_items->size(), max_workers);
36!
1724
    if (num_workers == 0) num_workers = 1;
36!
1725
    auto work_chan =
36✔
1726
        dftracer::utils::coro::make_channel<ArrowWorkItem>(num_workers);
36!
1727

1728
    for (std::size_t i = 0; i < num_workers; ++i) {
88✔
1729
        child.spawn([out_chan, wc = work_chan, idx = *index_dir,
153!
1730
                     checkpoint_size, auto_build_index, r = *rc, batch_size,
156!
1731
                     normalize, cancelled_ptr](CoroScope &) {
254✔
1732
            return checkpoint_worker(wc, out_chan, idx, checkpoint_size,
153!
1733
                                     auto_build_index, r, batch_size, normalize,
101!
1734
                                     cancelled_ptr);
101!
1735
        });
1736
    }
52✔
1737

1738
    child.spawn([wc = work_chan, work_items, cancelled_ptr](CoroScope &) {
108!
1739
        return send_work_items_to_channel(wc, work_items, cancelled_ptr);
72!
1740
    });
1741
    co_return;
72✔
1742
}
108!
1743

1744
static CoroTask<void> produce_arrow_batches_for_files(
294!
1745
    CoroScope &scope, ArrowIteratorState *sp, std::vector<std::string> files,
1746
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
1747
    ReadConfig rc, std::size_t batch_size, bool normalize,
1748
    std::size_t max_workers) {
37!
1749
    try {
1750
        if (files.empty()) {
109✔
1751
            sp->channel->close();
1!
1752
            co_return;
38✔
1753
        }
1754

1755
        auto work_items = enumerate_work_items(
216!
1756
            files, index_dir, rc.query, max_workers, rc.start_byte, rc.end_byte,
108✔
1757
            rc.start_line, rc.end_line);
108✔
1758
        if (work_items.empty()) {
108!
1759
            sp->channel->close();
×
1760
            co_return;
1761
        }
1762

1763
        auto *chan_ptr = sp->channel.get();
108✔
1764
        auto *cancelled_ptr = &sp->cancelled;
108✔
1765

1766
        co_await scope.scope([chan_ptr, &work_items, &index_dir,
1,080!
1767
                              checkpoint_size, auto_build_index, &rc,
216✔
1768
                              batch_size, normalize, cancelled_ptr,
324✔
1769
                              max_workers](CoroScope &child) -> CoroTask<void> {
144!
1770
            co_await spawn_arrow_producers(
288!
1771
                child, chan_ptr, &work_items, &index_dir, checkpoint_size,
108✔
1772
                auto_build_index, &rc, batch_size, normalize, cancelled_ptr,
108✔
1773
                max_workers);
108✔
1774
        });
144!
1775
    } catch (...) {
36!
1776
        sp->set_error(std::current_exception());
×
1777
    }
×
1778
}
146!
1779

1780
static CoroTask<void> produce_arrow_batches_parallel(
180!
1781
    CoroScope &scope, ArrowIteratorState *sp, std::string dir_path,
1782
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
1783
    ReadConfig rc, std::size_t batch_size, bool normalize,
1784
    std::size_t max_workers) {
15!
1785
    try {
1786
        PatternDirectoryScannerUtility scanner;
45!
1787
        auto scan_input = PatternDirectoryScannerUtilityInput(
90!
1788
            dir_path, {".pfw", ".pfw.gz"}, true, false);
45!
1789
        auto entries = co_await scope.spawn(scanner, scan_input);
75!
1790

1791
        std::vector<std::string> files;
45✔
1792
        files.reserve(entries.size());
45✔
1793
        for (auto &e : entries) files.push_back(e.path.string());
54!
1794
        std::sort(files.begin(), files.end());
15✔
1795

1796
        co_await produce_arrow_batches_for_files(
105!
1797
            scope, sp, std::move(files), std::move(index_dir), checkpoint_size,
45✔
1798
            auto_build_index, std::move(rc), batch_size, normalize,
45✔
1799
            max_workers);
45✔
1800
    } catch (...) {
45✔
1801
        sp->set_error(std::current_exception());
×
1802
    }
×
1803
}
210!
1804

1805
CoroTask<void> produce_arrow_batches(
136!
1806
    std::shared_ptr<ArrowIteratorState> state,
1807
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer,
1808
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size,
1809
    bool flatten_objects = false, bool normalize = false) {
17!
1810
    (void)flatten_objects;
1811

1812
    auto guard = producer.guard();
51!
1813
    try {
1814
        TraceReader reader(std::move(cfg));
51!
1815

1816
        if (!normalize) {
51!
1817
            auto batch_gen = reader.read_arrow(rc, batch_size);
51!
1818
            while (auto batch_opt = co_await batch_gen.next()) {
213!
1819
                if (state->cancelled.load(std::memory_order_acquire)) break;
97✔
1820
                auto result_bytes =
96✔
1821
                    dftracer::utils::python::byte_size(*batch_opt);
96!
1822
                state->bytes_in_queue.fetch_add(result_bytes,
96✔
1823
                                                std::memory_order_acq_rel);
1824
                if (!co_await producer.send(std::move(*batch_opt))) break;
128!
1825
            }
113!
1826
            co_return;
17✔
1827
        }
17✔
1828

1829
        auto gen = reader.read_json(rc);
×
1830
        RecordBatchBuilder builder;
×
1831
        builder.reserve(batch_size);
×
1832

1833
        StringArena arena;
×
1834

1835
        while (auto opt = co_await gen.next()) {
×
1836
            if (state->cancelled.load(std::memory_order_acquire)) break;
×
1837
            if (!build_arrow_row(builder, *opt->parser, arena, normalize))
×
1838
                continue;
1839

1840
            if (builder.num_rows() >= batch_size) {
×
1841
                auto result = builder.finish();
×
1842
                arena.clear();
×
1843
                auto result_bytes = dftracer::utils::python::byte_size(result);
×
1844
                state->bytes_in_queue.fetch_add(result_bytes,
1845
                                                std::memory_order_acq_rel);
1846
                if (!co_await producer.send(std::move(result))) break;
×
1847
                if (!builder.is_schema_locked()) {
×
1848
                    builder.lock_schema();
1849
                }
1850
                builder.reset(true);
×
1851
                builder.reserve(batch_size);
×
1852
            }
×
1853
        }
×
1854

1855
        if (builder.num_rows() > 0 &&
×
1856
            !state->cancelled.load(std::memory_order_acquire)) {
1857
            auto result = builder.finish();
×
1858
            auto result_bytes = dftracer::utils::python::byte_size(result);
×
1859
            state->bytes_in_queue.fetch_add(result_bytes,
1860
                                            std::memory_order_acq_rel);
1861
            co_await producer.send(std::move(result));
×
1862
        }
×
1863
    } catch (...) {
17!
1864
        state->set_error(std::current_exception());
×
1865
    }
×
1866
}
375!
1867

1868
#endif  // DFTRACER_UTILS_ENABLE_ARROW
1869

1870
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
1871

1872
struct WriteArrowStats {
39✔
1873
    std::unordered_map<std::string, PartitionWriteStats> partitions;
1874
    int64_t total_rows = 0;
39✔
1875
    int64_t total_uncompressed_bytes = 0;
39✔
1876
};
1877

1878
struct WriteArrowResult {
52✔
1879
    WriteArrowStats stats;
1880
    std::string error;
1881
    std::uint64_t chunks_scanned = 0;
39✔
1882
    std::uint64_t chunks_skipped = 0;
39✔
1883
};
1884

1885
CoroTask<WriteArrowResult> write_arrow_pipeline(
190!
1886
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
1887
    std::vector<ViewDefinition> views, std::string output_path,
1888
    int64_t chunk_size_bytes, IpcCompression compression,
1889
    std::size_t event_batch_size) {
13!
1890
    namespace dft_internal =
1891
        dftracer::utils::utilities::composites::dft::internal;
1892
    WriteArrowResult result;
13✔
1893

1894
    try {
1895
        if (views.empty()) {
13✔
1896
            views.push_back(ViewDefinition().with_name("all"));
8!
1897
        }
8✔
1898

1899
        std::string resolved_index =
13✔
1900
            index_path.empty()
26!
1901
                ? dft_internal::determine_index_path(file_path, "")
13!
1902
                : index_path;
×
1903

1904
        auto meta_input = MetadataCollectorUtilityInput::from_file(file_path)
26!
1905
                              .with_checkpoint_size(checkpoint_size)
13!
1906
                              .with_index(resolved_index);
13!
1907
        auto metadata = co_await MetadataCollectorUtility{}.process(meta_input);
39!
1908
        if (!metadata.success) {
13!
1909
            result.error =
1910
                "Failed to collect metadata: " + metadata.error_message;
×
1911
            co_return result;
×
1912
        }
1913

1914
        for (const auto &view : views) {
99✔
1915
            std::string view_output = output_path;
30!
1916
            if (views.size() > 1 || view.name != "all") {
30✔
1917
                view_output = output_path + "/" + view.name;
42!
1918
            }
6✔
1919

1920
            PartitionWriter writer;
42!
1921
            int rc_open = co_await writer.open(view_output, chunk_size_bytes,
98!
1922
                                               compression);
42✔
1923
            if (rc_open != 0) {
42!
1924
                result.error =
1925
                    "Failed to open partition writer for view: " + view.name;
×
1926
                co_return result;
×
1927
            }
1928

1929
            ViewBuilderInput builder_input;
42✔
1930
            builder_input.with_view(view)
42✔
1931
                .with_file_path(file_path)
14!
1932
                .with_index_path(resolved_index)
14!
1933
                .with_uncompressed_size(metadata.uncompressed_size)
14!
1934
                .with_num_checkpoints(metadata.num_checkpoints);
14✔
1935

1936
            auto build_output =
42✔
1937
                co_await ViewBuilderUtility{}.process(builder_input);
56!
1938
            if (!build_output.success) {
58!
1939
                result.error = "ViewBuilder failed for view: " + view.name;
×
1940
                co_return result;
×
1941
            }
1942

1943
            result.chunks_skipped += build_output.skipped_checkpoints;
58✔
1944

1945
            if (!build_output.file_may_match) {
58✔
1946
                auto stats = co_await writer.close();
24!
1947
                result.stats.partitions[view.name] = std::move(stats);
6!
1948
                continue;
1949
            }
6✔
1950

1951
            RecordBatchBuilder builder;
40!
1952
            bool schema_locked = false;
40✔
1953

1954
            for (const auto &candidate : build_output.candidates) {
64!
1955
                ViewReaderInput reader_input;
40✔
1956
                reader_input.with_file_path(file_path)
40✔
1957
                    .with_index_path(resolved_index)
8!
1958
                    .with_checkpoint_size(checkpoint_size)
8!
1959
                    .with_byte_range(candidate.start_byte, candidate.end_byte)
8!
1960
                    .with_checkpoint_idx(candidate.checkpoint_idx)
8!
1961
                    .with_event_batch_size(event_batch_size)
8!
1962
                    .with_view(view);
8!
1963
                reader_input.query = view.query;
8✔
1964

1965
                ViewReaderUtility reader;
40!
1966
                auto gen = reader.process(reader_input);
40!
1967
                while (auto opt = co_await gen.next()) {
64!
1968
                    auto arrow_batch = opt->to_arrow(builder);
24!
1969
                    int rc_write = co_await writer.write_batch(arrow_batch);
32!
1970
                    if (rc_write != 0) {
8!
1971
                        result.error =
1972
                            "Failed to write batch for view: " + view.name;
×
1973
                        co_return result;
×
1974
                    }
1975
                    if (!schema_locked) {
8!
1976
                        builder.lock_schema();
8✔
1977
                        schema_locked = true;
8✔
1978
                    }
8✔
1979
                    builder.reset(true);
8!
1980
                }
32!
1981
                result.chunks_scanned++;
8✔
1982
            }
24✔
1983

1984
            auto stats = co_await writer.close();
32!
1985
            result.stats.partitions[view.name] = std::move(stats);
8!
1986
            result.stats.total_rows +=
8✔
1987
                result.stats.partitions[view.name].total_rows;
8!
1988
            result.stats.total_uncompressed_bytes +=
8✔
1989
                result.stats.partitions[view.name].total_uncompressed_bytes;
8!
1990
        }
86✔
1991
    } catch (const std::exception &e) {
69!
1992
        result.error = e.what();
×
1993
    }
×
1994
    co_return result;
13!
1995
}
405!
1996

1997
struct ViewChunkInfo {
1998
    std::uint64_t checkpoint_idx;
1999
    std::size_t start_byte;
2000
    std::size_t end_byte;
2001
};
2002

2003
struct GetViewChunksResult {
12✔
2004
    std::vector<ViewChunkInfo> chunks;
2005
    std::uint64_t total_checkpoints = 0;
9✔
2006
    std::uint64_t skipped_checkpoints = 0;
9✔
2007
    bool file_may_match = false;
9✔
2008
    std::string error;
2009
};
2010

2011
CoroTask<GetViewChunksResult> get_view_chunks_pipeline(
30!
2012
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
2013
    ViewDefinition view) {
3!
2014
    namespace dft_internal =
2015
        dftracer::utils::utilities::composites::dft::internal;
2016
    GetViewChunksResult result;
3✔
2017

2018
    try {
2019
        std::string resolved_index =
3✔
2020
            index_path.empty()
6!
2021
                ? dft_internal::determine_index_path(file_path, "")
3!
2022
                : index_path;
×
2023

2024
        auto meta_input = MetadataCollectorUtilityInput::from_file(file_path)
3!
2025
                              .with_checkpoint_size(checkpoint_size)
3✔
2026
                              .with_index(resolved_index);
3!
2027
        auto metadata = co_await MetadataCollectorUtility{}.process(meta_input);
9!
2028
        if (!metadata.success) {
9!
2029
            result.error =
2030
                "Failed to collect metadata: " + metadata.error_message;
×
2031
            co_return result;
×
2032
        }
2033

2034
        ViewBuilderInput builder_input;
9✔
2035
        builder_input.with_view(view)
9✔
2036
            .with_file_path(file_path)
3!
2037
            .with_index_path(resolved_index)
3!
2038
            .with_uncompressed_size(metadata.uncompressed_size)
3!
2039
            .with_num_checkpoints(metadata.num_checkpoints);
3✔
2040

2041
        auto build_output =
9✔
2042
            co_await ViewBuilderUtility{}.process(builder_input);
12!
2043
        if (!build_output.success) {
3!
2044
            result.error = "ViewBuilder failed";
×
2045
            co_return result;
×
2046
        }
2047

2048
        result.file_may_match = build_output.file_may_match;
3✔
2049
        result.total_checkpoints = build_output.total_checkpoints;
3✔
2050
        result.skipped_checkpoints = build_output.skipped_checkpoints;
3✔
2051

2052
        for (const auto &candidate : build_output.candidates) {
7✔
2053
            result.chunks.push_back({candidate.checkpoint_idx,
12!
2054
                                     candidate.start_byte, candidate.end_byte});
8✔
2055
        }
4✔
2056
    } catch (const std::exception &e) {
9!
2057
        result.error = e.what();
×
2058
    }
×
2059
    co_return result;
3!
2060
}
39!
2061

2062
struct WriteViewChunkResult {
16✔
2063
    std::string output_file;
2064
    std::uint64_t events_matched = 0;
12✔
2065
    std::uint64_t events_scanned = 0;
12✔
2066
    int64_t rows_written = 0;
12✔
2067
    int64_t bytes_written = 0;
12✔
2068
    std::string error;
2069
};
2070

2071
CoroTask<WriteViewChunkResult> write_view_chunk_pipeline(
56!
2072
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
2073
    ViewDefinition view, std::uint64_t checkpoint_idx, std::size_t start_byte,
2074
    std::size_t end_byte, std::string output_file, IpcCompression compression,
2075
    std::size_t event_batch_size) {
4!
2076
    namespace dft_internal =
2077
        dftracer::utils::utilities::composites::dft::internal;
2078
    WriteViewChunkResult result;
4✔
2079
    result.output_file = output_file;
4!
2080

2081
    try {
2082
        std::string resolved_index =
4✔
2083
            index_path.empty()
8!
2084
                ? dft_internal::determine_index_path(file_path, "")
4!
2085
                : index_path;
×
2086

2087
        dftracer::utils::utilities::common::arrow::IpcWriter writer;
4!
2088
        int rc_open = co_await writer.open(output_file, compression);
12!
2089
        if (rc_open != 0) {
20!
2090
            result.error = "Failed to open output file";
×
2091
            co_return result;
×
2092
        }
2093

2094
        ViewReaderInput reader_input;
20✔
2095
        reader_input.with_file_path(file_path)
20✔
2096
            .with_index_path(resolved_index)
3✔
2097
            .with_checkpoint_size(checkpoint_size)
4!
2098
            .with_byte_range(start_byte, end_byte)
4!
2099
            .with_checkpoint_idx(checkpoint_idx)
4!
2100
            .with_event_batch_size(event_batch_size)
4!
2101
            .with_view(view);
4✔
2102
        reader_input.query = view.query;
3✔
2103

2104
        RecordBatchBuilder builder;
18!
2105
        bool schema_locked = false;
18✔
2106

2107
        ViewReaderUtility reader;
18!
2108
        auto gen = reader.process(reader_input);
18!
2109
        while (auto opt = co_await gen.next()) {
29!
2110
            result.events_matched += opt->events_matched;
12✔
2111
            result.events_scanned += opt->events_scanned;
12✔
2112
            auto batch = opt->to_arrow(builder);
12!
2113
            if (batch.valid()) {
12!
2114
                result.rows_written += batch.num_rows();
12✔
2115
                int rc = co_await writer.write_batch(batch);
16!
2116
                if (rc != 0) {
4!
2117
                    result.error = "Failed to write batch";
×
2118
                    co_return result;
×
2119
                }
2120
                if (!schema_locked) {
4!
2121
                    builder.lock_schema();
4✔
2122
                    schema_locked = true;
4✔
2123
                }
4✔
2124
                builder.reset(true);
4!
2125
            }
4!
2126
        }
16!
2127

2128
        int rc = co_await writer.close();
16!
2129
        if (rc != 0) {
4!
2130
            result.error = "Failed to close output file";
×
2131
        }
2132
    } catch (const std::exception &e) {
20!
2133
        result.error = e.what();
×
2134
    }
×
2135
    co_return result;
4!
2136
}
116!
2137

2138
struct ChunkDescriptor {
2139
    std::uint64_t checkpoint_idx;
2140
    std::size_t start_byte;
2141
    std::size_t end_byte;
2142
    std::string output_file;
2143
};
2144

2145
struct WriteViewChunksResult {
3✔
2146
    std::vector<WriteViewChunkResult> results;
2147
    int64_t total_rows = 0;
3✔
2148
    int64_t total_events_matched = 0;
3✔
2149
};
2150

2151
CoroTask<WriteViewChunksResult> write_view_chunks_pipeline(
8!
2152
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
2153
    ViewDefinition view, std::vector<ChunkDescriptor> chunks,
2154
    IpcCompression compression, std::size_t event_batch_size) {
1!
2155
    WriteViewChunksResult result;
3✔
2156

2157
    if (chunks.empty()) {
3!
2158
        co_return result;
1!
2159
    }
2160

2161
    std::vector<CoroTask<WriteViewChunkResult>> tasks;
3✔
2162
    tasks.reserve(chunks.size());
3!
2163

2164
    for (const auto &chunk : chunks) {
6✔
2165
        tasks.push_back(write_view_chunk_pipeline(
6!
2166
            file_path, index_path, checkpoint_size, view, chunk.checkpoint_idx,
3!
2167
            chunk.start_byte, chunk.end_byte, chunk.output_file, compression,
3!
2168
            event_batch_size));
3✔
2169
    }
3✔
2170

2171
    result.results = co_await when_all(std::move(tasks));
4!
2172

2173
    for (const auto &r : result.results) {
4✔
2174
        result.total_rows += r.rows_written;
3✔
2175
        result.total_events_matched += r.events_matched;
3✔
2176
    }
3✔
2177

2178
    co_return result;
1!
2179
}
7!
2180

2181
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
2182

2183
TraceReaderConfig build_config(TraceReaderObject *self) {
302✔
2184
    TraceReaderConfig cfg;
302✔
2185
    cfg.file_path = PyUnicode_AsUTF8(self->file_path);
302!
2186
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
302!
2187
    if (idx) cfg.index_dir = idx;
302!
2188
    cfg.checkpoint_size = self->checkpoint_size;
302✔
2189
    cfg.auto_build_index = self->auto_build_index != 0;
302✔
2190
    return cfg;
302✔
2191
}
151!
2192

2193
static Runtime *get_runtime(TraceReaderObject *self) {
300✔
2194
    if (self->runtime_obj) {
300✔
2195
        return ((RuntimeObject *)self->runtime_obj)->runtime.get();
68✔
2196
    }
2197
    return get_default_runtime();
232✔
2198
}
150✔
2199

2200
static TraceReaderIteratorObject *make_memoryview_iterator(
142✔
2201
    std::shared_ptr<MemoryViewBatchIteratorState> state) {
2202
    TraceReaderIteratorObject *it =
71✔
2203
        (TraceReaderIteratorObject *)TraceReaderIteratorType.tp_alloc(
142✔
2204
            &TraceReaderIteratorType, 0);
2205
    if (!it) return NULL;
142✔
2206
    new (&it->batch_state)
142✔
2207
        std::shared_ptr<MemoryViewBatchIteratorState>(std::move(state));
142✔
2208
    it->current_batch = NULL;
142✔
2209
    it->batch_index = 0;
142✔
2210
    new (&it->json_dict_state) std::shared_ptr<JsonDictIteratorState>();
142✔
2211
    new (&it->json_dict_current_batch) std::shared_ptr<JsonDictBatch>();
142✔
2212
    it->json_dict_index = 0;
142✔
2213
#ifdef DFTRACER_UTILS_ENABLE_ARROW
2214
    new (&it->arrow_state) std::shared_ptr<ArrowIteratorState>();
142✔
2215
#endif
2216
    it->mode = IteratorMode::MEMORYVIEW;
142✔
2217
    return it;
142✔
2218
}
71✔
2219

2220
static TraceReaderIteratorObject *make_json_dict_iterator(
14✔
2221
    std::shared_ptr<JsonDictIteratorState> state) {
2222
    TraceReaderIteratorObject *it =
7✔
2223
        (TraceReaderIteratorObject *)TraceReaderIteratorType.tp_alloc(
14✔
2224
            &TraceReaderIteratorType, 0);
2225
    if (!it) return NULL;
14✔
2226
    new (&it->batch_state) std::shared_ptr<MemoryViewBatchIteratorState>();
14✔
2227
    it->current_batch = NULL;
14✔
2228
    it->batch_index = 0;
14✔
2229
    new (&it->json_dict_state)
14✔
2230
        std::shared_ptr<JsonDictIteratorState>(std::move(state));
14✔
2231
    new (&it->json_dict_current_batch) std::shared_ptr<JsonDictBatch>();
14✔
2232
    it->json_dict_index = 0;
14✔
2233
#ifdef DFTRACER_UTILS_ENABLE_ARROW
2234
    new (&it->arrow_state) std::shared_ptr<ArrowIteratorState>();
14✔
2235
#endif
2236
    it->mode = IteratorMode::JSON_DICT;
14✔
2237
    return it;
14✔
2238
}
7✔
2239

2240
#ifdef DFTRACER_UTILS_ENABLE_ARROW
2241
static TraceReaderIteratorObject *make_arrow_iterator(
54✔
2242
    std::shared_ptr<ArrowIteratorState> state) {
2243
    TraceReaderIteratorObject *it =
27✔
2244
        (TraceReaderIteratorObject *)TraceReaderIteratorType.tp_alloc(
54✔
2245
            &TraceReaderIteratorType, 0);
2246
    if (!it) return NULL;
54✔
2247
    new (&it->batch_state) std::shared_ptr<MemoryViewBatchIteratorState>();
54✔
2248
    it->current_batch = NULL;
54✔
2249
    it->batch_index = 0;
54✔
2250
    new (&it->json_dict_state) std::shared_ptr<JsonDictIteratorState>();
54✔
2251
    new (&it->json_dict_current_batch) std::shared_ptr<JsonDictBatch>();
54✔
2252
    it->json_dict_index = 0;
54✔
2253
    new (&it->arrow_state)
54✔
2254
        std::shared_ptr<ArrowIteratorState>(std::move(state));
54✔
2255
    it->mode = IteratorMode::ARROW;
54✔
2256
    return it;
54✔
2257
}
27✔
2258
#endif
2259

2260
}  // namespace
2261

2262
static void TraceReader_dealloc(TraceReaderObject *self) {
316✔
2263
    Py_XDECREF(self->file_path);
316✔
2264
    Py_XDECREF(self->index_dir);
316✔
2265
    Py_XDECREF(self->runtime_obj);
316✔
2266
    Py_TYPE(self)->tp_free((PyObject *)self);
316✔
2267
}
316✔
2268

2269
static PyObject *TraceReader_new(PyTypeObject *type, PyObject *args,
316✔
2270
                                 PyObject *kwds) {
2271
    TraceReaderObject *self = (TraceReaderObject *)type->tp_alloc(type, 0);
316✔
2272
    if (self) {
316✔
2273
        self->file_path = NULL;
316✔
2274
        self->index_dir = NULL;
316✔
2275
        self->checkpoint_size =
316✔
2276
            dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE;
2277
        self->auto_build_index = 0;
316✔
2278
        self->has_index = 0;
316✔
2279
        self->runtime_obj = NULL;
316✔
2280
    }
158✔
2281
    return (PyObject *)self;
316✔
2282
}
2283

2284
static int TraceReader_init(TraceReaderObject *self, PyObject *args,
316✔
2285
                            PyObject *kwds) {
2286
    static const char *kwlist[] = {
2287
        "path",    "index_dir", "checkpoint_size", "auto_build_index",
2288
        "runtime", NULL};
2289

2290
    const char *file_path;
2291
    const char *index_dir = "";
316✔
2292
    std::size_t checkpoint_size =
316✔
2293
        dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE;
2294
    int auto_build_index = 0;
316✔
2295
    PyObject *runtime_arg = NULL;
316✔
2296

2297
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|snpO", (char **)kwlist,
316!
2298
                                     &file_path, &index_dir, &checkpoint_size,
2299
                                     &auto_build_index, &runtime_arg)) {
2300
        return -1;
×
2301
    }
2302

2303
    if (runtime_arg && runtime_arg != Py_None) {
316✔
2304
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
64!
2305
            // Direct C++ Runtime object
2306
            Py_INCREF(runtime_arg);
×
2307
            self->runtime_obj = runtime_arg;
×
2308
        } else {
2309
            // Python wrapper, extract _native attribute
2310
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
64!
2311
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
64!
2312
                self->runtime_obj = native;  // already incref'd by GetAttr
64✔
2313
            } else {
32✔
2314
                Py_XDECREF(native);
×
2315
                PyErr_SetString(PyExc_TypeError,
×
2316
                                "runtime must be a Runtime instance or None");
2317
                return -1;
×
2318
            }
2319
        }
2320
    }
32✔
2321

2322
    self->file_path = PyUnicode_FromString(file_path);
316!
2323
    if (!self->file_path) return -1;
316✔
2324

2325
    self->index_dir = PyUnicode_FromString(index_dir);
316!
2326
    if (!self->index_dir) {
316✔
2327
        Py_DECREF(self->file_path);
×
2328
        self->file_path = NULL;
×
2329
        return -1;
×
2330
    }
2331

2332
    self->checkpoint_size = checkpoint_size;
316✔
2333
    self->auto_build_index = auto_build_index;
316✔
2334

2335
    try {
2336
        TraceReaderConfig cfg;
316✔
2337
        cfg.file_path = file_path;
316!
2338
        cfg.index_dir = index_dir;
316!
2339
        cfg.checkpoint_size = checkpoint_size;
316✔
2340
        cfg.auto_build_index = auto_build_index != 0;
316✔
2341
        TraceReader probe(std::move(cfg));
316!
2342
        self->has_index = probe.has_index() ? 1 : 0;
316!
2343
    } catch (const std::exception &e) {
316!
2344
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2345
        Py_DECREF(self->file_path);
×
2346
        Py_DECREF(self->index_dir);
×
2347
        self->file_path = NULL;
×
2348
        self->index_dir = NULL;
×
2349
        return -1;
×
2350
    }
×
2351

2352
    return 0;
316✔
2353
}
158✔
2354

2355
static PyObject *TraceReader_iter_lines(TraceReaderObject *self, PyObject *args,
124✔
2356
                                        PyObject *kwds) {
2357
    static const char *kwlist[] = {"start_line",    "end_line",    "start_byte",
2358
                                   "end_byte",      "buffer_size", "query",
2359
                                   "memory_budget", NULL};
2360
    Py_ssize_t start_line = 0, end_line = 0;
124✔
2361
    Py_ssize_t start_byte = 0, end_byte = 0;
124✔
2362
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
124✔
2363
    const char *query_str = NULL;
124✔
2364
    Py_ssize_t memory_budget = 0;
124✔
2365

2366
    if (!PyArg_ParseTupleAndKeywords(
124!
2367
            args, kwds, "|nnnnnzn", (char **)kwlist, &start_line, &end_line,
62✔
2368
            &start_byte, &end_byte, &buffer_size, &query_str, &memory_budget)) {
2369
        return NULL;
×
2370
    }
2371

2372
    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
124!
2373
        buffer_size <= 0) {
118!
2374
        PyErr_SetString(
6!
2375
            PyExc_ValueError,
3✔
2376
            "range arguments must be >= 0; buffer_size must be > 0");
2377
        return NULL;
6✔
2378
    }
2379

2380
    TraceReaderConfig cfg;
118✔
2381
    try {
2382
        cfg = build_config(self);
118!
2383
    } catch (const std::exception &e) {
59!
2384
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2385
        return NULL;
×
2386
    }
×
2387

2388
    ReadConfig rc;
118✔
2389
    rc.start_line = static_cast<std::size_t>(start_line);
118✔
2390
    rc.end_line = static_cast<std::size_t>(end_line);
118✔
2391
    rc.start_byte = static_cast<std::size_t>(start_byte);
118✔
2392
    rc.end_byte = static_cast<std::size_t>(end_byte);
118✔
2393
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
118✔
2394
    if (query_str) rc.query = query_str;
118!
2395

2396
    auto state = std::make_shared<MemoryViewBatchIteratorState>();
118!
2397
    state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
118!
2398
        static_cast<std::size_t>(memory_budget));
59✔
2399

2400
    Runtime *rt = get_runtime(self);
118!
2401
    std::size_t max_workers = rt->threads();
118!
2402
    constexpr std::size_t LINE_BATCH_SIZE = 1024;
118✔
2403
    std::size_t capacity = dftracer::utils::compute_channel_capacity(
118!
2404
        state->memory_budget_bytes, LINE_BATCH_SIZE * ESTIMATED_BYTES_PER_LINE,
118✔
2405
        max_workers);
59✔
2406
    state->channel =
118✔
2407
        dftracer::utils::coro::make_channel<MemoryViewBatchData>(capacity);
177!
2408
    auto *sp = state.get();
118✔
2409

2410
    try {
2411
        bool is_dir = fs::is_directory(cfg.file_path);
118!
2412
        if (is_dir) {
118✔
2413
            auto handle = rt->scope(
6!
2414
                "iter_lines_parallel",
3!
2415
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
27!
2416
                 checkpoint_size = cfg.checkpoint_size,
6✔
2417
                 auto_build_index = cfg.auto_build_index, rc,
9!
2418
                 max_workers](CoroScope &scope) -> CoroTask<void> {
6!
2419
                    co_await produce_lines_parallel(
24!
2420
                        scope, sp, dir_path, index_dir, checkpoint_size,
9!
2421
                        auto_build_index, rc, LINE_BATCH_SIZE, max_workers);
9!
2422
                });
18!
2423
            state->task_future = handle.future;
6!
2424
        } else {
6✔
2425
            auto handle = rt->submit(
112!
2426
                produce_lines_batched(state, state->channel->producer(), cfg,
224!
2427
                                      rc, LINE_BATCH_SIZE),
56!
2428
                "iter_lines");
224!
2429
            state->task_future = handle.future;
112!
2430
        }
112✔
2431
    } catch (const std::exception &e) {
59!
2432
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2433
        return NULL;
×
2434
    }
×
2435

2436
    TraceReaderIteratorObject *it = make_memoryview_iterator(std::move(state));
118!
2437
    return (PyObject *)it;
118✔
2438
}
121✔
2439

2440
static PyObject *TraceReader_iter_raw(TraceReaderObject *self, PyObject *args,
26✔
2441
                                      PyObject *kwds) {
2442
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
2443
                                   "end_byte",   "buffer_size", "line_aligned",
2444
                                   "multi_line", "query",       "memory_budget",
2445
                                   NULL};
2446
    Py_ssize_t start_line = 0, end_line = 0;
26✔
2447
    Py_ssize_t start_byte = 0, end_byte = 0;
26✔
2448
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
26✔
2449
    int line_aligned = 1;
26✔
2450
    int multi_line = 1;
26✔
2451
    const char *query_str = NULL;
26✔
2452
    Py_ssize_t memory_budget = 0;
26✔
2453

2454
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nnnnnppzn", (char **)kwlist,
26!
2455
                                     &start_line, &end_line, &start_byte,
2456
                                     &end_byte, &buffer_size, &line_aligned,
2457
                                     &multi_line, &query_str, &memory_budget)) {
2458
        return NULL;
×
2459
    }
2460

2461
    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
26!
2462
        buffer_size <= 0) {
24!
2463
        PyErr_SetString(
2!
2464
            PyExc_ValueError,
1✔
2465
            "range arguments must be >= 0; buffer_size must be > 0");
2466
        return NULL;
2✔
2467
    }
2468

2469
    TraceReaderConfig cfg;
24✔
2470
    try {
2471
        cfg = build_config(self);
24!
2472
    } catch (const std::exception &e) {
12!
2473
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2474
        return NULL;
×
2475
    }
×
2476

2477
    ReadConfig rc;
24✔
2478
    rc.start_line = static_cast<std::size_t>(start_line);
24✔
2479
    rc.end_line = static_cast<std::size_t>(end_line);
24✔
2480
    rc.start_byte = static_cast<std::size_t>(start_byte);
24✔
2481
    rc.end_byte = static_cast<std::size_t>(end_byte);
24✔
2482
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
24✔
2483
    rc.line_aligned = line_aligned != 0;
24✔
2484
    rc.multi_line = multi_line != 0;
24✔
2485
    if (query_str) rc.query = query_str;
24!
2486

2487
    auto state = std::make_shared<MemoryViewBatchIteratorState>();
24!
2488
    state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
24!
2489
        static_cast<std::size_t>(memory_budget));
12✔
2490

2491
    Runtime *rt = get_runtime(self);
24!
2492
    std::size_t max_workers = rt->threads();
24!
2493
    std::size_t capacity = dftracer::utils::compute_channel_capacity(
24!
2494
        state->memory_budget_bytes, ESTIMATED_BYTES_PER_RAW_CHUNK, max_workers);
24✔
2495
    state->channel =
24✔
2496
        dftracer::utils::coro::make_channel<MemoryViewBatchData>(capacity);
36!
2497
    auto *sp = state.get();
24✔
2498

2499
    try {
2500
        bool is_dir = fs::is_directory(cfg.file_path);
24!
2501
        if (is_dir) {
24✔
2502
            auto handle = rt->scope(
4!
2503
                "iter_raw_parallel",
2!
2504
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
18!
2505
                 checkpoint_size = cfg.checkpoint_size,
4✔
2506
                 auto_build_index = cfg.auto_build_index, rc,
6!
2507
                 max_workers](CoroScope &scope) -> CoroTask<void> {
4!
2508
                    co_await produce_raw_parallel(
16!
2509
                        scope, sp, dir_path, index_dir, checkpoint_size,
6!
2510
                        auto_build_index, rc, max_workers);
6!
2511
                });
12!
2512
            state->task_future = handle.future;
4!
2513
        } else {
4✔
2514
            auto handle = rt->submit(
20!
2515
                produce_raw_batched(state, state->channel->producer(), cfg, rc),
30!
2516
                "iter_raw");
40!
2517
            state->task_future = handle.future;
20!
2518
        }
20✔
2519
    } catch (const std::exception &e) {
12!
2520
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2521
        return NULL;
×
2522
    }
×
2523

2524
    TraceReaderIteratorObject *it = make_memoryview_iterator(std::move(state));
24!
2525
    return (PyObject *)it;
24✔
2526
}
25✔
2527

2528
static PyObject *TraceReader_read_lines(TraceReaderObject *self, PyObject *args,
92✔
2529
                                        PyObject *kwds) {
2530
    PyObject *iter = TraceReader_iter_lines(self, args, kwds);
92✔
2531
    if (!iter) return NULL;
92✔
2532
    PyObject *list = PySequence_List(iter);
88✔
2533
    Py_DECREF(iter);
44✔
2534
    return list;
88✔
2535
}
46✔
2536

2537
static PyObject *TraceReader_iter_json(TraceReaderObject *self, PyObject *args,
14✔
2538
                                       PyObject *kwds) {
2539
    static const char *kwlist[] = {"start_line", "end_line",      "start_byte",
2540
                                   "end_byte",   "buffer_size",   "query",
2541
                                   "batch_size", "memory_budget", NULL};
2542
    Py_ssize_t start_line = 0, end_line = 0;
14✔
2543
    Py_ssize_t start_byte = 0, end_byte = 0;
14✔
2544
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
14✔
2545
    const char *query_str = NULL;
14✔
2546
    Py_ssize_t batch_size = 1024;
14✔
2547
    Py_ssize_t memory_budget = 0;
14✔
2548

2549
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nnnnnznn", (char **)kwlist,
14!
2550
                                     &start_line, &end_line, &start_byte,
2551
                                     &end_byte, &buffer_size, &query_str,
2552
                                     &batch_size, &memory_budget)) {
2553
        return NULL;
×
2554
    }
2555

2556
    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
14!
2557
        buffer_size <= 0 || batch_size <= 0) {
14!
2558
        PyErr_SetString(PyExc_ValueError,
×
2559
                        "range arguments must be >= 0; buffer_size and "
2560
                        "batch_size must be > 0");
2561
        return NULL;
×
2562
    }
2563

2564
    TraceReaderConfig cfg;
14✔
2565
    try {
2566
        cfg = build_config(self);
14!
2567
    } catch (const std::exception &e) {
7!
2568
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2569
        return NULL;
×
2570
    }
×
2571

2572
    ReadConfig rc;
14✔
2573
    rc.start_line = static_cast<std::size_t>(start_line);
14✔
2574
    rc.end_line = static_cast<std::size_t>(end_line);
14✔
2575
    rc.start_byte = static_cast<std::size_t>(start_byte);
14✔
2576
    rc.end_byte = static_cast<std::size_t>(end_byte);
14✔
2577
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
14✔
2578
    if (query_str) rc.query = query_str;
14!
2579

2580
    auto state = std::make_shared<JsonDictIteratorState>();
14!
2581
    state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
14!
2582
        static_cast<std::size_t>(memory_budget));
7✔
2583

2584
    Runtime *rt = get_runtime(self);
14!
2585
    std::size_t max_workers = rt->threads();
14!
2586
    auto bs = static_cast<std::size_t>(batch_size);
14✔
2587
    std::size_t capacity = dftracer::utils::compute_channel_capacity(
14!
2588
        state->memory_budget_bytes, bs * ESTIMATED_BYTES_PER_JSON_EVENT,
14✔
2589
        max_workers);
7✔
2590
    state->channel =
14✔
2591
        dftracer::utils::coro::make_channel<JsonDictBatch>(capacity);
21!
2592
    auto *sp = state.get();
14✔
2593

2594
    try {
2595
        bool is_dir = fs::is_directory(cfg.file_path);
14!
2596
        if (is_dir) {
14✔
2597
            auto handle = rt->scope(
12!
2598
                "iter_json_parallel",
6!
2599
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
54!
2600
                 checkpoint_size = cfg.checkpoint_size,
12✔
2601
                 auto_build_index = cfg.auto_build_index, rc, bs,
18!
2602
                 max_workers](CoroScope &scope) -> CoroTask<void> {
12!
2603
                    co_await produce_json_dicts_parallel(
48!
2604
                        scope, sp, dir_path, index_dir, checkpoint_size,
18!
2605
                        auto_build_index, rc, bs, max_workers);
18!
2606
                });
36!
2607
            state->task_future = handle.future;
12!
2608
        } else {
12✔
2609
            auto handle =
2610
                rt->submit(produce_json_dicts(state, state->channel->producer(),
5!
2611
                                              cfg, rc, bs),
1!
2612
                           "iter_json");
4!
2613
            state->task_future = handle.future;
2!
2614
        }
2✔
2615
    } catch (const std::exception &e) {
7!
2616
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2617
        return NULL;
×
2618
    }
×
2619

2620
    TraceReaderIteratorObject *it = make_json_dict_iterator(std::move(state));
14!
2621
    return (PyObject *)it;
14✔
2622
}
14✔
2623

2624
static PyObject *TraceReader_read_json_py(TraceReaderObject *self,
2✔
2625
                                          PyObject *args, PyObject *kwds) {
2626
    PyObject *iter = TraceReader_iter_json(self, args, kwds);
2✔
2627
    if (!iter) return NULL;
2✔
2628
    PyObject *list = PySequence_List(iter);
2✔
2629
    Py_DECREF(iter);
1✔
2630
    return list;
2✔
2631
}
1✔
2632

2633
static PyObject *TraceReader_read_raw(TraceReaderObject *self, PyObject *args,
8✔
2634
                                      PyObject *kwds) {
2635
    PyObject *iter = TraceReader_iter_raw(self, args, kwds);
8✔
2636
    if (!iter) return NULL;
8✔
2637
    PyObject *list = PySequence_List(iter);
8✔
2638
    Py_DECREF(iter);
4✔
2639
    return list;
8✔
2640
}
4✔
2641

2642
#ifdef DFTRACER_UTILS_ENABLE_ARROW
2643

2644
static PyObject *TraceReader_iter_arrow(TraceReaderObject *self, PyObject *args,
54✔
2645
                                        PyObject *kwds) {
2646
    static const char *kwlist[] = {
2647
        "batch_size", "start_line",    "end_line", "start_byte",
2648
        "end_byte",   "buffer_size",   "query",    "flatten_objects",
2649
        "normalize",  "memory_budget", NULL};
2650
    Py_ssize_t batch_size = 10000;
54✔
2651
    Py_ssize_t start_line = 0, end_line = 0;
54✔
2652
    Py_ssize_t start_byte = 0, end_byte = 0;
54✔
2653
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
54✔
2654
    const char *query_str = NULL;
54✔
2655
    int flatten_objects = 1;  // default: expand top-level objects
54✔
2656
    int normalize = 0;
54✔
2657
    Py_ssize_t memory_budget = 0;
54✔
2658

2659
    if (!PyArg_ParseTupleAndKeywords(
54!
2660
            args, kwds, "|nnnnnnzppn", (char **)kwlist, &batch_size,
27✔
2661
            &start_line, &end_line, &start_byte, &end_byte, &buffer_size,
2662
            &query_str, &flatten_objects, &normalize, &memory_budget)) {
2663
        return NULL;
×
2664
    }
2665

2666
    if (batch_size <= 0) {
54!
2667
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
×
2668
        return NULL;
×
2669
    }
2670
    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
54!
2671
        buffer_size <= 0) {
54!
2672
        PyErr_SetString(
×
2673
            PyExc_ValueError,
2674
            "range arguments must be >= 0; buffer_size must be > 0");
2675
        return NULL;
×
2676
    }
2677

2678
    TraceReaderConfig cfg;
54✔
2679
    try {
2680
        cfg = build_config(self);
54!
2681
    } catch (const std::exception &e) {
27!
2682
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2683
        return NULL;
×
2684
    }
×
2685

2686
    ReadConfig rc;
54✔
2687
    rc.start_line = static_cast<std::size_t>(start_line);
54✔
2688
    rc.end_line = static_cast<std::size_t>(end_line);
54✔
2689
    rc.start_byte = static_cast<std::size_t>(start_byte);
54✔
2690
    rc.end_byte = static_cast<std::size_t>(end_byte);
54✔
2691
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
54✔
2692
    rc.flatten_objects = flatten_objects != 0;
54✔
2693
    if (query_str) rc.query = query_str;
54!
2694

2695
    auto state = std::make_shared<ArrowIteratorState>();
54!
2696
    state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
54!
2697
        static_cast<std::size_t>(memory_budget));
27✔
2698

2699
    Runtime *rt = get_runtime(self);
54!
2700
    std::size_t max_workers = rt->threads();
54!
2701
    auto bs = static_cast<std::size_t>(batch_size);
54✔
2702
    std::size_t capacity = dftracer::utils::compute_channel_capacity(
54!
2703
        state->memory_budget_bytes, bs * ESTIMATED_BYTES_PER_ARROW_ROW,
54✔
2704
        max_workers);
27✔
2705
    state->channel =
54✔
2706
        dftracer::utils::coro::make_channel<ArrowIteratorState::BatchType>(
81!
2707
            capacity);
54✔
2708
    auto *sp = state.get();
54✔
2709

2710
    try {
2711
        bool is_dir = fs::is_directory(cfg.file_path);
54!
2712
        if (is_dir) {
54✔
2713
            auto handle = rt->scope(
10!
2714
                "iter_arrow_parallel",
5!
2715
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
45!
2716
                 checkpoint_size = cfg.checkpoint_size,
10✔
2717
                 auto_build_index = cfg.auto_build_index, rc, bs,
15!
2718
                 norm = normalize != 0,
10✔
2719
                 max_workers](CoroScope &scope) -> CoroTask<void> {
10!
2720
                    co_await produce_arrow_batches_parallel(
40!
2721
                        scope, sp, dir_path, index_dir, checkpoint_size,
15!
2722
                        auto_build_index, rc, bs, norm, max_workers);
15!
2723
                });
30!
2724
            state->task_future = handle.future;
10!
2725
        } else if (normalize) {
54!
2726
            auto handle = rt->submit(
×
2727
                produce_arrow_batches(state, state->channel->producer(), cfg,
×
2728
                                      rc, static_cast<std::size_t>(batch_size),
×
2729
                                      flatten_objects != 0, normalize != 0),
2730
                "iter_arrow");
×
2731
            state->task_future = handle.future;
×
2732
        } else {
×
2733
            std::vector<std::string> files_vec{cfg.file_path};
88!
2734
            auto handle = rt->scope(
44!
2735
                "iter_arrow_parallel",
22!
2736
                [sp, files = std::move(files_vec), index_dir = cfg.index_dir,
198!
2737
                 checkpoint_size = cfg.checkpoint_size,
44✔
2738
                 auto_build_index = cfg.auto_build_index, rc, bs,
66!
2739
                 norm = normalize != 0,
44✔
2740
                 max_workers](CoroScope &scope) mutable -> CoroTask<void> {
44!
2741
                    co_await produce_arrow_batches_for_files(
176!
2742
                        scope, sp, std::move(files), index_dir, checkpoint_size,
66!
2743
                        auto_build_index, rc, bs, norm, max_workers);
66!
2744
                });
132!
2745
            state->task_future = handle.future;
44!
2746
        }
44✔
2747
    } catch (const std::exception &e) {
27!
2748
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2749
        return NULL;
×
2750
    }
×
2751

2752
    TraceReaderIteratorObject *it = make_arrow_iterator(std::move(state));
54!
2753
    return (PyObject *)it;
54✔
2754
}
54✔
2755

2756
// Build ArrowIteratorState + spawn the producer task. Same plumbing as
2757
// TraceReader_iter_arrow but returns the state so callers can wrap it as
2758
// either a per-batch iterator or an ArrowArrayStream.
2759
static std::shared_ptr<ArrowIteratorState> spawn_arrow_producer(
54✔
2760
    TraceReaderObject *self, PyObject *args, PyObject *kwds) {
2761
    static const char *kwlist[] = {
2762
        "batch_size", "start_line",    "end_line", "start_byte",
2763
        "end_byte",   "buffer_size",   "query",    "flatten_objects",
2764
        "normalize",  "memory_budget", NULL};
2765
    Py_ssize_t batch_size = 10000;
54✔
2766
    Py_ssize_t start_line = 0, end_line = 0;
54✔
2767
    Py_ssize_t start_byte = 0, end_byte = 0;
54✔
2768
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
54✔
2769
    const char *query_str = NULL;
54✔
2770
    int flatten_objects = 1;  // default: expand top-level objects
54✔
2771
    int normalize = 0;
54✔
2772
    Py_ssize_t memory_budget = 0;
54✔
2773

2774
    if (!PyArg_ParseTupleAndKeywords(
54!
2775
            args, kwds, "|nnnnnnzppn", (char **)kwlist, &batch_size,
27✔
2776
            &start_line, &end_line, &start_byte, &end_byte, &buffer_size,
2777
            &query_str, &flatten_objects, &normalize, &memory_budget)) {
2778
        return nullptr;
×
2779
    }
2780

2781
    if (batch_size <= 0) {
54!
2782
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
×
2783
        return nullptr;
×
2784
    }
2785
    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
54!
2786
        buffer_size <= 0) {
54!
2787
        PyErr_SetString(
×
2788
            PyExc_ValueError,
2789
            "range arguments must be >= 0; buffer_size must be > 0");
2790
        return nullptr;
×
2791
    }
2792

2793
    TraceReaderConfig cfg;
54✔
2794
    try {
2795
        cfg = build_config(self);
54!
2796
    } catch (const std::exception &e) {
27!
2797
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2798
        return nullptr;
×
2799
    }
×
2800

2801
    ReadConfig rc;
54✔
2802
    rc.start_line = static_cast<std::size_t>(start_line);
54✔
2803
    rc.end_line = static_cast<std::size_t>(end_line);
54✔
2804
    rc.start_byte = static_cast<std::size_t>(start_byte);
54✔
2805
    rc.end_byte = static_cast<std::size_t>(end_byte);
54✔
2806
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
54✔
2807
    rc.flatten_objects = flatten_objects != 0;
54✔
2808
    if (query_str) rc.query = query_str;
54!
2809

2810
    auto state = std::make_shared<ArrowIteratorState>();
54!
2811
    state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
54!
2812
        static_cast<std::size_t>(memory_budget));
27✔
2813

2814
    Runtime *rt = get_runtime(self);
54!
2815
    std::size_t max_workers = rt->threads();
54!
2816
    auto bs = static_cast<std::size_t>(batch_size);
54✔
2817
    std::size_t capacity = dftracer::utils::compute_channel_capacity(
54!
2818
        state->memory_budget_bytes, bs * ESTIMATED_BYTES_PER_ARROW_ROW,
54✔
2819
        max_workers);
27✔
2820
    state->channel =
54✔
2821
        dftracer::utils::coro::make_channel<ArrowIteratorState::BatchType>(
81!
2822
            capacity);
54✔
2823
    auto *sp = state.get();
54✔
2824

2825
    try {
2826
        bool is_dir = fs::is_directory(cfg.file_path);
54!
2827
        if (is_dir) {
54✔
2828
            auto handle = rt->scope(
20!
2829
                "iter_arrow_parallel",
10!
2830
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
90!
2831
                 checkpoint_size = cfg.checkpoint_size,
20✔
2832
                 auto_build_index = cfg.auto_build_index, rc, bs,
30!
2833
                 norm = normalize != 0,
20✔
2834
                 max_workers](CoroScope &scope) -> CoroTask<void> {
20!
2835
                    co_await produce_arrow_batches_parallel(
80!
2836
                        scope, sp, dir_path, index_dir, checkpoint_size,
30!
2837
                        auto_build_index, rc, bs, norm, max_workers);
30!
2838
                });
60!
2839
            state->task_future = handle.future;
20!
2840
        } else {
20✔
2841
            auto handle = rt->submit(
34!
2842
                produce_arrow_batches(state, state->channel->producer(), cfg,
68!
2843
                                      rc, static_cast<std::size_t>(batch_size),
17!
2844
                                      flatten_objects != 0, normalize != 0),
17✔
2845
                "iter_arrow");
68!
2846
            state->task_future = handle.future;
34!
2847
        }
34✔
2848
    } catch (const std::exception &e) {
27!
2849
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2850
        return nullptr;
×
2851
    }
×
2852

2853
    return state;
54✔
2854
}
54✔
2855

2856
static PyObject *TraceReader_iter_arrow_stream(TraceReaderObject *self,
28✔
2857
                                               PyObject *args, PyObject *kwds) {
2858
    auto state = spawn_arrow_producer(self, args, kwds);
28!
2859
    if (!state) return NULL;
28!
2860
    return make_arrow_batch_stream(std::move(state));
28!
2861
}
28✔
2862

2863
static PyObject *TraceReader_read_arrow(TraceReaderObject *self, PyObject *args,
26✔
2864
                                        PyObject *kwds) {
2865
    auto state = spawn_arrow_producer(self, args, kwds);
26!
2866
    if (!state) return NULL;
26!
2867
    PyObject *stream = make_arrow_batch_stream(std::move(state));
26!
2868
    if (!stream) return NULL;
26✔
2869
    return dftracer::utils::python::wrap_arrow_stream_table(stream);
26!
2870
}
26✔
2871

2872
#endif  // DFTRACER_UTILS_ENABLE_ARROW
2873

2874
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
2875

2876
static int parse_str_list_trace(PyObject *obj, std::vector<std::string> &out,
2877
                                const char *param_name) {
2878
    if (!obj || obj == Py_None) return 0;
×
2879
    if (!PyList_Check(obj)) {
×
2880
        PyErr_Format(PyExc_TypeError, "%s must be a list of str", param_name);
2881
        return -1;
2882
    }
2883
    Py_ssize_t n = PyList_Size(obj);
2884
    for (Py_ssize_t i = 0; i < n; i++) {
×
2885
        const char *s = PyUnicode_AsUTF8(PyList_GetItem(obj, i));
×
2886
        if (!s) return -1;
×
2887
        out.emplace_back(s);
×
2888
    }
2889
    return 0;
2890
}
2891

2892
static PyObject *TraceReader_write_arrow(TraceReaderObject *self,
26✔
2893
                                         PyObject *args, PyObject *kwds) {
2894
    static const char *kwlist[] = {"path",        "views",      "chunk_size_mb",
2895
                                   "compression", "batch_size", NULL};
2896
    const char *path = NULL;
26✔
2897
    PyObject *views_obj = Py_None;
26✔
2898
    int chunk_size_mb = 32;
26✔
2899
    const char *compression_str = "zstd";
26✔
2900
    Py_ssize_t batch_size = 10000;
26✔
2901

2902
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|Oisn", (char **)kwlist,
26!
2903
                                     &path, &views_obj, &chunk_size_mb,
2904
                                     &compression_str, &batch_size)) {
2905
        return NULL;
×
2906
    }
2907

2908
    if (chunk_size_mb < 0) {
26✔
2909
        PyErr_SetString(PyExc_ValueError, "chunk_size_mb must be >= 0");
×
2910
        return NULL;
×
2911
    }
2912

2913
    std::vector<ViewDefinition> views;
26✔
2914
    if (views_obj && views_obj != Py_None) {
26!
2915
        if (!PyList_Check(views_obj)) {
10!
2916
            PyErr_SetString(PyExc_TypeError, "views must be a list or None");
×
2917
            return NULL;
×
2918
        }
2919
        Py_ssize_t n = PyList_Size(views_obj);
10!
2920
        for (Py_ssize_t i = 0; i < n; i++) {
22✔
2921
            PyObject *item = PyList_GetItem(views_obj, i);
12!
2922
            ViewDefinition vd;
12✔
2923

2924
            if (PyUnicode_Check(item)) {
12✔
2925
                const char *name = PyUnicode_AsUTF8(item);
2!
2926
                if (!name) return NULL;
2✔
2927
                std::string name_str(name);
2!
2928
                if (name_str == "io") {
2!
2929
                    vd = ViewDefinition::io_view();
2!
2930
                } else if (name_str == "compute") {
1!
2931
                    vd = ViewDefinition::compute_view();
×
2932
                } else if (name_str == "dlio") {
×
2933
                    vd = ViewDefinition::dlio_view();
×
2934
                } else {
2935
                    vd.with_name(name_str);
×
2936
                }
2937
            } else if (PyDict_Check(item)) {
12!
2938
                PyObject *name_obj = PyDict_GetItemString(item, "name");
10!
2939
                if (!name_obj || !PyUnicode_Check(name_obj)) {
10!
2940
                    PyErr_SetString(PyExc_ValueError,
×
2941
                                    "view dict must have 'name' string");
2942
                    return NULL;
×
2943
                }
2944
                vd.with_name(PyUnicode_AsUTF8(name_obj));
10!
2945

2946
                PyObject *query_obj = PyDict_GetItemString(item, "query");
10!
2947
                if (query_obj && query_obj != Py_None) {
10!
2948
                    if (!PyUnicode_Check(query_obj)) {
10!
2949
                        PyErr_SetString(PyExc_ValueError,
×
2950
                                        "view 'query' must be a string");
2951
                        return NULL;
×
2952
                    }
2953
                    vd.with_query(PyUnicode_AsUTF8(query_obj));
10!
2954
                }
5✔
2955

2956
                PyObject *meta_obj =
5✔
2957
                    PyDict_GetItemString(item, "include_metadata");
10!
2958
                if (meta_obj && meta_obj != Py_None) {
10!
2959
                    vd.with_include_metadata(PyObject_IsTrue(meta_obj));
2!
2960
                }
1✔
2961
            } else {
5✔
2962
                PyErr_SetString(PyExc_TypeError,
×
2963
                                "views list must contain strings or dicts");
2964
                return NULL;
×
2965
            }
2966
            views.push_back(std::move(vd));
12!
2967
        }
12✔
2968
    }
5✔
2969

2970
    IpcCompression compression = IpcCompression::ZSTD;
26✔
2971
    if (compression_str) {
26!
2972
        std::string comp_lower(compression_str);
26!
2973
        for (auto &c : comp_lower) c = std::tolower(c);
130!
2974
        if (comp_lower == "none") {
26✔
2975
            compression = IpcCompression::NONE;
2✔
2976
        } else if (comp_lower == "zstd") {
25✔
2977
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
2978
            compression = IpcCompression::ZSTD;
24✔
2979
#else
2980
            PyErr_SetString(
2981
                PyExc_ValueError,
2982
                "ZSTD compression not available (built without ZSTD)");
2983
            return NULL;
2984
#endif
2985
        } else {
12✔
2986
            PyErr_Format(PyExc_ValueError,
×
2987
                         "Unknown compression: %s (use 'none' or 'zstd')",
2988
                         compression_str);
2989
            return NULL;
×
2990
        }
2991
    }
26✔
2992

2993
    int64_t chunk_size_bytes =
26✔
2994
        static_cast<int64_t>(chunk_size_mb) * 1024 * 1024;
26✔
2995

2996
    std::string file_path = PyUnicode_AsUTF8(self->file_path);
26!
2997
    std::string index_path;
26✔
2998
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
26!
2999
    if (idx && idx[0] != '\0') {
26!
3000
        index_path = idx;
×
3001
    }
3002
    std::size_t checkpoint_size = self->checkpoint_size;
26✔
3003

3004
    std::string output_path(path);
26!
3005
    WriteArrowResult result;
26✔
3006
    std::string error_msg;
26✔
3007

3008
    Py_BEGIN_ALLOW_THREADS try {
26!
3009
        Runtime *rt = get_runtime(self);
26!
3010
        result =
13✔
3011
            rt->submit(write_arrow_pipeline(
78!
3012
                           file_path, index_path, checkpoint_size,
13!
3013
                           std::move(views), output_path, chunk_size_bytes,
26!
3014
                           compression, static_cast<std::size_t>(batch_size)),
13✔
3015
                       "write_arrow")
13!
3016
                .get();
26!
3017
    } catch (const std::exception &e) {
13!
3018
        error_msg = e.what();
×
3019
    }
×
3020
    Py_END_ALLOW_THREADS
26!
3021

3022
        if (!error_msg.empty()) {
26!
3023
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
3024
        return NULL;
×
3025
    }
3026

3027
    if (!result.error.empty()) {
26!
3028
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
×
3029
        return NULL;
×
3030
    }
3031

3032
    // Build result dict
3033
    PyObject *dict = PyDict_New();
26!
3034
    if (!dict) return NULL;
26✔
3035

3036
    // Build files list per partition
3037
    PyObject *partitions_dict = PyDict_New();
26!
3038
    if (!partitions_dict) {
26!
3039
        Py_DECREF(dict);
×
3040
        return NULL;
×
3041
    }
3042

3043
    for (const auto &[partition_name, partition_stats] :
54!
3044
         result.stats.partitions) {
53✔
3045
        PyObject *partition_dict = PyDict_New();
28!
3046
        if (!partition_dict) {
28!
3047
            Py_DECREF(partitions_dict);
×
3048
            Py_DECREF(dict);
×
3049
            return NULL;
×
3050
        }
3051

3052
        PyObject *files_list = PyList_New(0);
28!
3053
        if (!files_list) {
28!
3054
            Py_DECREF(partition_dict);
×
3055
            Py_DECREF(partitions_dict);
×
3056
            Py_DECREF(dict);
×
3057
            return NULL;
×
3058
        }
3059

3060
        for (const auto &f : partition_stats.files) {
44✔
3061
            PyObject *file_str = PyUnicode_FromString(f.c_str());
16!
3062
            if (!file_str || PyList_Append(files_list, file_str) < 0) {
16!
3063
                Py_XDECREF(file_str);
×
3064
                Py_DECREF(files_list);
×
3065
                Py_DECREF(partition_dict);
×
3066
                Py_DECREF(partitions_dict);
×
3067
                Py_DECREF(dict);
×
3068
                return NULL;
×
3069
            }
3070
            Py_DECREF(file_str);
8!
3071
        }
3072

3073
        PyDict_SetItemString(partition_dict, "files", files_list);
28!
3074
        dict_set_steal(partition_dict, "rows",
28!
3075
                       PyLong_FromLongLong(partition_stats.total_rows));
28!
3076
        dict_set_steal(
28!
3077
            partition_dict, "bytes",
14✔
3078
            PyLong_FromLongLong(partition_stats.total_uncompressed_bytes));
28!
3079
        Py_DECREF(files_list);
14!
3080

3081
        PyObject *key = partition_name.empty()
42!
3082
                            ? PyUnicode_FromString("_default")
14!
3083
                            : PyUnicode_FromString(partition_name.c_str());
28!
3084
        PyDict_SetItem(partitions_dict, key, partition_dict);
28!
3085
        Py_DECREF(key);
14!
3086
        Py_DECREF(partition_dict);
14!
3087
    }
3088

3089
    PyDict_SetItemString(dict, "partitions", partitions_dict);
26!
3090
    dict_set_steal(dict, "total_rows",
26!
3091
                   PyLong_FromLongLong(result.stats.total_rows));
26!
3092
    dict_set_steal(dict, "total_bytes",
26!
3093
                   PyLong_FromLongLong(result.stats.total_uncompressed_bytes));
26!
3094
    dict_set_steal(dict, "chunks_scanned",
26!
3095
                   PyLong_FromUnsignedLongLong(result.chunks_scanned));
26!
3096
    dict_set_steal(dict, "chunks_skipped",
26!
3097
                   PyLong_FromUnsignedLongLong(result.chunks_skipped));
26!
3098
    Py_DECREF(partitions_dict);
13!
3099

3100
    return dict;
26✔
3101
}
26✔
3102

3103
static PyObject *TraceReader_get_view_chunks(TraceReaderObject *self,
6✔
3104
                                             PyObject *args, PyObject *kwds) {
3105
    static const char *kwlist[] = {"view", NULL};
3106
    PyObject *view_obj = Py_None;
6✔
3107

3108
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
6!
3109
                                     &view_obj)) {
3110
        return NULL;
×
3111
    }
3112

3113
    ViewDefinition view;
6✔
3114
    if (view_obj && view_obj != Py_None) {
6!
3115
        if (PyUnicode_Check(view_obj)) {
2!
3116
            const char *name = PyUnicode_AsUTF8(view_obj);
×
3117
            if (!name) return NULL;
×
3118
            std::string name_str(name);
×
3119
            if (name_str == "io") {
×
3120
                view = ViewDefinition::io_view();
×
3121
            } else if (name_str == "compute") {
×
3122
                view = ViewDefinition::compute_view();
×
3123
            } else if (name_str == "dlio") {
×
3124
                view = ViewDefinition::dlio_view();
×
3125
            } else {
3126
                view.with_name(name_str);
×
3127
            }
3128
        } else if (PyDict_Check(view_obj)) {
2!
3129
            PyObject *name_obj = PyDict_GetItemString(view_obj, "name");
2!
3130
            if (name_obj && PyUnicode_Check(name_obj)) {
2!
3131
                view.with_name(PyUnicode_AsUTF8(name_obj));
2!
3132
            }
1✔
3133
            PyObject *query_obj = PyDict_GetItemString(view_obj, "query");
2!
3134
            if (query_obj && query_obj != Py_None &&
3!
3135
                PyUnicode_Check(query_obj)) {
2✔
3136
                view.with_query(PyUnicode_AsUTF8(query_obj));
2!
3137
            }
1✔
3138
        } else {
1✔
3139
            PyErr_SetString(PyExc_TypeError, "view must be a string or dict");
×
3140
            return NULL;
×
3141
        }
3142
    }
1✔
3143

3144
    std::string file_path = PyUnicode_AsUTF8(self->file_path);
6!
3145
    std::string index_path;
6✔
3146
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
6!
3147
    if (idx && idx[0] != '\0') {
6!
3148
        index_path = idx;
×
3149
    }
3150
    std::size_t checkpoint_size = self->checkpoint_size;
6✔
3151

3152
    GetViewChunksResult result;
6✔
3153
    std::string error_msg;
6✔
3154

3155
    Py_BEGIN_ALLOW_THREADS try {
6!
3156
        Runtime *rt = get_runtime(self);
6!
3157
        result = rt->submit(get_view_chunks_pipeline(file_path, index_path,
18!
3158
                                                     checkpoint_size, view),
3!
3159
                            "get_view_chunks")
3!
3160
                     .get();
6!
3161
    } catch (const std::exception &e) {
3!
3162
        error_msg = e.what();
×
3163
    }
×
3164
    Py_END_ALLOW_THREADS
6!
3165

3166
        if (!error_msg.empty()) {
6!
3167
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
3168
        return NULL;
×
3169
    }
3170

3171
    if (!result.error.empty()) {
6!
3172
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
×
3173
        return NULL;
×
3174
    }
3175

3176
    PyObject *dict = PyDict_New();
6!
3177
    if (!dict) return NULL;
6!
3178

3179
    PyObject *chunks_list = PyList_New(result.chunks.size());
6!
3180
    if (!chunks_list) {
6!
3181
        Py_DECREF(dict);
×
3182
        return NULL;
×
3183
    }
3184

3185
    for (std::size_t i = 0; i < result.chunks.size(); ++i) {
14✔
3186
        const auto &chunk = result.chunks[i];
8✔
3187
        PyObject *chunk_dict = PyDict_New();
8!
3188
        if (!chunk_dict) {
8!
3189
            Py_DECREF(chunks_list);
×
3190
            Py_DECREF(dict);
×
3191
            return NULL;
×
3192
        }
3193
        dict_set_steal(chunk_dict, "checkpoint_idx",
8!
3194
                       PyLong_FromUnsignedLongLong(chunk.checkpoint_idx));
8!
3195
        dict_set_steal(chunk_dict, "start_byte",
8!
3196
                       PyLong_FromSize_t(chunk.start_byte));
8!
3197
        dict_set_steal(chunk_dict, "end_byte",
8!
3198
                       PyLong_FromSize_t(chunk.end_byte));
8!
3199
        PyList_SetItem(chunks_list, i, chunk_dict);
8!
3200
    }
4✔
3201

3202
    PyDict_SetItemString(dict, "chunks", chunks_list);
6!
3203
    dict_set_steal(dict, "total_checkpoints",
6!
3204
                   PyLong_FromUnsignedLongLong(result.total_checkpoints));
6!
3205
    dict_set_steal(dict, "skipped_checkpoints",
6!
3206
                   PyLong_FromUnsignedLongLong(result.skipped_checkpoints));
6!
3207
    dict_set_steal(dict, "file_may_match",
6!
3208
                   PyBool_FromLong(result.file_may_match ? 1 : 0));
6✔
3209
    Py_DECREF(chunks_list);
3!
3210

3211
    return dict;
6✔
3212
}
6✔
3213

3214
static PyObject *TraceReader_write_view_chunk(TraceReaderObject *self,
2✔
3215
                                              PyObject *args, PyObject *kwds) {
3216
    static const char *kwlist[] = {
3217
        "output_file", "checkpoint_idx", "start_byte", "end_byte",
3218
        "view",        "compression",    "batch_size", NULL};
3219
    const char *output_file = NULL;
2✔
3220
    unsigned long long checkpoint_idx = 0;
2✔
3221
    Py_ssize_t start_byte = 0;
2✔
3222
    Py_ssize_t end_byte = 0;
2✔
3223
    PyObject *view_obj = Py_None;
2✔
3224
    const char *compression_str = "zstd";
2✔
3225
    Py_ssize_t batch_size = 10000;
2✔
3226

3227
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "sKnn|Osn", (char **)kwlist,
2!
3228
                                     &output_file, &checkpoint_idx, &start_byte,
3229
                                     &end_byte, &view_obj, &compression_str,
3230
                                     &batch_size)) {
3231
        return NULL;
×
3232
    }
3233

3234
    IpcCompression compression = IpcCompression::ZSTD;
2✔
3235
    if (compression_str) {
2✔
3236
        std::string comp_lower(compression_str);
2!
3237
        for (auto &c : comp_lower) c = std::tolower(c);
10!
3238
        if (comp_lower == "none") {
2!
3239
            compression = IpcCompression::NONE;
×
3240
        } else if (comp_lower == "zstd") {
2✔
3241
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
3242
            compression = IpcCompression::ZSTD;
2✔
3243
#else
3244
            PyErr_SetString(PyExc_ValueError, "ZSTD compression not available");
3245
            return NULL;
3246
#endif
3247
        }
1✔
3248
    }
2✔
3249

3250
    ViewDefinition view;
2✔
3251
    if (view_obj && view_obj != Py_None) {
2!
3252
        if (PyUnicode_Check(view_obj)) {
×
3253
            const char *name = PyUnicode_AsUTF8(view_obj);
×
3254
            if (!name) return NULL;
×
3255
            std::string name_str(name);
×
3256
            if (name_str == "io") {
×
3257
                view = ViewDefinition::io_view();
×
3258
            } else if (name_str == "compute") {
×
3259
                view = ViewDefinition::compute_view();
×
3260
            } else if (name_str == "dlio") {
×
3261
                view = ViewDefinition::dlio_view();
×
3262
            } else {
3263
                view.with_name(name_str);
×
3264
            }
3265
        } else if (PyDict_Check(view_obj)) {
×
3266
            PyObject *name_obj = PyDict_GetItemString(view_obj, "name");
×
3267
            if (name_obj && PyUnicode_Check(name_obj)) {
×
3268
                view.with_name(PyUnicode_AsUTF8(name_obj));
×
3269
            }
3270
            PyObject *query_obj = PyDict_GetItemString(view_obj, "query");
×
3271
            if (query_obj && query_obj != Py_None &&
×
3272
                PyUnicode_Check(query_obj)) {
×
3273
                view.with_query(PyUnicode_AsUTF8(query_obj));
×
3274
            }
3275
        }
3276
    }
3277

3278
    std::string file_path = PyUnicode_AsUTF8(self->file_path);
2!
3279
    std::string index_path;
2✔
3280
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
2!
3281
    if (idx && idx[0] != '\0') {
2!
3282
        index_path = idx;
×
3283
    }
3284
    std::size_t checkpoint_size = self->checkpoint_size;
2✔
3285

3286
    WriteViewChunkResult result;
2✔
3287
    std::string error_msg;
2✔
3288

3289
    Py_BEGIN_ALLOW_THREADS try {
2!
3290
        Runtime *rt = get_runtime(self);
2!
3291
        result =
1✔
3292
            rt->submit(write_view_chunk_pipeline(
7!
3293
                           file_path, index_path, checkpoint_size, view,
1!
3294
                           checkpoint_idx, static_cast<std::size_t>(start_byte),
1✔
3295
                           static_cast<std::size_t>(end_byte),
1✔
3296
                           std::string(output_file), compression,
3!
3297
                           static_cast<std::size_t>(batch_size)),
1✔
3298
                       "write_view_chunk")
1!
3299
                .get();
2!
3300
    } catch (const std::exception &e) {
1!
3301
        error_msg = e.what();
×
3302
    }
×
3303
    Py_END_ALLOW_THREADS
2!
3304

3305
        if (!error_msg.empty()) {
2!
3306
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
3307
        return NULL;
×
3308
    }
3309

3310
    if (!result.error.empty()) {
2!
3311
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
×
3312
        return NULL;
×
3313
    }
3314

3315
    PyObject *dict = PyDict_New();
2!
3316
    if (!dict) return NULL;
2✔
3317

3318
    dict_set_steal(dict, "output_file",
2!
3319
                   PyUnicode_FromString(result.output_file.c_str()));
1!
3320
    dict_set_steal(dict, "events_matched",
2!
3321
                   PyLong_FromUnsignedLongLong(result.events_matched));
2!
3322
    dict_set_steal(dict, "events_scanned",
2!
3323
                   PyLong_FromUnsignedLongLong(result.events_scanned));
2!
3324
    dict_set_steal(dict, "rows_written",
2!
3325
                   PyLong_FromLongLong(result.rows_written));
2!
3326
    dict_set_steal(dict, "bytes_written",
2!
3327
                   PyLong_FromLongLong(result.bytes_written));
2!
3328

3329
    return dict;
2✔
3330
}
2✔
3331

3332
static PyObject *TraceReader_write_view_chunks(TraceReaderObject *self,
2✔
3333
                                               PyObject *args, PyObject *kwds) {
3334
    static const char *kwlist[] = {"chunks",      "output_dir", "view",
3335
                                   "compression", "batch_size", NULL};
3336
    PyObject *chunks_list = NULL;
2✔
3337
    const char *output_dir = NULL;
2✔
3338
    PyObject *view_obj = Py_None;
2✔
3339
    const char *compression_str = "zstd";
2✔
3340
    Py_ssize_t batch_size = 10000;
2✔
3341

3342
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "Os|Osn", (char **)kwlist,
2!
3343
                                     &chunks_list, &output_dir, &view_obj,
3344
                                     &compression_str, &batch_size)) {
3345
        return NULL;
×
3346
    }
3347

3348
    if (!PyList_Check(chunks_list)) {
2✔
3349
        PyErr_SetString(PyExc_TypeError, "chunks must be a list");
×
3350
        return NULL;
×
3351
    }
3352

3353
    IpcCompression compression = IpcCompression::ZSTD;
2✔
3354
    if (strcmp(compression_str, "none") == 0) {
2!
3355
        compression = IpcCompression::NONE;
×
3356
    } else if (strcmp(compression_str, "zstd") != 0) {
2!
3357
        PyErr_SetString(PyExc_ValueError,
×
3358
                        "compression must be 'zstd' or 'none'");
3359
        return NULL;
×
3360
    }
3361

3362
    ViewDefinition view;
2✔
3363
    if (view_obj && view_obj != Py_None) {
2!
3364
        if (PyUnicode_Check(view_obj)) {
×
3365
            const char *name = PyUnicode_AsUTF8(view_obj);
×
3366
            if (!name) return NULL;
×
3367
            std::string name_str(name);
×
3368
            if (name_str == "io") {
×
3369
                view = ViewDefinition::io_view();
×
3370
            } else if (name_str == "compute") {
×
3371
                view = ViewDefinition::compute_view();
×
3372
            } else if (name_str == "dlio") {
×
3373
                view = ViewDefinition::dlio_view();
×
3374
            } else {
3375
                view.with_name(name_str);
×
3376
            }
3377
        } else if (PyDict_Check(view_obj)) {
×
3378
            PyObject *name_obj = PyDict_GetItemString(view_obj, "name");
×
3379
            if (name_obj && PyUnicode_Check(name_obj)) {
×
3380
                view.with_name(PyUnicode_AsUTF8(name_obj));
×
3381
            }
3382
            PyObject *query_obj = PyDict_GetItemString(view_obj, "query");
×
3383
            if (query_obj && query_obj != Py_None &&
×
3384
                PyUnicode_Check(query_obj)) {
×
3385
                view.with_query(PyUnicode_AsUTF8(query_obj));
×
3386
            }
3387
        }
3388
    }
3389

3390
    std::vector<ChunkDescriptor> chunks;
2✔
3391
    Py_ssize_t num_chunks = PyList_Size(chunks_list);
2!
3392
    chunks.reserve(static_cast<std::size_t>(num_chunks));
2!
3393

3394
    for (Py_ssize_t i = 0; i < num_chunks; i++) {
8✔
3395
        PyObject *chunk_dict = PyList_GetItem(chunks_list, i);
6!
3396
        if (!PyDict_Check(chunk_dict)) {
6!
3397
            PyErr_SetString(PyExc_TypeError, "each chunk must be a dict");
×
3398
            return NULL;
×
3399
        }
3400

3401
        ChunkDescriptor desc;
6✔
3402

3403
        PyObject *cp_idx = PyDict_GetItemString(chunk_dict, "checkpoint_idx");
6!
3404
        PyObject *start = PyDict_GetItemString(chunk_dict, "start_byte");
6!
3405
        PyObject *end = PyDict_GetItemString(chunk_dict, "end_byte");
6!
3406

3407
        if (!cp_idx || !start || !end) {
6!
3408
            PyErr_SetString(
×
3409
                PyExc_KeyError,
3410
                "chunk must have checkpoint_idx, start_byte, end_byte");
3411
            return NULL;
×
3412
        }
3413

3414
        desc.checkpoint_idx =
6✔
3415
            static_cast<std::uint64_t>(PyLong_AsUnsignedLongLong(cp_idx));
6!
3416
        desc.start_byte =
6✔
3417
            static_cast<std::size_t>(PyLong_AsUnsignedLongLong(start));
6!
3418
        desc.end_byte =
6✔
3419
            static_cast<std::size_t>(PyLong_AsUnsignedLongLong(end));
6!
3420

3421
        char filename[64];
3422
        snprintf(filename, sizeof(filename), "chunk-%05llu.arrow",
9✔
3423
                 (unsigned long long)desc.checkpoint_idx);
6✔
3424
        desc.output_file = std::string(output_dir) + "/" + filename;
6!
3425

3426
        chunks.push_back(std::move(desc));
6!
3427
    }
6✔
3428

3429
    std::string file_path = PyUnicode_AsUTF8(self->file_path);
2!
3430
    std::string index_path;
2✔
3431
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
2!
3432
    if (idx && idx[0] != '\0') {
2!
3433
        index_path = idx;
×
3434
    }
3435
    std::size_t checkpoint_size = self->checkpoint_size;
2✔
3436

3437
    WriteViewChunksResult result;
2✔
3438
    std::string error_msg;
2✔
3439

3440
    Py_BEGIN_ALLOW_THREADS try {
2!
3441
        Runtime *rt = get_runtime(self);
2!
3442
        result = rt->submit(write_view_chunks_pipeline(
7!
3443
                                file_path, index_path, checkpoint_size, view,
1!
3444
                                std::move(chunks), compression,
2✔
3445
                                static_cast<std::size_t>(batch_size)),
1✔
3446
                            "write_view_chunks")
1!
3447
                     .get();
2!
3448
    } catch (const std::exception &e) {
1!
3449
        error_msg = e.what();
×
3450
    }
×
3451
    Py_END_ALLOW_THREADS
2!
3452

3453
        if (!error_msg.empty()) {
2!
3454
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
3455
        return NULL;
×
3456
    }
3457

3458
    PyObject *dict = PyDict_New();
2!
3459
    if (!dict) return NULL;
2✔
3460

3461
    PyObject *results_list =
1✔
3462
        PyList_New(static_cast<Py_ssize_t>(result.results.size()));
2!
3463
    if (!results_list) {
2!
3464
        Py_DECREF(dict);
×
3465
        return NULL;
×
3466
    }
3467

3468
    for (std::size_t i = 0; i < result.results.size(); i++) {
8✔
3469
        const auto &r = result.results[i];
6✔
3470
        PyObject *item = PyDict_New();
6!
3471
        if (!item) {
6!
3472
            Py_DECREF(results_list);
×
3473
            Py_DECREF(dict);
×
3474
            return NULL;
×
3475
        }
3476
        dict_set_steal(item, "output_file",
6!
3477
                       PyUnicode_FromString(r.output_file.c_str()));
3!
3478
        dict_set_steal(item, "rows_written",
6!
3479
                       PyLong_FromLongLong(r.rows_written));
6!
3480
        dict_set_steal(item, "events_matched",
6!
3481
                       PyLong_FromUnsignedLongLong(r.events_matched));
6!
3482
        if (!r.error.empty()) {
6!
3483
            dict_set_steal(item, "error",
×
3484
                           PyUnicode_FromString(r.error.c_str()));
×
3485
        }
3486
        PyList_SetItem(results_list, static_cast<Py_ssize_t>(i), item);
6!
3487
    }
3✔
3488

3489
    PyDict_SetItemString(dict, "results", results_list);
2!
3490
    Py_DECREF(results_list);
1!
3491
    dict_set_steal(dict, "total_rows", PyLong_FromLongLong(result.total_rows));
2!
3492
    dict_set_steal(dict, "total_events_matched",
2!
3493
                   PyLong_FromLongLong(result.total_events_matched));
2!
3494

3495
    return dict;
2✔
3496
}
2✔
3497

3498
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
3499

3500
static PyObject *TraceReader_enter(TraceReaderObject *self,
116!
3501
                                   PyObject *Py_UNUSED(ignored)) {
3502
    Py_INCREF(self);
58✔
3503
    return (PyObject *)self;
116✔
3504
}
3505

3506
static PyObject *TraceReader_exit(TraceReaderObject *self, PyObject *args) {
114✔
3507
    Py_RETURN_NONE;
114✔
3508
}
3509

3510
static PyObject *TraceReader_get_file_path(TraceReaderObject *self,
14✔
3511
                                           void *closure) {
3512
    Py_INCREF(self->file_path);
14!
3513
    return self->file_path;
14✔
3514
}
3515

3516
static PyObject *TraceReader_get_index_dir(TraceReaderObject *self,
6✔
3517
                                           void *closure) {
3518
    Py_INCREF(self->index_dir);
6✔
3519
    return self->index_dir;
6✔
3520
}
3521

3522
static PyObject *TraceReader_get_has_index(TraceReaderObject *self,
12✔
3523
                                           void *closure) {
3524
    return PyBool_FromLong(self->has_index);
12✔
3525
}
3526

3527
static PyObject *TraceReader_get_num_lines_prop(TraceReaderObject *self,
8✔
3528
                                                void *closure) {
3529
    try {
3530
        TraceReaderConfig cfg = build_config(self);
8!
3531
        TraceReader reader(std::move(cfg));
8!
3532
        std::size_t n = reader.get_num_lines();
8!
3533
        if (n > 0) return PyLong_FromSize_t(n);
8!
3534
    } catch (...) {
8!
3535
    }
×
3536
    PyObject *empty_args = PyTuple_New(0);
8✔
3537
    if (!empty_args) return NULL;
8✔
3538
    PyObject *list = TraceReader_read_lines(self, empty_args, NULL);
8✔
3539
    Py_DECREF(empty_args);
4✔
3540
    if (!list) return NULL;
8✔
3541
    Py_ssize_t n = PyList_GET_SIZE(list);
8✔
3542
    Py_DECREF(list);
4✔
3543
    return PyLong_FromSsize_t(n);
8✔
3544
}
4✔
3545

3546
static PyObject *TraceReader_get_max_bytes(TraceReaderObject *self,
24✔
3547
                                           PyObject *Py_UNUSED(ignored)) {
3548
    try {
3549
        TraceReaderConfig cfg = build_config(self);
24!
3550
        TraceReader reader(std::move(cfg));
24!
3551
        return PyLong_FromSize_t(reader.get_max_bytes());
24!
3552
    } catch (const std::exception &e) {
24!
3553
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
3554
        return NULL;
×
3555
    }
×
3556
}
12✔
3557

3558
static PyObject *TraceReader_get_num_lines(TraceReaderObject *self,
6✔
3559
                                           PyObject *Py_UNUSED(ignored)) {
3560
    try {
3561
        TraceReaderConfig cfg = build_config(self);
6!
3562
        TraceReader reader(std::move(cfg));
6!
3563
        return PyLong_FromSize_t(reader.get_num_lines());
6!
3564
    } catch (const std::exception &e) {
6!
3565
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
3566
        return NULL;
×
3567
    }
×
3568
}
3✔
3569

3570
static PyMethodDef TraceReader_methods[] = {
3571
    {"iter_lines", (PyCFunction)TraceReader_iter_lines,
3572
     METH_VARARGS | METH_KEYWORDS,
3573
     "Return an iterator over decoded lines.\n"
3574
     "\n"
3575
     "Args:\n"
3576
     "    start_line (int): First line (0 = beginning).\n"
3577
     "    end_line (int): Last line (0 = end of file).\n"
3578
     "    start_byte (int): First byte offset (0 = beginning).\n"
3579
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3580
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
3581
    {"iter_raw", (PyCFunction)TraceReader_iter_raw,
3582
     METH_VARARGS | METH_KEYWORDS,
3583
     "Return an iterator over raw byte chunks.\n"
3584
     "\n"
3585
     "Args:\n"
3586
     "    start_line (int): First line (0 = beginning).\n"
3587
     "    end_line (int): Last line (0 = end of file).\n"
3588
     "    start_byte (int): First byte offset (0 = beginning).\n"
3589
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3590
     "    buffer_size (int): Internal read buffer size in bytes.\n"
3591
     "    line_aligned (bool): Align chunks to line boundaries.\n"
3592
     "    multi_line (bool): Allow multiple lines per chunk.\n"},
3593
    {"read_lines", (PyCFunction)TraceReader_read_lines,
3594
     METH_VARARGS | METH_KEYWORDS,
3595
     "Read all lines and return as list.\n"
3596
     "\n"
3597
     "Args:\n"
3598
     "    start_line (int): First line (0 = beginning).\n"
3599
     "    end_line (int): Last line (0 = end of file).\n"
3600
     "    start_byte (int): First byte offset (0 = beginning).\n"
3601
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3602
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
3603
    {"iter_json", (PyCFunction)TraceReader_iter_json,
3604
     METH_VARARGS | METH_KEYWORDS,
3605
     "Return an iterator over parsed JSON events as Python dicts.\n"
3606
     "\n"
3607
     "Each event is parsed once in C++ (single-pass simdjson ondemand)\n"
3608
     "and yielded as a Python dict. No double-parsing overhead.\n"
3609
     "\n"
3610
     "Args:\n"
3611
     "    start_line (int): First line (0 = beginning).\n"
3612
     "    end_line (int): Last line (0 = end of file).\n"
3613
     "    start_byte (int): First byte offset (0 = beginning).\n"
3614
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3615
     "    buffer_size (int): Internal read buffer size in bytes.\n"
3616
     "    query (str): Optional query filter.\n"
3617
     "    batch_size (int): Events per internal batch (default 1024).\n"},
3618
    {"read_json", (PyCFunction)TraceReader_read_json_py,
3619
     METH_VARARGS | METH_KEYWORDS,
3620
     "Read all events as parsed Python dicts (list).\n"
3621
     "\n"
3622
     "Equivalent to list(iter_json(...)).\n"},
3623
    {"read_raw", (PyCFunction)TraceReader_read_raw,
3624
     METH_VARARGS | METH_KEYWORDS,
3625
     "Read all raw chunks and return as list.\n"
3626
     "\n"
3627
     "Args:\n"
3628
     "    start_line (int): First line (0 = beginning).\n"
3629
     "    end_line (int): Last line (0 = end of file).\n"
3630
     "    start_byte (int): First byte offset (0 = beginning).\n"
3631
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3632
     "    buffer_size (int): Internal read buffer size in bytes.\n"
3633
     "    line_aligned (bool): Align chunks to line boundaries.\n"
3634
     "    multi_line (bool): Allow multiple lines per chunk.\n"},
3635
#ifdef DFTRACER_UTILS_ENABLE_ARROW
3636
    {"iter_arrow", (PyCFunction)TraceReader_iter_arrow,
3637
     METH_VARARGS | METH_KEYWORDS,
3638
     "Return an iterator over Arrow record batches.\n"
3639
     "\n"
3640
     "Args:\n"
3641
     "    batch_size (int): Maximum rows per Arrow batch.\n"
3642
     "    start_line (int): First line (0 = beginning).\n"
3643
     "    end_line (int): Last line (0 = end of file).\n"
3644
     "    start_byte (int): First byte offset (0 = beginning).\n"
3645
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3646
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
3647
    {"iter_arrow_stream", (PyCFunction)TraceReader_iter_arrow_stream,
3648
     METH_VARARGS | METH_KEYWORDS,
3649
     "Return an _ArrowBatchStream that exposes Arrow record batches\n"
3650
     "via the Arrow C Data Interface stream protocol\n"
3651
     "(__arrow_c_stream__). PyArrow can drain the producer channel\n"
3652
     "with a single call, without per-batch Python iteration.\n"},
3653
    {"read_arrow", (PyCFunction)TraceReader_read_arrow,
3654
     METH_VARARGS | METH_KEYWORDS,
3655
     "Read all events as a materialized ArrowTable.\n"
3656
     "\n"
3657
     "Args:\n"
3658
     "    batch_size (int): Maximum rows per Arrow batch.\n"
3659
     "    start_line (int): First line (0 = beginning).\n"
3660
     "    end_line (int): Last line (0 = end of file).\n"
3661
     "    start_byte (int): First byte offset (0 = beginning).\n"
3662
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3663
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
3664
#endif
3665
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
3666
    {"write_arrow", (PyCFunction)TraceReader_write_arrow,
3667
     METH_VARARGS | METH_KEYWORDS,
3668
     "Write trace data to partitioned Arrow IPC files.\n"
3669
     "\n"
3670
     "Args:\n"
3671
     "    path (str): Output directory path.\n"
3672
     "    partition_by (list[str] or None): Column names to partition by.\n"
3673
     "    num_buckets (int): Number of hash buckets (0 = no bucketing).\n"
3674
     "    chunk_size_mb (int): Max uncompressed MB per file (default 32).\n"
3675
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
3676
     "    batch_size (int): Rows per internal batch (default 10000).\n"
3677
     "    normalize (bool): Use normalized schema (default False).\n"
3678
     "\n"
3679
     "Returns:\n"
3680
     "    dict: Statistics including partitions, total_rows, total_bytes.\n"},
3681
    {"get_view_chunks", (PyCFunction)TraceReader_get_view_chunks,
3682
     METH_VARARGS | METH_KEYWORDS,
3683
     "Get candidate chunks for a view after bloom filter pruning.\n"
3684
     "\n"
3685
     "Args:\n"
3686
     "    view (str or dict): View name ('io', 'compute', 'dlio') or\n"
3687
     "                        dict with 'name' and optional 'query'.\n"
3688
     "\n"
3689
     "Returns:\n"
3690
     "    dict: chunks list, total_checkpoints, skipped_checkpoints.\n"},
3691
    {"write_view_chunk", (PyCFunction)TraceReader_write_view_chunk,
3692
     METH_VARARGS | METH_KEYWORDS,
3693
     "Write a single chunk to an Arrow IPC file.\n"
3694
     "\n"
3695
     "Args:\n"
3696
     "    output_file (str): Path to output Arrow IPC file.\n"
3697
     "    checkpoint_idx (int): Checkpoint index.\n"
3698
     "    start_byte (int): Start byte offset.\n"
3699
     "    end_byte (int): End byte offset.\n"
3700
     "    view (str or dict): View definition.\n"
3701
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
3702
     "    batch_size (int): Events per batch (default 10000).\n"
3703
     "\n"
3704
     "Returns:\n"
3705
     "    dict: output_file, events_matched, rows_written, bytes_written.\n"},
3706
    {"write_view_chunks", (PyCFunction)TraceReader_write_view_chunks,
3707
     METH_VARARGS | METH_KEYWORDS,
3708
     "Write multiple chunks to Arrow IPC files in parallel.\n"
3709
     "\n"
3710
     "All chunks are processed concurrently on the Runtime thread pool.\n"
3711
     "\n"
3712
     "Args:\n"
3713
     "    chunks (list): List of dicts with checkpoint_idx, start_byte, "
3714
     "end_byte.\n"
3715
     "    output_dir (str): Directory for output Arrow IPC files.\n"
3716
     "    view (str or dict): View definition.\n"
3717
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
3718
     "    batch_size (int): Events per batch (default 10000).\n"
3719
     "\n"
3720
     "Returns:\n"
3721
     "    dict: results list, total_rows, total_events_matched.\n"},
3722
#endif
3723
    {"get_max_bytes", (PyCFunction)TraceReader_get_max_bytes, METH_NOARGS,
3724
     "Get the maximum byte position (0 if unknown for compressed\n"
3725
     "files without index)."},
3726
    {"get_num_lines", (PyCFunction)TraceReader_get_num_lines, METH_NOARGS,
3727
     "Get the total number of lines (0 if unknown for files without\n"
3728
     "index)."},
3729
    {"__enter__", (PyCFunction)TraceReader_enter, METH_NOARGS,
3730
     "Enter the runtime context for the with statement."},
3731
    {"__exit__", (PyCFunction)TraceReader_exit, METH_VARARGS,
3732
     "Exit the runtime context for the with statement.\n"
3733
     "\n"
3734
     "TraceReader does not own the shared RocksDB instance for an index path;\n"
3735
     "any shared DB lifetime remains manager-owned on the native side."},
3736
    {NULL}};
3737

3738
static PyGetSetDef TraceReader_getsetters[] = {
3739
    {"path", (getter)TraceReader_get_file_path, NULL,
3740
     "Path to the trace file or directory", NULL},
3741
    {"index_dir", (getter)TraceReader_get_index_dir, NULL,
3742
     "Directory for index files", NULL},
3743
    {"has_index", (getter)TraceReader_get_has_index, NULL,
3744
     "True if a checkpoint index was found", NULL},
3745
    {"num_lines", (getter)TraceReader_get_num_lines_prop, NULL,
3746
     "Total line count (reads all lines if needed)", NULL},
3747
    {NULL}};
3748

3749
PyTypeObject TraceReaderType = {
3750
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext.TraceReader",
3751
    sizeof(TraceReaderObject),                /* tp_basicsize */
3752
    0,                                        /* tp_itemsize */
3753
    (destructor)TraceReader_dealloc,          /* tp_dealloc */
3754
    0,                                        /* tp_vectorcall_offset */
3755
    0,                                        /* tp_getattr */
3756
    0,                                        /* tp_setattr */
3757
    0,                                        /* tp_as_async */
3758
    0,                                        /* tp_repr */
3759
    0,                                        /* tp_as_number */
3760
    0,                                        /* tp_as_sequence */
3761
    0,                                        /* tp_as_mapping */
3762
    0,                                        /* tp_hash */
3763
    0,                                        /* tp_call */
3764
    0,                                        /* tp_str */
3765
    0,                                        /* tp_getattro */
3766
    0,                                        /* tp_setattro */
3767
    0,                                        /* tp_as_buffer */
3768
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
3769
    "TraceReader(file_path: str, index_dir: str = '',\n"
3770
    "            checkpoint_size: int = 33554432,\n"
3771
    "            auto_build_index: bool = False,\n"
3772
    "            runtime: Runtime | None = None)\n"
3773
    "--\n"
3774
    "\n"
3775
    "Smart trace file reader that auto-selects sequential or indexed\n"
3776
    "reading based on whether a ``.dftindex`` store exists.\n"
3777
    "\n"
3778
    "Args:\n"
3779
    "    file_path (str): Path to the trace file (.pfw.gz or plain "
3780
    "text).\n"
3781
    "    index_dir (str): Directory to search for ``.dftindex`` "
3782
    "stores.\n"
3783
    "        Empty string (default) searches next to the trace file.\n"
3784
    "    checkpoint_size (int): Checkpoint interval in bytes for index\n"
3785
    "        building (default 32 MB).\n"
3786
    "    auto_build_index (bool): If True, automatically build an "
3787
    "index\n"
3788
    "        when none exists.\n"
3789
    "    runtime (Runtime or None): Runtime instance for thread pool "
3790
    "control.\n"
3791
    "        If None, uses the default global Runtime.\n"
3792
    "\n"
3793
    "Raises:\n"
3794
    "    RuntimeError: If *file_path* does not exist or cannot be "
3795
    "opened.\n",                /* tp_doc */
3796
    0,                          /* tp_traverse */
3797
    0,                          /* tp_clear */
3798
    0,                          /* tp_richcompare */
3799
    0,                          /* tp_weaklistoffset */
3800
    0,                          /* tp_iter */
3801
    0,                          /* tp_iternext */
3802
    TraceReader_methods,        /* tp_methods */
3803
    0,                          /* tp_members */
3804
    TraceReader_getsetters,     /* tp_getset */
3805
    0,                          /* tp_base */
3806
    0,                          /* tp_dict */
3807
    0,                          /* tp_descr_get */
3808
    0,                          /* tp_descr_set */
3809
    0,                          /* tp_dictoffset */
3810
    (initproc)TraceReader_init, /* tp_init */
3811
    0,                          /* tp_alloc */
3812
    TraceReader_new,            /* tp_new */
3813
};
3814

3815
int init_trace_reader(PyObject *m) {
2✔
3816
    if (register_type(m, &TraceReaderType, "TraceReader") < 0) return -1;
2✔
3817

3818
    return 0;
2✔
3819
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc