• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26325072882

23 May 2026 05:51AM UTC coverage: 52.162% (-0.02%) from 52.184%
26325072882

Pull #72

github

web-flow
Merge 2db6929e7 into b6ec01bb1
Pull Request #72: chore(utils): add portable to_chars_double fallback for macOS

37192 of 92663 branches covered (40.14%)

Branch coverage included in aggregate %.

0 of 3 new or added lines in 1 file covered. (0.0%)

5 existing lines in 4 files now uncovered.

33420 of 42707 relevant lines covered (78.25%)

20429.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.69
/src/dftracer/utils/python/trace_reader.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <Python.h>
3
#include <dftracer/utils/core/common/config.h>
4
#include <dftracer/utils/core/common/filesystem.h>
5
#include <dftracer/utils/core/common/memory_budget.h>
6
#include <dftracer/utils/core/coro/channel.h>
7
#include <dftracer/utils/core/coro/task.h>
8
#include <dftracer/utils/core/coro/when_all.h>
9
#include <dftracer/utils/core/tasks/coro_scope.h>
10
#include <dftracer/utils/core/utils/string.h>
11
#include <dftracer/utils/python/arrow_helpers.h>
12
#include <dftracer/utils/python/batch_byte_size.h>
13
#include <dftracer/utils/python/json.h>
14
#include <dftracer/utils/python/runtime.h>
15
#include <dftracer/utils/python/trace_reader.h>
16
#include <dftracer/utils/python/trace_reader_iterator.h>
17
#include <dftracer/utils/utilities/common/query/query.h>
18
#include <dftracer/utils/utilities/composites/dft/indexing/chunk_pruner_utility.h>
19
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
20
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
21
#include <dftracer/utils/utilities/indexer/index_database.h>
22
#include <dftracer/utils/utilities/indexer/internal/helpers.h>
23
#include <dftracer/utils/utilities/reader/trace_reader.h>
24

25
#include <algorithm>
26
#include <cctype>
27
#include <cstddef>
28
#include <cstdio>
29
#include <cstring>
30
#include <exception>
31
#include <memory>
32
#include <string>
33
#include <unordered_map>
34
#include <vector>
35
#ifdef DFTRACER_UTILS_ENABLE_ARROW
36
#include <dftracer/utils/python/arrow_stream_capsule.h>
37
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
38
#include <dftracer/utils/utilities/common/json/parser.h>
39
#endif
40
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
41
#include <dftracer/utils/utilities/common/arrow/ipc_writer.h>
42
#include <dftracer/utils/utilities/common/arrow/partition_writer.h>
43
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
44
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
45
#include <dftracer/utils/utilities/composites/dft/views/view_builder_utility.h>
46
#include <dftracer/utils/utilities/composites/dft/views/view_definition.h>
47
#include <dftracer/utils/utilities/composites/dft/views/view_reader_utility.h>
48
#endif
49

50
namespace {
51

52
using dftracer::utils::CoroScope;
53
using dftracer::utils::Runtime;
54
using dftracer::utils::coro::CoroTask;
55
using dftracer::utils::coro::when_all;
56
using dftracer::utils::utilities::filesystem::PatternDirectoryScannerUtility;
57
using dftracer::utils::utilities::filesystem::
58
    PatternDirectoryScannerUtilityInput;
59
using dftracer::utils::utilities::reader::ReadConfig;
60
using dftracer::utils::utilities::reader::TraceReader;
61
using dftracer::utils::utilities::reader::TraceReaderConfig;
62
#ifdef DFTRACER_UTILS_ENABLE_ARROW
63
using dftracer::utils::utilities::common::arrow::ColumnType;
64
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
65
using dftracer::utils::utilities::common::json::JsonParser;
66
using dftracer::utils::utilities::common::json::JsonValueHelper;
67
#endif
68
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
69
using dftracer::utils::utilities::common::arrow::IpcCompression;
70
using dftracer::utils::utilities::common::arrow::PartitionWriter;
71
using dftracer::utils::utilities::common::arrow::PartitionWriteStats;
72
using dftracer::utils::utilities::composites::dft::MetadataCollectorUtility;
73
using dftracer::utils::utilities::composites::dft::
74
    MetadataCollectorUtilityInput;
75
using dftracer::utils::utilities::composites::dft::views::ViewBuilderInput;
76
using dftracer::utils::utilities::composites::dft::views::ViewBuilderUtility;
77
using dftracer::utils::utilities::composites::dft::views::ViewDefinition;
78
using dftracer::utils::utilities::composites::dft::views::ViewReaderInput;
79
using dftracer::utils::utilities::composites::dft::views::ViewReaderUtility;
80
#endif
81

82
using dftracer::utils::python::MemoryViewBatchData;
83
using dftracer::utils::python::MemoryViewBatchIteratorState;
84

85
/// Single-file line producer.
///
/// Reads the trace described by `cfg` line by line and forwards the lines to
/// `producer` in MemoryViewBatchData batches of at most `batch_size` lines.
/// Inside a batch, all line bytes are appended back to back in `buffer`, and
/// `offsets` / `lengths` record where each line starts and how long it is.
/// Before each send, the batch's byte_size() is added to
/// `state->bytes_in_queue`.
///
/// Reading stops when the generator is exhausted, when `state->cancelled` is
/// observed, or when send() reports failure. A trailing partial batch is
/// flushed only if the stream was not cancelled. Any exception is recorded
/// through `state->set_error()`.
///
/// NOTE(review): bytes_in_queue is incremented before send() and is not
/// decremented when send() fails -- confirm the consumer side tolerates this.
CoroTask<void> produce_lines_batched(
    std::shared_ptr<MemoryViewBatchIteratorState> state,
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer,
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size) {
    // Created before the try block so it is still alive while the catch
    // handler records an error.
    auto producer_guard = producer.guard();
    try {
        TraceReader trace_reader(std::move(cfg));
        auto line_stream = trace_reader.read_lines(rc);
        MemoryViewBatchData pending;
        std::size_t pending_lines = 0;

        for (;;) {
            auto line = co_await line_stream.next();
            if (!line) break;
            if (state->cancelled.load(std::memory_order_acquire)) break;

            auto content = line->content;
            const auto start =
                static_cast<Py_ssize_t>(pending.buffer.size());
            pending.buffer.insert(pending.buffer.end(), content.begin(),
                                  content.end());
            pending.offsets.push_back(start);
            pending.lengths.push_back(
                static_cast<Py_ssize_t>(content.size()));
            ++pending_lines;

            if (pending_lines < batch_size) continue;

            // Batch is full: account for it, hand it off, start a fresh one.
            auto pending_bytes = dftracer::utils::python::byte_size(pending);
            state->bytes_in_queue.fetch_add(pending_bytes,
                                            std::memory_order_acq_rel);
            if (!co_await producer.send(std::move(pending))) break;
            pending = MemoryViewBatchData{};
            pending_lines = 0;
        }

        if (pending_lines > 0 &&
            !state->cancelled.load(std::memory_order_acquire)) {
            auto pending_bytes = dftracer::utils::python::byte_size(pending);
            state->bytes_in_queue.fetch_add(pending_bytes,
                                            std::memory_order_acq_rel);
            co_await producer.send(std::move(pending));
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
124

125
/// Single-file raw producer.
///
/// Streams the trace described by `cfg` through TraceReader::read_raw(). Each
/// raw chunk is copied into its own MemoryViewBatchData holding exactly one
/// entry (offset 0, length = chunk size). The batch's byte_size() is added to
/// `state->bytes_in_queue` before it is sent to `producer`.
///
/// Stops when the generator ends, when `state->cancelled` is observed, or when
/// send() fails. Exceptions are recorded via `state->set_error()`.
CoroTask<void> produce_raw_batched(
    std::shared_ptr<MemoryViewBatchIteratorState> state,
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer,
    TraceReaderConfig cfg, ReadConfig rc) {
    // Outlives the try/catch so an error is recorded before the guard runs.
    auto producer_guard = producer.guard();
    try {
        TraceReader trace_reader(std::move(cfg));
        auto chunk_stream = trace_reader.read_raw(rc);
        for (;;) {
            auto chunk = co_await chunk_stream.next();
            if (!chunk || state->cancelled.load(std::memory_order_acquire)) {
                break;
            }

            auto first = chunk->data();
            auto chunk_len = chunk->size();

            MemoryViewBatchData single;
            single.buffer.assign(first, first + chunk_len);
            single.offsets.push_back(0);
            single.lengths.push_back(static_cast<Py_ssize_t>(chunk_len));

            auto single_bytes = dftracer::utils::python::byte_size(single);
            state->bytes_in_queue.fetch_add(single_bytes,
                                            std::memory_order_acq_rel);
            if (!co_await producer.send(std::move(single))) break;
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
148

149
using dftracer::utils::utilities::common::json::JsonParser;
150
using dftracer::utils::utilities::common::json::JsonValueHelper;
151

152
// Rough per-item size estimates, in bytes, for each output flavour.
// NOTE(review): the names suggest they seed memory-budget / queue sizing
// elsewhere in this file -- confirm at the use sites.
// (Namespace-scope constexpr variables already have internal linkage, so no
// `static` is needed.)
constexpr std::size_t ESTIMATED_BYTES_PER_LINE = 256;
constexpr std::size_t ESTIMATED_BYTES_PER_RAW_CHUNK = std::size_t{4} << 20;  // 4 MiB
constexpr std::size_t ESTIMATED_BYTES_PER_JSON_EVENT = 512;
constexpr std::size_t ESTIMATED_BYTES_PER_ARROW_ROW = 1024;
156

157
static void insert_simdjson_value(ArgsMap &map, std::string_view key,
4,218✔
158
                                  simdjson::ondemand::value val) {
159
    auto type = val.type();
4,218✔
160
    if (type.error()) return;
4,225!
161
    switch (type.value_unsafe()) {
4,231!
162
        case simdjson::ondemand::json_type::string: {
916✔
163
            auto r = val.get_string();
1,876✔
164
            if (!r.error()) map.insert(key, std::string(r.value_unsafe()));
1,882!
165
            break;
1,862✔
166
        }
167
        case simdjson::ondemand::json_type::number: {
1,149✔
168
            auto ri = val.get_int64();
2,349✔
169
            if (!ri.error()) {
2,341✔
170
                auto v = ri.value_unsafe();
2,332✔
171
                if (v >= 0)
2,333!
172
                    map.insert(key, static_cast<std::uint64_t>(v));
2,334!
173
                else
UNCOV
174
                    map.insert(key, v);
×
175
            } else {
1,200✔
176
                auto rd = val.get_double();
×
177
                if (!rd.error()) map.insert(key, rd.value_unsafe());
×
178
            }
179
            break;
2,343✔
180
        }
181
        case simdjson::ondemand::json_type::boolean: {
UNCOV
182
            auto r = val.get_bool();
×
183
            if (!r.error()) map.insert(key, r.value_unsafe());
×
184
            break;
×
185
        }
186
        default:
187
            break;
×
188
    }
189
}
2,160✔
190

191
/// Fills `ev` from the JSON object currently held by `parser`.
///
/// `ev.top` is always marked valid. Every top-level field other than "args" is
/// stored into `ev.top` via insert_simdjson_value(). When "args" is a JSON
/// object, `ev.args` is marked valid and each of its members is stored into
/// `ev.args`. Members whose key or value cannot be read are skipped. A
/// non-object "args" value is ignored.
static void parse_json_to_event(JsonParser &parser, JsonDictEvent &ev) {
    ev.top.set_valid(true);

    auto visit_field = [&ev](std::string_view key,
                             simdjson::ondemand::value val) {
        if (key != "args") {
            insert_simdjson_value(ev.top, key, val);
            return;
        }

        auto args_obj = val.get_object();
        if (args_obj.error()) return;

        ev.args.set_valid(true);
        for (auto member : args_obj.value_unsafe()) {
            if (member.error()) continue;
            auto member_key = member.unescaped_key();
            if (member_key.error()) continue;
            auto member_val = member.value();
            if (member_val.error()) continue;
            insert_simdjson_value(ev.args, member_key.value_unsafe(),
                                  member_val.value_unsafe());
        }
    };

    parser.for_each_field(visit_field);
}
214

215
/// Single-file JSON-dict producer.
///
/// Reads the trace described by `cfg` through TraceReader::read_json(). Each
/// record is converted into a JsonDictEvent (see parse_json_to_event), and the
/// events are sent to `producer` in JsonDictBatch groups of at most
/// `batch_size` events. Before each send, the batch's byte_size() is added to
/// `state->bytes_in_queue`.
///
/// Stops on generator exhaustion, on `state->cancelled`, or on a failed
/// send(). A trailing partial batch is sent only if not cancelled. Exceptions
/// are recorded via `state->set_error()`.
CoroTask<void> produce_json_dicts(
    std::shared_ptr<JsonDictIteratorState> state,
    dftracer::utils::coro::ChannelProducer<JsonDictBatch> producer,
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size) {
    // Kept outside the try block so it outlives the error handler.
    auto producer_guard = producer.guard();
    try {
        TraceReader trace_reader(std::move(cfg));
        auto json_stream = trace_reader.read_json(rc);
        JsonDictBatch pending;
        pending.events.reserve(batch_size);

        for (;;) {
            auto record = co_await json_stream.next();
            if (!record) break;
            if (state->cancelled.load(std::memory_order_acquire)) break;

            JsonDictEvent event;
            parse_json_to_event(*record->parser, event);
            pending.events.push_back(std::move(event));

            if (pending.events.size() < batch_size) continue;

            auto pending_bytes = dftracer::utils::python::byte_size(pending);
            state->bytes_in_queue.fetch_add(pending_bytes,
                                            std::memory_order_acq_rel);
            if (!co_await producer.send(std::move(pending))) break;
            pending = JsonDictBatch{};
            pending.events.reserve(batch_size);
        }

        if (!pending.events.empty() &&
            !state->cancelled.load(std::memory_order_acquire)) {
            auto pending_bytes = dftracer::utils::python::byte_size(pending);
            state->bytes_in_queue.fetch_add(pending_bytes,
                                            std::memory_order_acq_rel);
            co_await producer.send(std::move(pending));
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
253

254
/// Feeds the paths in `*files`, in order, into `file_chan`.
///
/// Stops early once `*cancelled` is set or a send is rejected. `file_chan` is
/// closed afterwards in every case (presumably so the workers' receive() loops
/// see end-of-stream). `files` and `cancelled` must outlive this coroutine.
static CoroTask<void> send_files_to_channel(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    const std::vector<std::string> *files, std::atomic<bool> *cancelled) {
    for (std::size_t i = 0; i < files->size(); ++i) {
        if (cancelled->load(std::memory_order_acquire)) break;
        const std::string &path = (*files)[i];
        if (!co_await file_chan->send(path)) break;
    }
    file_chan->close();
    co_return;
}
264

265
/// Parallel JSON-dict worker.
///
/// Repeatedly takes a file path from `file_chan` and reads that trace with
/// TraceReader::read_json(). The reader is configured from `index_dir`,
/// `checkpoint_size` and `auto_build_index`. Resulting JsonDictEvents are
/// pushed into `out_chan` in batches of at most `batch_size`, with any
/// remainder flushed at the end of each file.
///
/// Returns immediately once `*cancelled` is observed or a send fails. Ends
/// normally when `file_chan` yields no more paths. Exceptions are not caught
/// here and propagate to the enclosing scope.
///
/// NOTE(review): unlike produce_json_dicts, this worker does not add its
/// batches to any bytes_in_queue counter -- confirm the consumer does not
/// subtract unconditionally.
static CoroTask<void> json_dict_file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<JsonDictBatch> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<JsonDictBatch> producer(out_chan);
    auto producer_guard = producer.guard();

    const auto is_cancelled = [cancelled] {
        return cancelled->load(std::memory_order_acquire);
    };

    for (;;) {
        auto next_path = co_await file_chan->receive();
        if (!next_path) break;
        if (is_cancelled()) co_return;

        TraceReaderConfig reader_cfg;
        reader_cfg.file_path = std::move(*next_path);
        reader_cfg.index_dir = index_dir;
        reader_cfg.checkpoint_size = checkpoint_size;
        reader_cfg.auto_build_index = auto_build_index;

        TraceReader trace_reader(std::move(reader_cfg));
        auto json_stream = trace_reader.read_json(rc);
        JsonDictBatch pending;
        pending.events.reserve(batch_size);

        for (;;) {
            auto record = co_await json_stream.next();
            if (!record) break;
            if (is_cancelled()) co_return;

            JsonDictEvent event;
            parse_json_to_event(*record->parser, event);
            pending.events.push_back(std::move(event));

            if (pending.events.size() < batch_size) continue;

            if (!co_await producer.send(std::move(pending))) co_return;
            pending = JsonDictBatch{};
            pending.events.reserve(batch_size);
        }

        if (!pending.events.empty()) {
            if (!co_await producer.send(std::move(pending))) co_return;
        }
    }
    co_return;
}
303

304
/// Starts the parallel JSON-dict pipeline inside `child`.
///
/// Spawns:
/// - `num_workers` json_dict_file_worker coroutines. The count is `max_workers`
///   capped at the number of files, but never less than one. All workers
///   receive paths from one shared file channel (capacity `num_workers`) and
///   write batches to `out_chan`.
/// - One send_files_to_channel coroutine that feeds `*files` into that channel
///   and then closes it.
///
/// Each worker captures its own copies of `*index_dir` and `*rc`. `files` and
/// `cancelled_ptr` are forwarded as pointers and must outlive `child`.
///
/// `max_workers == 0` (or an empty file list) is treated as one worker. With
/// zero workers nothing would receive from the file channel, so the feeder
/// could block forever and `out_chan` would have no producer.
static CoroTask<void> spawn_json_dict_producers(
    CoroScope &child, dftracer::utils::coro::Channel<JsonDictBatch> *out_chan,
    const std::vector<std::string> *files, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::size_t batch_size, std::atomic<bool> *cancelled_ptr,
    std::size_t max_workers) {
    const std::size_t num_workers =
        std::max<std::size_t>(1, std::min(files->size(), max_workers));
    auto file_chan =
        dftracer::utils::coro::make_channel<std::string>(num_workers);

    for (std::size_t i = 0; i < num_workers; ++i) {
        // Non-coroutine lambda: every argument is copied into the worker's
        // own coroutine frame at call time.
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc, batch_size,
                     cancelled_ptr](CoroScope &) {
            return json_dict_file_worker(fc, out_chan, idx, checkpoint_size,
                                         auto_build_index, r, batch_size,
                                         cancelled_ptr);
        });
    }

    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
        return send_files_to_channel(fc, files, cancelled_ptr);
    });
    co_return;
}
329

330
/// Directory-mode JSON-dict producer.
///
/// 1. Scans `dir_path` for ".pfw" / ".pfw.gz" files (scanner flags as in the
///    original call) and sorts the resulting paths.
/// 2. If none are found, closes `sp->channel` and returns.
/// 3. Otherwise, runs spawn_json_dict_producers() in a child scope and waits
///    for it to complete.
///
/// Any exception, including one rethrown from the child scope, is recorded via
/// `sp->set_error()`.
///
/// NOTE(review): on the error path the output channel is not closed here --
/// confirm set_error() also ends the consumer's wait.
static CoroTask<void> produce_json_dicts_parallel(
    CoroScope &scope, JsonDictIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility scanner;
        auto scan_input = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto entries = co_await scope.spawn(scanner, scan_input);

        std::vector<std::string> files;
        files.reserve(entries.size());
        for (auto &e : entries) files.push_back(e.path.string());
        std::sort(files.begin(), files.end());

        if (files.empty()) {
            // Nothing will ever produce into the channel; close it directly.
            sp->channel->close();
            co_return;
        }

        auto *chan_ptr = sp->channel.get();
        auto *cancelled_ptr = &sp->cancelled;

        // Deliberately NOT a coroutine lambda (C++ Core Guidelines CP.51).
        // A capturing coroutine lambda reads its captures through the closure
        // object, which may already be destroyed when a lazily-started task
        // first runs. Returning the task directly copies every argument into
        // spawn_json_dict_producers' own frame right away, matching how the
        // child.spawn lambdas in this file are written. The referenced locals
        // (files, index_dir, rc) live in this frame, which stays suspended
        // until the scope completes.
        co_await scope.scope([chan_ptr, &files, &index_dir, checkpoint_size,
                              auto_build_index, &rc, batch_size, cancelled_ptr,
                              max_workers](CoroScope &child) -> CoroTask<void> {
            return spawn_json_dict_producers(
                child, chan_ptr, &files, &index_dir, checkpoint_size,
                auto_build_index, &rc, batch_size, cancelled_ptr, max_workers);
        });
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
364

365
/// Parallel line worker.
///
/// Repeatedly takes a file path from `file_chan` and reads that trace line by
/// line. Lines are packed into MemoryViewBatchData batches: the bytes are
/// concatenated in `buffer`, with per-line `offsets` / `lengths`. Batches of
/// at most `batch_size` lines are pushed into `out_chan`, and any remainder is
/// flushed at the end of each file.
///
/// Returns immediately once `*cancelled` is observed or a send fails. Ends
/// when `file_chan` yields no more paths. Exceptions propagate to the
/// enclosing scope.
///
/// NOTE(review): no bytes_in_queue accounting happens here, unlike
/// produce_lines_batched -- confirm this is intended for parallel mode.
static CoroTask<void> lines_file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer(
        out_chan);
    auto producer_guard = producer.guard();

    const auto is_cancelled = [cancelled] {
        return cancelled->load(std::memory_order_acquire);
    };

    for (;;) {
        auto next_path = co_await file_chan->receive();
        if (!next_path) break;
        if (is_cancelled()) co_return;

        TraceReaderConfig reader_cfg;
        reader_cfg.file_path = std::move(*next_path);
        reader_cfg.index_dir = index_dir;
        reader_cfg.checkpoint_size = checkpoint_size;
        reader_cfg.auto_build_index = auto_build_index;

        TraceReader trace_reader(std::move(reader_cfg));
        auto line_stream = trace_reader.read_lines(rc);
        MemoryViewBatchData pending;
        std::size_t pending_lines = 0;

        for (;;) {
            auto line = co_await line_stream.next();
            if (!line) break;
            if (is_cancelled()) co_return;

            auto content = line->content;
            const auto start =
                static_cast<Py_ssize_t>(pending.buffer.size());
            pending.buffer.insert(pending.buffer.end(), content.begin(),
                                  content.end());
            pending.offsets.push_back(start);
            pending.lengths.push_back(
                static_cast<Py_ssize_t>(content.size()));
            ++pending_lines;

            if (pending_lines < batch_size) continue;

            if (!co_await producer.send(std::move(pending))) co_return;
            pending = MemoryViewBatchData{};
            pending_lines = 0;
        }

        if (pending_lines > 0) {
            if (!co_await producer.send(std::move(pending))) co_return;
        }
    }
    co_return;
}
407

408
/// Starts the parallel line pipeline inside `child`.
///
/// Spawns:
/// - `num_workers` lines_file_worker coroutines. The count is `max_workers`
///   capped at the number of files, but never less than one. They share one
///   file channel of capacity `num_workers` and write to `out_chan`.
/// - One send_files_to_channel coroutine that feeds `*files` and closes the
///   channel.
///
/// Workers capture copies of `*index_dir` and `*rc`. `files` and
/// `cancelled_ptr` must outlive `child`.
///
/// `max_workers == 0` (or an empty file list) is treated as one worker. With
/// zero workers nothing would receive from the file channel, so the feeder
/// could block forever and `out_chan` would have no producer.
static CoroTask<void> spawn_lines_producers(
    CoroScope &child,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    const std::vector<std::string> *files, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::size_t batch_size, std::atomic<bool> *cancelled_ptr,
    std::size_t max_workers) {
    const std::size_t num_workers =
        std::max<std::size_t>(1, std::min(files->size(), max_workers));
    auto file_chan =
        dftracer::utils::coro::make_channel<std::string>(num_workers);

    for (std::size_t i = 0; i < num_workers; ++i) {
        // Non-coroutine lambda: arguments are copied into the worker frame.
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc, batch_size,
                     cancelled_ptr](CoroScope &) {
            return lines_file_worker(fc, out_chan, idx, checkpoint_size,
                                     auto_build_index, r, batch_size,
                                     cancelled_ptr);
        });
    }

    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
        return send_files_to_channel(fc, files, cancelled_ptr);
    });
    co_return;
}
434

435
/// Directory-mode line producer.
///
/// 1. Scans `dir_path` for ".pfw" / ".pfw.gz" files and sorts the paths.
/// 2. If none are found, closes `sp->channel` and returns.
/// 3. Otherwise, runs spawn_lines_producers() in a child scope and waits for
///    it to complete.
///
/// Exceptions are recorded via `sp->set_error()`.
///
/// NOTE(review): on the error path the output channel is not closed here --
/// confirm set_error() also ends the consumer's wait.
static CoroTask<void> produce_lines_parallel(
    CoroScope &scope, MemoryViewBatchIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility scanner;
        auto scan_input = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto entries = co_await scope.spawn(scanner, scan_input);

        std::vector<std::string> files;
        files.reserve(entries.size());
        for (auto &e : entries) files.push_back(e.path.string());
        std::sort(files.begin(), files.end());

        if (files.empty()) {
            // Nothing will ever produce into the channel; close it directly.
            sp->channel->close();
            co_return;
        }

        auto *chan_ptr = sp->channel.get();
        auto *cancelled_ptr = &sp->cancelled;

        // Deliberately NOT a coroutine lambda (C++ Core Guidelines CP.51).
        // Returning the task copies all arguments into
        // spawn_lines_producers' own frame immediately, so correctness no
        // longer depends on CoroScope::scope keeping this closure alive. The
        // referenced locals live in this (suspended) frame.
        co_await scope.scope([chan_ptr, &files, &index_dir, checkpoint_size,
                              auto_build_index, &rc, batch_size, cancelled_ptr,
                              max_workers](CoroScope &child) -> CoroTask<void> {
            return spawn_lines_producers(
                child, chan_ptr, &files, &index_dir, checkpoint_size,
                auto_build_index, &rc, batch_size, cancelled_ptr, max_workers);
        });
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
469

470
/// Parallel raw worker.
///
/// Repeatedly takes a file path from `file_chan` and streams that trace through
/// TraceReader::read_raw(). Each raw chunk is sent to `out_chan` as its own
/// single-entry MemoryViewBatchData (offset 0, length = chunk size).
///
/// Returns immediately once `*cancelled` is observed or a send fails. Ends
/// when `file_chan` yields no more paths. Exceptions propagate to the
/// enclosing scope.
static CoroTask<void> raw_file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer(
        out_chan);
    auto producer_guard = producer.guard();

    const auto is_cancelled = [cancelled] {
        return cancelled->load(std::memory_order_acquire);
    };

    for (;;) {
        auto next_path = co_await file_chan->receive();
        if (!next_path) break;
        if (is_cancelled()) co_return;

        TraceReaderConfig reader_cfg;
        reader_cfg.file_path = std::move(*next_path);
        reader_cfg.index_dir = index_dir;
        reader_cfg.checkpoint_size = checkpoint_size;
        reader_cfg.auto_build_index = auto_build_index;

        TraceReader trace_reader(std::move(reader_cfg));
        auto chunk_stream = trace_reader.read_raw(rc);
        for (;;) {
            auto chunk = co_await chunk_stream.next();
            if (!chunk) break;
            if (is_cancelled()) co_return;

            auto first = chunk->data();
            auto chunk_len = chunk->size();

            MemoryViewBatchData single;
            single.buffer.assign(first, first + chunk_len);
            single.offsets.push_back(0);
            single.lengths.push_back(static_cast<Py_ssize_t>(chunk_len));
            if (!co_await producer.send(std::move(single))) co_return;
        }
    }
    co_return;
}
500

501
/// Starts the parallel raw pipeline inside `child`.
///
/// Spawns:
/// - `num_workers` raw_file_worker coroutines. The count is `max_workers`
///   capped at the number of files, but never less than one. They share one
///   file channel of capacity `num_workers` and write to `out_chan`.
/// - One send_files_to_channel coroutine that feeds `*files` and closes the
///   channel.
///
/// Workers capture copies of `*index_dir` and `*rc`. `files` and
/// `cancelled_ptr` must outlive `child`.
///
/// `max_workers == 0` (or an empty file list) is treated as one worker. With
/// zero workers nothing would receive from the file channel, so the feeder
/// could block forever and `out_chan` would have no producer.
static CoroTask<void> spawn_raw_producers(
    CoroScope &child,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    const std::vector<std::string> *files, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::atomic<bool> *cancelled_ptr, std::size_t max_workers) {
    const std::size_t num_workers =
        std::max<std::size_t>(1, std::min(files->size(), max_workers));
    auto file_chan =
        dftracer::utils::coro::make_channel<std::string>(num_workers);

    for (std::size_t i = 0; i < num_workers; ++i) {
        // Non-coroutine lambda: arguments are copied into the worker frame.
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc,
                     cancelled_ptr](CoroScope &) {
            return raw_file_worker(fc, out_chan, idx, checkpoint_size,
                                   auto_build_index, r, cancelled_ptr);
        });
    }

    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
        return send_files_to_channel(fc, files, cancelled_ptr);
    });
    co_return;
}
525

526
/// Directory-mode raw producer.
///
/// 1. Scans `dir_path` for ".pfw" / ".pfw.gz" files and sorts the paths.
/// 2. If none are found, closes `sp->channel` and returns.
/// 3. Otherwise, runs spawn_raw_producers() in a child scope and waits for it
///    to complete.
///
/// Exceptions are recorded via `sp->set_error()`.
///
/// NOTE(review): on the error path the output channel is not closed here --
/// confirm set_error() also ends the consumer's wait.
static CoroTask<void> produce_raw_parallel(
    CoroScope &scope, MemoryViewBatchIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility scanner;
        auto scan_input = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto entries = co_await scope.spawn(scanner, scan_input);

        std::vector<std::string> files;
        files.reserve(entries.size());
        for (auto &e : entries) files.push_back(e.path.string());
        std::sort(files.begin(), files.end());

        if (files.empty()) {
            // Nothing will ever produce into the channel; close it directly.
            sp->channel->close();
            co_return;
        }

        auto *chan_ptr = sp->channel.get();
        auto *cancelled_ptr = &sp->cancelled;

        // Deliberately NOT a coroutine lambda (C++ Core Guidelines CP.51).
        // Returning the task copies all arguments into spawn_raw_producers'
        // own frame immediately, so correctness no longer depends on
        // CoroScope::scope keeping this closure alive. The referenced locals
        // live in this (suspended) frame.
        co_await scope.scope([chan_ptr, &files, &index_dir, checkpoint_size,
                              auto_build_index, &rc, cancelled_ptr,
                              max_workers](CoroScope &child) -> CoroTask<void> {
            return spawn_raw_producers(child, chan_ptr, &files, &index_dir,
                                       checkpoint_size, auto_build_index, &rc,
                                       cancelled_ptr, max_workers);
        });
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
560

561
#ifdef DFTRACER_UTILS_ENABLE_ARROW
562

563
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
564
using dftracer::utils::utilities::common::arrow::ColumnType;
565
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
566

567
// Bump arena for string_views that must survive until builder.finish().
//
// push() copies bytes into the current block and returns a view of the copy.
// Blocks are BLOCK_SIZE bytes, or larger for a payload that does not fit.
// Views stay valid until clear() or destruction. Each block is a heap buffer
// owned by a std::vector<char>, so growing `blocks` moves the inner vectors
// without relocating their bytes.
struct StringArena {
    static constexpr std::size_t BLOCK_SIZE = 64 * 1024;
    std::vector<std::vector<char>> blocks;
    std::size_t pos = 0;  // bytes already used in blocks.back()

    StringArena() { blocks.emplace_back(BLOCK_SIZE); }

    // Copies `len` bytes from `data` and returns a view of the copy.
    // Zero-length input returns an empty view without calling memcpy.
    // `data` may be null in that case, and memcpy with a null pointer is
    // undefined behaviour even for a zero count.
    std::string_view push(const char *data, std::size_t len) {
        if (len == 0) return {};
        // Written as a subtraction (pos <= size always holds) so a huge
        // `len` cannot overflow the comparison.
        if (len > blocks.back().size() - pos) {
            blocks.emplace_back(std::max(BLOCK_SIZE, len));
            pos = 0;
        }
        char *dst = blocks.back().data() + pos;
        std::memcpy(dst, data, len);
        pos += len;
        return {dst, len};
    }

    // Releases every block except the first and rewinds to its start.
    // Views returned by earlier push() calls must not be used afterwards.
    void clear() {
        if (blocks.size() > 1) blocks.resize(1);
        pos = 0;
    }
};
591

592
// Row-type discriminators written into the output. The numeric values must
// stay in sync with the Python-side TYPE_* constants. Values are assigned
// sequentially from 0; the trailing comments spell them out for auditing.
enum RowType : int8_t {
    ROW_EVENT = 0,      // 0
    ROW_FILE_HASH,      // 1
    ROW_HOST_HASH,      // 2
    ROW_STRING_HASH,    // 3
    ROW_METADATA,       // 4
    ROW_PROC_METADATA,  // 5
    ROW_PROFILE,        // 6
    ROW_SYSTEM,         // 7
};
603

604
// I/O category codes produced by get_io_cat(). The numeric values must stay in
// sync with the Python IOCategory enum. Numbering starts at 1 and increases
// by one; the trailing comments spell the values out for auditing.
enum IOCat : int8_t {
    IO_READ = 1,  // 1
    IO_WRITE,     // 2
    IO_METADATA,  // 3
    IO_PCTL,      // 4
    IO_IPC,       // 5
    IO_OTHER,     // 6
    IO_SYNC,      // 7
};
614

615
static int8_t get_io_cat(std::string_view func) {
×
616
    // READ
617
    if (func == "fread" || func == "pread" || func == "preadv" ||
×
618
        func == "read" || func == "readv")
×
619
        return IO_READ;
×
620
    // WRITE
621
    if (func == "fwrite" || func == "pwrite" || func == "pwritev" ||
×
622
        func == "write" || func == "writev")
×
623
        return IO_WRITE;
×
624
    // SYNC
625
    if (func == "fsync" || func == "fdatasync" || func == "msync" ||
×
626
        func == "sync")
×
627
        return IO_SYNC;
×
628
    // PCTL
629
    if (func == "exec" || func == "exit" || func == "fork" || func == "kill" ||
×
630
        func == "pipe" || func == "wait")
×
631
        return IO_PCTL;
×
632
    // IPC
633
    if (func == "msgctl" || func == "msgget" || func == "msgrcv" ||
×
634
        func == "msgsnd" || func == "semctl" || func == "semget" ||
×
635
        func == "semop" || func == "shmat" || func == "shmctl" ||
×
636
        func == "shmdt" || func == "shmget")
×
637
        return IO_IPC;
×
638
    // METADATA
639
    if (func == "__fxstat" || func == "__fxstat64" || func == "__lxstat" ||
×
640
        func == "__lxstat64" || func == "__xstat" || func == "__xstat64" ||
×
641
        func == "access" || func == "close" || func == "closedir" ||
×
642
        func == "fclose" || func == "fcntl" || func == "fopen" ||
×
643
        func == "fopen64" || func == "fseek" || func == "fstat" ||
×
644
        func == "fstatat" || func == "ftell" || func == "ftruncate" ||
×
645
        func == "link" || func == "lseek" || func == "lseek64" ||
×
646
        func == "mkdir" || func == "open" || func == "open64" ||
×
647
        func == "opendir" || func == "readdir" || func == "readlink" ||
×
648
        func == "remove" || func == "rename" || func == "rmdir" ||
×
649
        func == "seek" || func == "stat" || func == "unlink")
×
650
        return IO_METADATA;
×
651
    return IO_OTHER;
×
652
}
653

654
// Returns true when `a` equals `b` after lowercasing each character of `a`.
// `b` is compared as-is, so callers must pass an already-lowercase literal.
static bool str_iequal(std::string_view a, const char *b) {
    const std::string_view expected(b);
    if (a.size() != expected.size()) return false;
    return std::equal(a.begin(), a.end(), expected.begin(),
                      [](char lhs, char rhs) {
                          return std::tolower(static_cast<unsigned char>(lhs)) ==
                                 static_cast<unsigned char>(rhs);
                      });
}
664

665
// Case-insensitive (ASCII) substring test: true if `needle` occurs anywhere
// in `s`, with both sides folded by std::tolower, so the needle no longer has
// to be given in lowercase. An empty needle matches every `s` (including an
// empty one). A null needle never matches.
static bool str_contains_lower(std::string_view s, const char *needle) {
    if (needle == nullptr) return false;
    const std::string_view pat(needle);
    // std::search returns s.begin() for an empty pattern, which equals
    // s.end() when `s` is empty; handle the empty needle explicitly instead.
    if (pat.empty()) return true;
    if (pat.size() > s.size()) return false;
    const auto ieq = [](char x, char y) {
        return std::tolower(static_cast<unsigned char>(x)) ==
               std::tolower(static_cast<unsigned char>(y));
    };
    return std::search(s.begin(), s.end(), pat.begin(), pat.end(), ieq) !=
           s.end();
}
681

682
// Normalize a raw JSON row (parsed with simdjson) into the semantic
683
// output schema.  Appends one row to `builder` with the full set of output
684
// columns.  Returns false if the row should be skipped (no valid name).
685
static bool normalize_row(RecordBatchBuilder &builder, StringArena &arena,
×
686
                          JsonParser &parser) {
687
    using SVH = JsonValueHelper;
688

689
    // --- Extract top-level fields ---
690
    auto ph = parser.get_string("ph").value_or(std::string_view{});
×
691
    auto name_sv = parser.get_string("name").value_or(std::string_view{});
×
692
    auto cat_sv = parser.get_string("cat").value_or(std::string_view{});
×
693
    auto pid_opt = parser.get_int64("pid");
×
694
    auto tid_opt = parser.get_int64("tid");
×
695
    auto ts_opt = parser.get_int64("ts");
×
696
    auto dur_opt = parser.get_int64("dur");
×
697

698
    // Helper lambdas to access args fields (need to rewind after each access)
699
    // We'll do a single pass over args instead
700
    std::optional<std::string_view> args_name, args_value, args_hhash,
×
701
        args_fhash;
×
702
    std::optional<int64_t> args_epoch, args_step, args_size_sum, args_ret;
×
703
    std::optional<int64_t> args_offset, args_image_idx, args_image_size;
×
704
    std::unordered_map<std::string, int64_t> args_int_map;
×
705
    std::unordered_map<std::string, double> args_float_map;
×
706

707
    parser.rewind();
×
708
    parser.for_each_field(
×
709
        "args", [&](std::string_view key, simdjson::ondemand::value val) {
×
710
            if (key == "name") {
×
711
                if (auto s = SVH::get_string(val)) args_name = s;
×
712
            } else if (key == "value") {
×
713
                if (auto s = SVH::get_string(val)) args_value = s;
×
714
            } else if (key == "hhash") {
×
715
                if (auto s = SVH::get_string(val)) args_hhash = s;
×
716
            } else if (key == "fhash") {
×
717
                if (auto s = SVH::get_string(val)) args_fhash = s;
×
718
            } else if (key == "epoch") {
×
719
                if (auto i = SVH::get_int64(val)) args_epoch = i;
×
720
            } else if (key == "step") {
×
721
                if (auto i = SVH::get_int64(val)) args_step = i;
×
722
            } else if (key == "size_sum") {
×
723
                if (auto i = SVH::get_int64(val)) args_size_sum = i;
×
724
            } else if (key == "ret") {
×
725
                if (auto i = SVH::get_int64(val)) args_ret = i;
×
726
            } else if (key == "offset") {
×
727
                if (auto i = SVH::get_int64(val)) args_offset = i;
×
728
            } else if (key == "image_idx") {
×
729
                if (auto i = SVH::get_int64(val)) args_image_idx = i;
×
730
            } else if (key == "image_size") {
×
731
                if (auto i = SVH::get_int64(val)) args_image_size = i;
×
732
            } else {
733
                // Store other int/float args for profile/sys columns
734
                if (auto i = SVH::get_int64(val)) {
×
735
                    args_int_map[std::string(key)] = *i;
×
736
                } else if (auto d = SVH::get_double(val)) {
×
737
                    args_float_map[std::string(key)] = *d;
×
738
                }
739
            }
740
        });
×
741

742
    // --- Type classification ---
743
    bool is_M = (ph == "M");
×
744
    bool is_C = (ph == "C");
×
745
    bool is_event = !is_M && !is_C;
×
746

747
    int8_t row_type = ROW_EVENT;
×
748
    if (is_M) {
×
749
        if (name_sv == "FH")
×
750
            row_type = ROW_FILE_HASH;
×
751
        else if (name_sv == "HH")
×
752
            row_type = ROW_HOST_HASH;
×
753
        else if (name_sv == "SH")
×
754
            row_type = ROW_STRING_HASH;
×
755
        else if (name_sv == "PR")
×
756
            row_type = ROW_PROC_METADATA;
×
757
        else
758
            row_type = ROW_METADATA;
×
759
    } else if (is_C) {
×
760
        row_type = str_iequal(cat_sv, "sys") ? ROW_SYSTEM : ROW_PROFILE;
×
761
    }
762
    bool is_hash = (row_type >= ROW_FILE_HASH && row_type <= ROW_STRING_HASH) ||
×
763
                   row_type == ROW_PROC_METADATA;
764
    bool is_profile = (row_type == ROW_PROFILE);
×
765
    bool is_sys = (row_type == ROW_SYSTEM);
×
766

767
    // Name: metadata rows use args.name if available
768
    std::string_view out_name = name_sv;
×
769
    if (is_M && args_name && !args_name->empty()) {
×
770
        out_name = *args_name;
×
771
    }
772
    if (out_name.empty()) return false;  // skip rows without name
×
773

774
    // --- Declare all output columns ---
775
    auto ci_type = builder.add_or_get_column("type", ColumnType::INT64);
×
776
    auto ci_cat = builder.add_or_get_column("cat", ColumnType::STRING);
×
777
    auto ci_name = builder.add_or_get_column("name", ColumnType::STRING);
×
778
    auto ci_pid = builder.add_or_get_column("pid", ColumnType::INT64);
×
779
    auto ci_tid = builder.add_or_get_column("tid", ColumnType::INT64);
×
780
    auto ci_hash = builder.add_or_get_column("hash", ColumnType::STRING);
×
781
    auto ci_value = builder.add_or_get_column("value", ColumnType::STRING);
×
782
    auto ci_host_hash =
783
        builder.add_or_get_column("host_hash", ColumnType::STRING);
×
784
    auto ci_file_hash =
785
        builder.add_or_get_column("file_hash", ColumnType::STRING);
×
786
    auto ci_epoch = builder.add_or_get_column("epoch", ColumnType::INT64);
×
787
    auto ci_step = builder.add_or_get_column("step", ColumnType::INT64);
×
788
    auto ci_ts = builder.add_or_get_column("ts", ColumnType::INT64);
×
789
    auto ci_dur = builder.add_or_get_column("dur", ColumnType::INT64);
×
790
    auto ci_te = builder.add_or_get_column("te", ColumnType::INT64);
×
791
    [[maybe_unused]] auto ci_trange =
792
        builder.add_or_get_column("trange", ColumnType::INT64);
×
793
    auto ci_io_cat = builder.add_or_get_column("io_cat", ColumnType::INT64);
×
794
    auto ci_size = builder.add_or_get_column("size", ColumnType::INT64);
×
795
    auto ci_offset = builder.add_or_get_column("offset", ColumnType::INT64);
×
796
    auto ci_image_id = builder.add_or_get_column("image_id", ColumnType::INT64);
×
797

798
    // --- Populate core columns ---
799
    builder.append_int64(ci_type, row_type);
×
800

801
    // cat (lowercased) - write into arena
802
    if (!cat_sv.empty()) {
×
803
        char lbuf[256];
804
        std::size_t clen = std::min(cat_sv.size(), sizeof(lbuf));
×
805
        for (std::size_t i = 0; i < clen; ++i)
×
806
            lbuf[i] = static_cast<char>(
×
807
                std::tolower(static_cast<unsigned char>(cat_sv[i])));
×
808
        builder.append_string(ci_cat, arena.push(lbuf, clen));
×
809
    } else {
810
        builder.append_null(ci_cat);
×
811
    }
812

813
    builder.append_string(ci_name, out_name);
×
814

815
    if (pid_opt) builder.append_int64(ci_pid, *pid_opt);
×
816
    if (tid_opt) builder.append_int64(ci_tid, *tid_opt);
×
817

818
    // hash / value
819
    if (is_hash && args_value && !args_value->empty())
×
820
        builder.append_string(ci_hash, *args_value);
×
821
    if (row_type == ROW_METADATA && args_value && !args_value->empty())
×
822
        builder.append_string(ci_value, *args_value);
×
823

824
    // host_hash / file_hash
825
    if (args_hhash && !args_hhash->empty())
×
826
        builder.append_string(ci_host_hash, *args_hhash);
×
827
    if (args_fhash && !args_fhash->empty())
×
828
        builder.append_string(ci_file_hash, *args_fhash);
×
829

830
    // epoch / step
831
    if (args_epoch && *args_epoch >= 0)
×
832
        builder.append_int64(ci_epoch, *args_epoch);
×
833
    if (args_step && *args_step >= 0) builder.append_int64(ci_step, *args_step);
×
834

835
    // --- Temporal ---
836
    bool has_ts = (is_event || is_C) && ts_opt.has_value();
×
837
    bool has_dur = dur_opt.has_value();
×
838
    int64_t ts_val = 0, dur_val = 0;
×
839
    if (has_ts) {
×
840
        ts_val = *ts_opt;
×
841
        builder.append_int64(ci_ts, ts_val);
×
842
    }
843
    if (is_event && has_ts && has_dur) {
×
844
        dur_val = *dur_opt;
×
845
        builder.append_int64(ci_dur, dur_val);
×
846
        builder.append_int64(ci_te, ts_val + dur_val);
×
847
    }
848

849
    // --- IO columns (events only) ---
850
    if (is_event) {
×
851
        bool is_posix_stdio =
852
            str_iequal(cat_sv, "posix") || str_iequal(cat_sv, "stdio");
×
853
        int8_t io_cat = IO_OTHER;
×
854

855
        // size priority: size_sum > POSIX ret > image_size
856
        if (args_size_sum) {
×
857
            builder.append_int64(ci_size, *args_size_sum);
×
858
            if (is_posix_stdio) io_cat = get_io_cat(out_name);
×
859
        } else if (is_posix_stdio) {
×
860
            io_cat = get_io_cat(out_name);
×
861
            if (args_ret && *args_ret > 0 &&
×
862
                (io_cat == IO_READ || io_cat == IO_WRITE))
×
863
                builder.append_int64(ci_size, *args_ret);
×
864
            if (args_offset && *args_offset >= 0)
×
865
                builder.append_int64(ci_offset, *args_offset);
×
866
        } else {
867
            if (args_image_idx && *args_image_idx > 0)
×
868
                builder.append_int64(ci_image_id, *args_image_idx);
×
869
            if (args_image_size && *args_image_size > 0 &&
×
870
                !str_contains_lower(out_name, "open"))
×
871
                builder.append_int64(ci_size, *args_image_size);
×
872
        }
873
        builder.append_int64(ci_io_cat, io_cat);
×
874
    }
875

876
    // --- Profile columns ---
877
    if (is_profile) {
×
878
        bool is_posix_stdio =
879
            str_iequal(cat_sv, "posix") || str_iequal(cat_sv, "stdio");
×
880
        int8_t io_cat = is_posix_stdio ? get_io_cat(out_name) : IO_OTHER;
×
881
        builder.append_int64(ci_io_cat, io_cat);
×
882

883
        static const char *profile_keys[] = {
884
            "count",      "count_max",  "count_min",  "count_sum",
885
            "dft_cnt",    "dur",        "dur_max",    "dur_min",
886
            "dur_sum",    "epoch",      "flags",      "offset",
887
            "offset_max", "offset_min", "offset_sum", "ret",
888
            "ret_max",    "ret_min",    "ret_sum",    "whence",
889
            "whence_max", "whence_min", "whence_sum", nullptr};
890
        for (const char **pk = profile_keys; *pk; ++pk) {
×
891
            auto it = args_int_map.find(*pk);
×
892
            if (it != args_int_map.end()) {
×
893
                auto idx = builder.add_or_get_column(*pk, ColumnType::INT64);
×
894
                builder.append_int64(idx, it->second);
×
895
            }
896
        }
897
    }
898

899
    // --- System columns ---
900
    if (is_sys) {
×
901
        static const char *sys_keys[] = {
902
            "user_pct", "system_pct",  "iowait_pct",   "idle_pct",
903
            "irq_pct",  "softirq_pct", "MemAvailable", "MemFree",
904
            "Cached",   "Dirty",       "Active",       nullptr};
905
        for (const char **sk = sys_keys; *sk; ++sk) {
×
906
            auto it = args_float_map.find(*sk);
×
907
            if (it != args_float_map.end()) {
×
908
                auto idx = builder.add_or_get_column(*sk, ColumnType::DOUBLE);
×
909
                builder.append_double(idx, it->second);
×
910
            }
911
        }
912
    }
913

914
    builder.end_row();
×
915
    return true;
×
916
}
×
917

918
// Flatten a simdjson object into "prefix.key" columns using native types.
919
// On type mismatch (same key, different type across rows), appends null.
920
static void flatten_object_into(RecordBatchBuilder &builder, StringArena &arena,
921
                                std::string_view prefix,
922
                                simdjson::ondemand::object obj) {
923
    using SVH = JsonValueHelper;
924
    char key_buf[512];
925

926
    for (auto field : obj) {
×
927
        if (field.error()) continue;
×
928

929
        auto key_result = field.unescaped_key();
930
        if (key_result.error()) continue;
×
931
        std::string_view sk = key_result.value_unsafe();
932

933
        auto val_result = field.value();
934
        if (val_result.error()) continue;
×
935
        auto sub_val = val_result.value_unsafe();
936

937
        std::size_t needed = prefix.size() + 1 + sk.size();
938
        if (needed >= sizeof(key_buf)) continue;
×
939
        std::memcpy(key_buf, prefix.data(), prefix.size());
940
        key_buf[prefix.size()] = '.';
941
        std::memcpy(key_buf + prefix.size() + 1, sk.data(), sk.size());
942
        std::string_view full_key(key_buf, needed);
943

944
        auto type_result = sub_val.type();
945
        if (type_result.error()) continue;
×
946
        auto json_type = type_result.value_unsafe();
947

948
        switch (json_type) {
×
949
            case simdjson::ondemand::json_type::number: {
950
                auto num_result = sub_val.get_number();
951
                if (num_result.error()) break;
×
952
                auto num = num_result.value_unsafe();
953
                if (num.is_int64()) {
×
954
                    auto idx =
955
                        builder.add_or_get_column(full_key, ColumnType::INT64);
×
956
                    if (builder.column_type(idx) == ColumnType::INT64)
×
957
                        builder.append_int64(idx, num.get_int64());
×
958
                    else
959
                        builder.append_null(idx);
×
960
                } else if (num.is_uint64()) {
×
961
                    auto idx =
962
                        builder.add_or_get_column(full_key, ColumnType::UINT64);
×
963
                    if (builder.column_type(idx) == ColumnType::UINT64)
×
964
                        builder.append_uint64(idx, num.get_uint64());
×
965
                    else
966
                        builder.append_null(idx);
×
967
                } else {
968
                    auto idx =
969
                        builder.add_or_get_column(full_key, ColumnType::DOUBLE);
×
970
                    if (builder.column_type(idx) == ColumnType::DOUBLE)
×
971
                        builder.append_double(idx, num.get_double());
×
972
                    else
973
                        builder.append_null(idx);
×
974
                }
975
                break;
976
            }
977
            case simdjson::ondemand::json_type::string: {
978
                auto str_result = sub_val.get_string();
979
                if (str_result.error()) break;
×
980
                auto str = str_result.value_unsafe();
981
                auto idx =
982
                    builder.add_or_get_column(full_key, ColumnType::STRING);
×
983
                if (builder.column_type(idx) == ColumnType::STRING)
×
984
                    builder.append_string(idx, str);
×
985
                else
986
                    builder.append_null(idx);
×
987
                break;
988
            }
989
            case simdjson::ondemand::json_type::boolean: {
990
                auto bool_result = sub_val.get_bool();
991
                if (bool_result.error()) break;
×
992
                auto b = bool_result.value_unsafe();
993
                auto idx =
994
                    builder.add_or_get_column(full_key, ColumnType::BOOL);
×
995
                if (builder.column_type(idx) == ColumnType::BOOL)
×
996
                    builder.append_bool(idx, b);
×
997
                else
998
                    builder.append_null(idx);
×
999
                break;
1000
            }
1001
            case simdjson::ondemand::json_type::null: {
1002
                auto existing = builder.find_column(full_key);
×
1003
                if (existing) builder.append_null(*existing);
×
1004
                break;
1005
            }
1006
            case simdjson::ondemand::json_type::object:
1007
            case simdjson::ondemand::json_type::array: {
1008
                // Serialize nested object/array to JSON string
1009
                auto json_str = SVH::to_json_string(sub_val);
×
1010
                auto idx =
1011
                    builder.add_or_get_column(full_key, ColumnType::STRING);
×
1012
                if (json_str) {
×
1013
                    builder.append_string(
×
1014
                        idx, arena.push(json_str->data(), json_str->size()));
1015
                } else {
1016
                    builder.append_null(idx);
×
1017
                }
1018
                break;
1019
            }
1020
            default:
1021
                break;
1022
        }
1023
    }
1024
}
1025

1026
static bool build_arrow_row(RecordBatchBuilder &builder, JsonParser &parser,
×
1027
                            StringArena &arena, bool normalize) {
1028
    if (normalize) return normalize_row(builder, arena, parser);
×
1029

1030
    using SVH = JsonValueHelper;
1031
    parser.for_each_field([&](std::string_view key_sv,
×
1032
                              simdjson::ondemand::value val) {
1033
        auto type_result = val.type();
×
1034
        if (type_result.error()) return;
×
1035
        auto json_type = type_result.value_unsafe();
×
1036
        switch (json_type) {
×
1037
            case simdjson::ondemand::json_type::number: {
1038
                auto num_result = val.get_number();
×
1039
                if (num_result.error()) break;
×
1040
                auto num = num_result.value_unsafe();
×
1041
                if (num.is_int64()) {
×
1042
                    std::size_t idx =
1043
                        builder.add_or_get_column(key_sv, ColumnType::INT64);
×
1044
                    builder.append_int64(idx, num.get_int64());
×
1045
                } else if (num.is_uint64()) {
×
1046
                    std::size_t idx =
1047
                        builder.add_or_get_column(key_sv, ColumnType::UINT64);
×
1048
                    builder.append_uint64(idx, num.get_uint64());
×
1049
                } else {
1050
                    std::size_t idx =
1051
                        builder.add_or_get_column(key_sv, ColumnType::DOUBLE);
×
1052
                    builder.append_double(idx, num.get_double());
×
1053
                }
1054
                break;
×
1055
            }
1056
            case simdjson::ondemand::json_type::string: {
1057
                auto str_result = val.get_string();
×
1058
                if (str_result.error()) break;
×
1059
                auto str = str_result.value_unsafe();
×
1060
                std::size_t idx =
1061
                    builder.add_or_get_column(key_sv, ColumnType::STRING);
×
1062
                builder.append_string(idx, str);
×
1063
                break;
×
1064
            }
1065
            case simdjson::ondemand::json_type::boolean: {
1066
                auto bool_result = val.get_bool();
×
1067
                if (bool_result.error()) break;
×
1068
                auto b = bool_result.value_unsafe();
×
1069
                std::size_t idx =
1070
                    builder.add_or_get_column(key_sv, ColumnType::BOOL);
×
1071
                builder.append_bool(idx, b);
×
1072
                break;
×
1073
            }
1074
            case simdjson::ondemand::json_type::null: {
1075
                auto existing = builder.find_column(key_sv);
×
1076
                if (existing) builder.append_null(*existing);
×
1077
                break;
×
1078
            }
1079
            case simdjson::ondemand::json_type::object:
1080
            case simdjson::ondemand::json_type::array: {
1081
                auto json_str = SVH::to_json_string(val);
×
1082
                std::size_t idx =
1083
                    builder.add_or_get_column(key_sv, ColumnType::STRING);
×
1084
                if (json_str) {
×
1085
                    builder.append_string(
×
1086
                        idx, arena.push(json_str->data(), json_str->size()));
×
1087
                } else {
1088
                    builder.append_null(idx);
×
1089
                }
1090
                break;
1091
            }
×
1092
            default:
1093
                break;
×
1094
        }
1095
    });
1096
    builder.end_row();
×
1097
    return true;
×
1098
}
1099

1100
static bool process_json_line(RecordBatchBuilder &builder, JsonParser &parser,
1101
                              StringArena &arena, std::string_view content,
1102
                              bool normalize) {
1103
    const char *trimmed;
1104
    std::size_t trimmed_length;
1105
    if (!dftracer::utils::json_trim_and_validate_with_comma(
×
1106
            content.data(), content.size(), trimmed, trimmed_length))
1107
        return false;
1108
    if (!parser.parse(std::string_view(trimmed, trimmed_length))) return false;
×
1109
    return build_arrow_row(builder, parser, arena, normalize);
×
1110
}
1111

1112
// Coroutine that streams a single trace file into `chan` as Arrow batches.
//
// A TraceReader is opened on `file_path` with the given index settings.
// Without `normalize`, the reader's own read_arrow generator produces the
// batches and each one is forwarded as-is. With `normalize`, rows come from
// read_json and are converted by build_arrow_row into a local builder that
// is flushed every `batch_size` rows (plus a final partial batch).
// Stops early, without flushing, once `*cancelled` is observed true or a
// send into the channel reports failure. The producer guard is held for the
// whole coroutine body.
static CoroTask<void> produce_arrow_for_file(
    dftracer::utils::coro::Channel<ArrowExportResult> *chan,
    std::string file_path, std::string index_dir, std::size_t checkpoint_size,
    bool auto_build_index, ReadConfig rc, std::size_t batch_size,
    bool normalize, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer(chan);
    auto guard = producer.guard();

    const auto stop_requested = [cancelled] {
        return cancelled->load(std::memory_order_acquire);
    };

    TraceReaderConfig reader_cfg;
    reader_cfg.file_path = std::move(file_path);
    reader_cfg.index_dir = std::move(index_dir);
    reader_cfg.checkpoint_size = checkpoint_size;
    reader_cfg.auto_build_index = auto_build_index;

    TraceReader reader(std::move(reader_cfg));

    if (!normalize) {
        // Raw schema: TraceReader assembles the Arrow batches itself.
        auto arrow_batches = reader.read_arrow(rc, batch_size);
        while (auto ready = co_await arrow_batches.next()) {
            if (stop_requested()) co_return;
            if (!co_await producer.send(std::move(*ready))) co_return;
        }
        co_return;
    }

    // Normalized schema: derive columns row by row via build_arrow_row.
    auto json_rows = reader.read_json(rc);
    RecordBatchBuilder batch_builder;
    batch_builder.reserve(batch_size);
    StringArena row_arena;

    while (auto row = co_await json_rows.next()) {
        if (stop_requested()) co_return;
        if (!build_arrow_row(batch_builder, *row->parser, row_arena,
                             normalize))
            continue;
        if (batch_builder.num_rows() < batch_size) continue;

        // Batch is full: finish it, release arena strings, and ship it.
        auto full_batch = batch_builder.finish();
        row_arena.clear();
        if (!co_await producer.send(std::move(full_batch))) co_return;
        // From the first flush on, the builder's schema is locked before
        // the builder is reset for the next batch.
        if (!batch_builder.is_schema_locked()) batch_builder.lock_schema();
        batch_builder.reset(true);
        batch_builder.reserve(batch_size);
    }

    if (batch_builder.num_rows() > 0) {
        co_await producer.send(batch_builder.finish());
    }
    co_return;
}
×
1162

1163
// Worker coroutine: repeatedly receives a file path from `file_chan` and
// streams that file into `out_chan` as Arrow batches, until `file_chan`
// yields no more paths.
//
// Each file gets its own TraceReader (sharing `index_dir`,
// `checkpoint_size`, and `auto_build_index`). Without `normalize`, the
// reader's read_arrow batches are forwarded directly. With `normalize`,
// rows from read_json go through build_arrow_row into a per-file builder
// that is flushed every `batch_size` rows, plus a final partial batch.
// Returns early once `*cancelled` is observed true or any send into
// `out_chan` reports failure. The producer guard is held for the whole
// coroutine body.
static CoroTask<void> file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer(
        out_chan);
    auto guard = producer.guard();

    const auto stop_requested = [cancelled] {
        return cancelled->load(std::memory_order_acquire);
    };

    while (auto next_path = co_await file_chan->receive()) {
        if (stop_requested()) co_return;

        TraceReaderConfig reader_cfg;
        reader_cfg.file_path = std::move(*next_path);
        reader_cfg.index_dir = index_dir;  // copied: reused for every file
        reader_cfg.checkpoint_size = checkpoint_size;
        reader_cfg.auto_build_index = auto_build_index;

        TraceReader reader(std::move(reader_cfg));

        if (!normalize) {
            // Raw schema: TraceReader assembles the Arrow batches itself.
            auto arrow_batches = reader.read_arrow(rc, batch_size);
            while (auto ready = co_await arrow_batches.next()) {
                if (stop_requested()) co_return;
                if (!co_await producer.send(std::move(*ready))) co_return;
            }
            continue;
        }

        // Normalized schema: derive columns row by row via build_arrow_row.
        auto json_rows = reader.read_json(rc);
        RecordBatchBuilder batch_builder;
        batch_builder.reserve(batch_size);
        StringArena row_arena;

        while (auto row = co_await json_rows.next()) {
            if (stop_requested()) co_return;
            if (!build_arrow_row(batch_builder, *row->parser, row_arena,
                                 normalize))
                continue;
            if (batch_builder.num_rows() < batch_size) continue;

            // Batch is full: finish it, release arena strings, and ship it.
            auto full_batch = batch_builder.finish();
            row_arena.clear();
            if (!co_await producer.send(std::move(full_batch))) co_return;
            // From the first flush on, the builder's schema is locked
            // before the builder is reset for the next batch.
            if (!batch_builder.is_schema_locked()) batch_builder.lock_schema();
            batch_builder.reset(true);
            batch_builder.reserve(batch_size);
        }

        if (batch_builder.num_rows() > 0) {
            if (!co_await producer.send(batch_builder.finish())) co_return;
        }
    }
    co_return;
}
×
1216

1217
// Extract AND-of-EQ leaves from a Query AST. Returns nullopt if the predicate
1218
// shape is anything else (NE, range ops, IN, NOT, OR), in which case the
1219
// uniform-match shortcut does not apply.
1220
static std::optional<std::vector<std::pair<std::string, std::string>>>
1221
extract_eq_leaves(
×
1222
    const dftracer::utils::utilities::common::query::QueryNode &node) {
1223
    namespace q_ns = dftracer::utils::utilities::common::query;
1224
    using LeafVec = std::vector<std::pair<std::string, std::string>>;
1225

1226
    auto literal_to_string = [](const q_ns::LiteralNode &lit) -> std::string {
×
1227
        return std::visit(
1228
            [](auto &&v) -> std::string {
×
1229
                using T = std::decay_t<decltype(v)>;
1230
                if constexpr (std::is_same_v<T, std::string>)
1231
                    return v;
×
1232
                else if constexpr (std::is_same_v<T, bool>)
1233
                    return v ? "true" : "false";
×
1234
                else if constexpr (std::is_same_v<T, int64_t>)
1235
                    return std::to_string(v);
×
1236
                else if constexpr (std::is_same_v<T, uint64_t>)
1237
                    return std::to_string(v);
×
1238
                else if constexpr (std::is_same_v<T, double>)
1239
                    return std::to_string(v);
×
1240
                else
1241
                    return {};
1242
            },
1243
            lit.value);
×
1244
    };
1245

1246
    return std::visit(
1247
        [&](const auto &n) -> std::optional<LeafVec> {
×
1248
            using T = std::decay_t<decltype(n)>;
1249
            if constexpr (std::is_same_v<T, q_ns::CompareNode>) {
1250
                if (n.op != q_ns::CompareOp::EQ) return std::nullopt;
×
1251
                return LeafVec{{n.field.path, literal_to_string(n.value)}};
×
1252
            } else if constexpr (std::is_same_v<T, q_ns::AndNode>) {
1253
                auto l = extract_eq_leaves(*n.left);
×
1254
                if (!l) return std::nullopt;
×
1255
                auto r = extract_eq_leaves(*n.right);
×
1256
                if (!r) return std::nullopt;
×
1257
                l->insert(l->end(), r->begin(), r->end());
×
1258
                return l;
×
1259
            } else {
×
1260
                return std::nullopt;
×
1261
            }
1262
        },
1263
        node.data);
×
1264
}
1265

1266
// True iff every checkpoint in `chunk_idxs` has dim_stats min == max == literal
1267
// for every leaf. Empty leaves -> false (no shortcut). Missing dim_stats for
1268
// any (chunk, leaf) -> false (we don't know, play safe).
1269
static bool all_chunks_uniform_match(
×
1270
    const dftracer::utils::utilities::indexer::IndexDatabase &db, int fid,
1271
    const std::vector<std::pair<std::string, std::string>> &leaves,
1272
    const std::vector<std::uint64_t> &chunk_idxs) {
1273
    if (leaves.empty() || chunk_idxs.empty()) return false;
×
1274
    namespace indexing = dftracer::utils::utilities::composites::dft::indexing;
1275

1276
    for (const auto &[dim, val] : leaves) {
×
1277
        auto rows = db.query_chunk_dimension_stats_for_dimension(fid, dim);
×
1278
        if (rows.empty()) return false;
×
1279
        std::unordered_map<std::uint64_t,
1280
                           const indexing::ChunkDimensionStatsResult *>
1281
            by_ckpt;
×
1282
        by_ckpt.reserve(rows.size());
×
1283
        for (const auto &r : rows) by_ckpt.emplace(r.checkpoint_idx, &r);
×
1284
        for (auto cidx : chunk_idxs) {
×
1285
            auto it = by_ckpt.find(cidx);
×
1286
            if (it == by_ckpt.end()) return false;
×
1287
            const auto &ds = *it->second;
×
1288
            if (ds.min_value != val || ds.max_value != val) return false;
×
1289
        }
1290
    }
×
1291
    return true;
×
1292
}
1293

1294
// Work unit for checkpoint-level parallel Arrow export: a slice of a single
// trace file covering one or more consecutive checkpoints. Decompressing a
// gzip stream is inherently sequential, so cutting a file at
// checkpoint-aligned byte offsets is what lets several workers share the
// decode work of one file. All fields default to zero/false/empty.
struct ArrowWorkItem {
    // Trace file this unit reads from.
    std::string file_path;
    // Byte-range bounds of this unit within `file_path`.
    std::size_t start_byte{0};
    std::size_t end_byte{0};
    // Whether each bound sits on a checkpoint boundary.
    bool start_at_checkpoint{false};
    bool end_at_checkpoint{false};
    // Set when every kept chunk in this range is uniform-matching (dim_stats
    // min == max == predicate literal for every AND-of-EQ leaf), which makes
    // per-event predicate evaluation skippable.
    bool chunk_prune_only{false};
    // Optional line range; when set it overrides the byte range. The worker
    // passes it down as LINE_RANGE and the gzip stream maps it to byte
    // offsets through the checkpoint index. 0 means "no line constraint".
    std::size_t start_line{0};
    std::size_t end_line{0};
};
1315

1316
static std::vector<ArrowWorkItem> enumerate_work_items(
72✔
1317
    const std::vector<std::string> &files, const std::string &index_dir,
1318
    const std::string &query_str, std::size_t max_workers,
1319
    std::size_t clip_start_byte = 0, std::size_t clip_end_byte = 0,
1320
    std::size_t clip_start_line = 0, std::size_t clip_end_line = 0) {
1321
    namespace dft_internal =
1322
        dftracer::utils::utilities::composites::dft::internal;
1323
    namespace indexer_ns = dftracer::utils::utilities::indexer;
1324
    namespace indexing = dftracer::utils::utilities::composites::dft::indexing;
1325

1326
    std::vector<ArrowWorkItem> items;
72✔
1327
    items.reserve(files.size() * 4);
72!
1328

1329
    const bool has_line_clip = (clip_start_line > 0 || clip_end_line > 0);
72!
1330
    auto push_unsplit = [&](const std::string &fp) {
150✔
1331
        ArrowWorkItem item;
114✔
1332
        item.file_path = fp;
114!
1333
        item.start_line = clip_start_line;
114✔
1334
        item.end_line = clip_end_line;
114✔
1335
        items.push_back(std::move(item));
114!
1336
    };
114✔
1337

1338
    // Parse the query once. Pruner input copies a Query, so we keep the
1339
    // parsed form around to feed each ChunkPrunerInput without re-parsing.
1340
    std::optional<dftracer::utils::utilities::common::query::Query> parsed;
72✔
1341
    if (!query_str.empty()) {
72✔
1342
        auto r = dftracer::utils::utilities::common::query::Query::from_string(
3!
1343
            query_str);
6!
1344
        if (r) parsed = std::move(*r);
6!
1345
    }
6✔
1346

1347
    // All files in a directory-mode scan share the same `.dftindex` root.
1348
    // Group files by their resolved index path so we can open the RocksDB
1349
    // once per index and reuse it to prune every file against that handle.
1350
    std::unordered_map<std::string, std::vector<std::size_t>> by_index;
72✔
1351
    for (std::size_t i = 0; i < files.size(); ++i) {
194✔
1352
        std::string index_path =
1353
            dft_internal::determine_index_path(files[i], index_dir);
122!
1354
        by_index[index_path].push_back(i);
122!
1355
    }
122✔
1356

1357
    for (auto &entry : by_index) {
150!
1358
        const auto &index_path = entry.first;
78✔
1359
        const auto &file_idxs = entry.second;
78✔
1360
        if (!fs::exists(index_path)) {
78!
1361
            for (auto i : file_idxs) push_unsplit(files[i]);
156✔
1362
            continue;
62✔
1363
        }
31✔
1364
        std::unique_ptr<indexer_ns::IndexDatabase> idx_db;
16✔
1365
        try {
1366
            idx_db = std::make_unique<indexer_ns::IndexDatabase>(
16!
1367
                index_path,
8✔
1368
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
16!
1369
        } catch (...) {
8✔
1370
            for (auto i : file_idxs) push_unsplit(files[i]);
×
1371
            continue;
1372
        }
×
1373

1374
        // Resolve fid + checkpoints per file (cheap queries).
1375
        struct FileCtx {
1376
            std::size_t file_idx;
1377
            int fid;
1378
            std::vector<indexer_ns::IndexerCheckpoint> ckpts;
1379
        };
1380
        std::vector<FileCtx> file_ctxs;
16✔
1381
        file_ctxs.reserve(file_idxs.size());
16!
1382
        for (auto i : file_idxs) {
44✔
1383
            FileCtx fc;
28✔
1384
            fc.file_idx = i;
28✔
1385
            fc.fid = idx_db->get_file_info_id(
42✔
1386
                indexer_ns::internal::get_logical_path(files[i]));
42!
1387
            if (fc.fid < 0) {
28!
1388
                push_unsplit(files[i]);
×
1389
                continue;
×
1390
            }
1391
            fc.ckpts = idx_db->query_checkpoints(fc.fid);
28!
1392
            if (fc.ckpts.empty()) {
28✔
1393
                push_unsplit(files[i]);
20!
1394
                continue;
20✔
1395
            }
1396
            std::sort(fc.ckpts.begin(), fc.ckpts.end(),
8!
1397
                      [](const auto &a, const auto &b) {
×
1398
                          return a.first_line_num < b.first_line_num;
×
1399
                      });
1400
            file_ctxs.push_back(std::move(fc));
8!
1401
        }
28✔
1402

1403
        // Batch-prune all files against the shared index: dim_stats and
1404
        // chunk_statistics are loaded in one RocksDB scan each instead of
1405
        // one scan per file.
1406
        std::vector<indexing::ChunkPrunerOutput> pruner_outs(file_ctxs.size());
16!
1407
        if (parsed && !file_ctxs.empty()) {
16!
1408
            indexing::ChunkPrunerBatchInput batch_in;
×
1409
            batch_in.index_path = index_path;
×
1410
            batch_in.external_db = idx_db.get();
×
1411
            batch_in.items.reserve(file_ctxs.size());
×
1412
            for (auto &fc : file_ctxs) {
×
1413
                batch_in.items.push_back({files[fc.file_idx], *parsed});
×
1414
            }
1415
            indexing::ChunkPrunerUtility pruner;
×
1416
            auto batch_out = pruner.process_batch(batch_in);
×
1417
            if (batch_out.success) {
×
1418
                pruner_outs = std::move(batch_out.outputs);
×
1419
            }
1420
        }
×
1421

1422
        // For AND-of-EQ predicates, precompute uniform-match leaves once.
1423
        // Per-file pure_match is checked inline below and lets workers skip
1424
        // per-event predicate eval on chunks where dim_stats min == max ==
1425
        // literal for every leaf.
1426
        std::optional<std::vector<std::pair<std::string, std::string>>>
1427
            eq_leaves;
16✔
1428
        if (parsed) eq_leaves = extract_eq_leaves(parsed->root());
16!
1429

1430
        for (std::size_t fc_idx = 0; fc_idx < file_ctxs.size(); ++fc_idx) {
24✔
1431
            auto &fc = file_ctxs[fc_idx];
8✔
1432
            const auto &fp = files[fc.file_idx];
8✔
1433

1434
            // Pruner chunk_idx semantics: 0-indexed over uncompressed
1435
            // slices. fc.ckpts holds gzip recovery points; recovery point
1436
            // fc.ckpts[k] sits at the START of pruner chunk (k+1). Pruner
1437
            // chunk 0 has no recovery point at its start (decoded from
1438
            // gzip stream start). Total pruner chunks = fc.ckpts.size()+1.
1439
            const std::size_t total_chunks = fc.ckpts.size() + 1;
8✔
1440
            auto chunk_start_byte = [&](std::uint64_t cidx) -> std::size_t {
16✔
1441
                if (cidx == 0) return 0;
12✔
1442
                return fc.ckpts[cidx - 1].uc_offset;
8✔
1443
            };
10✔
1444
            auto chunk_end_byte = [&](std::uint64_t cidx) -> std::size_t {
16✔
1445
                if (cidx == 0)
12✔
1446
                    return fc.ckpts.empty() ? 0 : fc.ckpts[0].uc_offset;
4!
1447
                std::size_t k = cidx - 1;
8✔
1448
                return fc.ckpts[k].uc_offset + fc.ckpts[k].uc_size;
8✔
1449
            };
10✔
1450
            // Line ranges for a chunk. Chunk 0 covers everything before the
1451
            // first recovery point; chunk k>=1 spans recovery point (k-1).
1452
            auto chunk_first_line = [&](std::uint64_t cidx) -> std::size_t {
16✔
1453
                if (cidx == 0) return 1;
12✔
1454
                return fc.ckpts[cidx - 1].first_line_num;
8✔
1455
            };
10✔
1456
            auto chunk_last_line = [&](std::uint64_t cidx) -> std::size_t {
16✔
1457
                if (cidx == 0) {
12✔
1458
                    if (fc.ckpts.empty()) return SIZE_MAX;
4!
1459
                    return fc.ckpts[0].first_line_num > 0
4!
1460
                               ? fc.ckpts[0].first_line_num - 1
4!
1461
                               : 0;
2✔
1462
                }
1463
                return fc.ckpts[cidx - 1].last_line_num;
8✔
1464
            };
10✔
1465

1466
            std::vector<std::uint64_t> keep_chunks;
8✔
1467
            keep_chunks.reserve(total_chunks);
8!
1468
            if (parsed) {
8!
1469
                const auto &pr = pruner_outs[fc_idx];
×
1470
                if (pr.success && !pr.file_may_match) {
×
1471
                    continue;  // whole file pruned
×
1472
                }
1473
                if (pr.success && !pr.candidate_checkpoints.empty() &&
×
1474
                    pr.candidate_checkpoints.size() < pr.total_checkpoints) {
×
1475
                    for (auto cidx : pr.candidate_checkpoints) {
×
1476
                        if (cidx < total_chunks) keep_chunks.push_back(cidx);
×
1477
                    }
1478
                    std::sort(keep_chunks.begin(), keep_chunks.end());
×
1479
                    keep_chunks.erase(
×
1480
                        std::unique(keep_chunks.begin(), keep_chunks.end()),
×
1481
                        keep_chunks.end());
×
1482
                } else {
1483
                    for (std::uint64_t c = 0; c < total_chunks; ++c)
×
1484
                        keep_chunks.push_back(c);
×
1485
                }
1486
            } else {
1487
                for (std::uint64_t c = 0; c < total_chunks; ++c)
24✔
1488
                    keep_chunks.push_back(c);
16!
1489
            }
1490

1491
            // Intersect with the user's line range so workers only touch
1492
            // chunks that actually overlap it. Each work item carries the
1493
            // sub-line-range; LINE_RANGE on the read maps it back to bytes
1494
            // via the same checkpoint table the gzip stream uses.
1495
            if (has_line_clip) {
8✔
1496
                std::size_t lo = clip_start_line > 0 ? clip_start_line : 1;
4!
1497
                std::size_t hi = clip_end_line > 0 ? clip_end_line : SIZE_MAX;
4!
1498
                std::vector<std::uint64_t> filtered;
4✔
1499
                filtered.reserve(keep_chunks.size());
4!
1500
                for (auto c : keep_chunks) {
12✔
1501
                    std::size_t cf = chunk_first_line(c);
8!
1502
                    std::size_t cl = chunk_last_line(c);
8!
1503
                    if (cl < lo || cf > hi) continue;
8✔
1504
                    filtered.push_back(c);
4!
1505
                }
1506
                keep_chunks = std::move(filtered);
4✔
1507
            }
4✔
1508

1509
            if (keep_chunks.empty()) continue;
8✔
1510

1511
            // All-or-nothing per file: if every kept chunk is uniform-matching
1512
            // for every leaf, every work item from this file gets the
1513
            // chunk_prune_only fast path. Mixed files fall back to per-event
1514
            // eval to stay safe.
1515
            bool file_pure_match = false;
8✔
1516
            if (eq_leaves && !eq_leaves->empty() && idx_db) {
8!
1517
                file_pure_match = all_chunks_uniform_match(
×
1518
                    *idx_db, fc.fid, *eq_leaves, keep_chunks);
×
1519
            }
1520

1521
            std::size_t target_ranges = std::max<std::size_t>(1, max_workers);
8!
1522
            std::size_t per_range = std::max<std::size_t>(
8!
1523
                1, (keep_chunks.size() + target_ranges - 1) / target_ranges);
8✔
1524

1525
            std::size_t group_start = 0;
8✔
1526
            while (group_start < keep_chunks.size()) {
20✔
1527
                std::size_t group_end = group_start;
12✔
1528
                std::size_t emitted = 0;
12✔
1529
                while (group_end < keep_chunks.size() && emitted < per_range) {
24✔
1530
                    if (group_end > group_start &&
12!
1531
                        keep_chunks[group_end] !=
×
1532
                            keep_chunks[group_end - 1] + 1) {
×
1533
                        break;
×
1534
                    }
1535
                    ++group_end;
12✔
1536
                    ++emitted;
12✔
1537
                }
1538
                std::uint64_t scidx = keep_chunks[group_start];
12✔
1539
                std::uint64_t ecidx = keep_chunks[group_end - 1];
12✔
1540
                std::size_t start_byte = chunk_start_byte(scidx);
12!
1541
                std::size_t end_byte = chunk_end_byte(ecidx);
12!
1542
                // start_at_checkpoint: a gzip recovery point sits at
1543
                // start_byte (true for any cidx>=1; false for the implicit
1544
                // chunk 0 which decodes from stream start).
1545
                bool start_at_checkpoint = (scidx >= 1);
12✔
1546
                bool end_at_checkpoint = (group_end < keep_chunks.size());
12✔
1547
                if (has_line_clip) {
12✔
1548
                    std::size_t lo = clip_start_line > 0 ? clip_start_line : 1;
4!
1549
                    std::size_t hi =
4✔
1550
                        clip_end_line > 0 ? clip_end_line : SIZE_MAX;
4!
1551
                    std::size_t cluster_first = chunk_first_line(scidx);
4!
1552
                    std::size_t cluster_last = chunk_last_line(ecidx);
4!
1553
                    std::size_t item_start =
2✔
1554
                        std::max<std::size_t>(lo, cluster_first);
4!
1555
                    std::size_t item_end =
2✔
1556
                        std::min<std::size_t>(hi, cluster_last);
4!
1557
                    if (item_start > item_end) {
4!
1558
                        group_start = group_end;
×
1559
                        continue;
×
1560
                    }
1561
                    ArrowWorkItem item;
4✔
1562
                    item.file_path = fp;
4!
1563
                    item.chunk_prune_only = file_pure_match;
4✔
1564
                    item.start_line = item_start;
4✔
1565
                    item.end_line = item_end;
4✔
1566
                    items.push_back(std::move(item));
4!
1567
                    group_start = group_end;
4✔
1568
                    continue;
2✔
1569
                }
4✔
1570
                if (clip_end_byte > clip_start_byte) {
8✔
1571
                    if (start_byte < clip_start_byte) {
4✔
1572
                        start_byte = clip_start_byte;
×
1573
                        start_at_checkpoint = false;
×
1574
                    }
1575
                    if (end_byte > clip_end_byte) {
4✔
1576
                        end_byte = clip_end_byte;
4✔
1577
                        end_at_checkpoint = false;
4✔
1578
                    }
2✔
1579
                    if (start_byte >= end_byte) {
4✔
1580
                        group_start = group_end;
2✔
1581
                        continue;
2✔
1582
                    }
1583
                }
1✔
1584
                items.push_back({fp, start_byte, end_byte, start_at_checkpoint,
6!
1585
                                 end_at_checkpoint, file_pure_match});
6✔
1586
                group_start = group_end;
6✔
1587
            }
1588
        }
8✔
1589
    }
16✔
1590
    return items;
108✔
1591
}
72!
1592

1593
// Feeder task for the work channel.
//
// Sends each pre-enumerated work item into `chan` in order. It stops early
// when `cancelled` is raised or when a send reports failure (the channel no
// longer accepts items). The channel is closed after the loop on every
// non-throwing exit. The workers' receive() loops presumably rely on that
// close to terminate.
static CoroTask<void> send_work_items_to_channel(
    std::shared_ptr<dftracer::utils::coro::Channel<ArrowWorkItem>> chan,
    const std::vector<ArrowWorkItem> *items, std::atomic<bool> *cancelled) {
    const std::vector<ArrowWorkItem> &queue = *items;
    for (std::size_t pos = 0; pos < queue.size(); ++pos) {
        const bool stop_requested =
            cancelled->load(std::memory_order_acquire);
        if (stop_requested) break;
        // A failed send means the channel stopped accepting work.
        if (!co_await chan->send(queue[pos])) break;
    }
    chan->close();
    co_return;
}
320!
1603

1604
// Worker of the parallel Arrow scan.
//
// Repeatedly receives an ArrowWorkItem from `work_chan`, reads the item's
// byte or line range, and forwards the resulting Arrow batches to
// `out_chan`. Readers are cached per file path for the lifetime of this
// worker, so later items on the same file reuse one TraceReader.
//
// Parameters:
//   work_chan        shared source of work items
//   out_chan         destination for ArrowExportResult batches
//   index_dir, checkpoint_size, auto_build_index
//                    copied into each TraceReaderConfig
//   rc               base read configuration; each item overrides its range
//   batch_size       rows per emitted batch
//   normalize        false: use read_arrow directly; true: read JSON events
//                    and build rows with build_arrow_row
//   cancelled        cooperative cancellation flag, polled per item/batch/row
//
// The worker ends as soon as cancellation is observed or out_chan rejects a
// send.
// NOTE(review): unlike produce_arrow_batches, this path does not add to
// ArrowIteratorState::bytes_in_queue before sending; confirm the consumer
// does not depend on that accounting in directory mode.
static CoroTask<void> checkpoint_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<ArrowWorkItem>> work_chan,
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer(
        out_chan);
    // Held for the whole coroutine (presumably keeps this worker registered
    // as a producer of out_chan until it exits).
    auto guard = producer.guard();

    const auto stop_requested = [cancelled] {
        return cancelled->load(std::memory_order_acquire);
    };

    // One reader per file path, reused by later items on the same file.
    std::unordered_map<std::string, std::shared_ptr<TraceReader>> reader_cache;

    while (auto item = co_await work_chan->receive()) {
        if (stop_requested()) co_return;

        std::shared_ptr<TraceReader> &reader = reader_cache[item->file_path];
        if (reader == nullptr) {
            TraceReaderConfig reader_cfg;
            reader_cfg.file_path = item->file_path;
            reader_cfg.index_dir = index_dir;
            reader_cfg.checkpoint_size = checkpoint_size;
            reader_cfg.auto_build_index = auto_build_index;
            reader = std::make_shared<TraceReader>(std::move(reader_cfg));
        }

        ReadConfig item_rc = rc;
        const bool line_driven = item->start_line > 0 || item->end_line > 0;
        if (line_driven) {
            // LINE_RANGE read: the gzip stream resolves the lines back to
            // byte offsets via checkpoints, so the byte range is cleared.
            item_rc.start_line = item->start_line;
            item_rc.end_line = item->end_line;
            item_rc.start_byte = 0;
            item_rc.end_byte = 0;
            item_rc.start_at_checkpoint = false;
            item_rc.end_at_checkpoint = false;
        } else {
            item_rc.start_byte = item->start_byte;
            item_rc.end_byte = item->end_byte;
            item_rc.start_at_checkpoint = item->start_at_checkpoint;
            item_rc.end_at_checkpoint = item->end_at_checkpoint;
        }
        // Pruning already ran at enumeration time. Repeating it per work
        // item would reopen RocksDB for every range at directory scale.
        item_rc.skip_pruning = true;
        // Chunks pre-classified as uniform-matching skip per-event eval.
        if (item->chunk_prune_only) item_rc.chunk_prune_only = true;

        if (!normalize) {
            auto arrow_batches = reader->read_arrow(item_rc, batch_size);
            while (auto batch = co_await arrow_batches.next()) {
                if (stop_requested()) co_return;
                if (!co_await producer.send(std::move(*batch))) co_return;
            }
        } else {
            auto events = reader->read_json(item_rc);
            RecordBatchBuilder builder;
            builder.reserve(batch_size);
            StringArena arena;

            while (auto event = co_await events.next()) {
                if (stop_requested()) co_return;
                if (build_arrow_row(builder, *event->parser, arena,
                                    normalize) &&
                    builder.num_rows() >= batch_size) {
                    auto full_batch = builder.finish();
                    arena.clear();
                    if (!co_await producer.send(std::move(full_batch)))
                        co_return;
                    if (!builder.is_schema_locked()) builder.lock_schema();
                    builder.reset(true);
                    builder.reserve(batch_size);
                }
            }
            // Flush the trailing partial batch for this item.
            if (builder.num_rows() > 0) {
                if (!co_await producer.send(builder.finish())) co_return;
            }
        }
    }
    co_return;
}
1,272!
1687

1688
// Starts the worker pool and the work-item feeder inside `child`.
//
// Spawns min(work_items->size(), max_workers) checkpoint workers (at least
// one), all draining one shared work channel created with
// make_channel(worker_count). It then spawns a single feeder task that
// pushes `work_items` into that channel. Each worker gets its own copies of
// *index_dir and *rc. `work_items` is passed on by pointer, so it must
// outlive the spawned tasks. This coroutine does not itself await the
// spawned tasks.
static CoroTask<void> spawn_arrow_producers(
    CoroScope &child,
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
    const std::vector<ArrowWorkItem> *work_items, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::size_t batch_size, bool normalize, std::atomic<bool> *cancelled_ptr,
    std::size_t max_workers) {
    // Never more workers than items, but always at least one.
    const std::size_t worker_count = std::max<std::size_t>(
        1, std::min<std::size_t>(work_items->size(), max_workers));
    auto shared_work_chan =
        dftracer::utils::coro::make_channel<ArrowWorkItem>(worker_count);

    for (std::size_t w = 0; w != worker_count; ++w) {
        child.spawn([out_chan, wc = shared_work_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc, batch_size,
                     normalize, cancelled_ptr](CoroScope &) {
            return checkpoint_worker(wc, out_chan, idx, checkpoint_size,
                                     auto_build_index, r, batch_size, normalize,
                                     cancelled_ptr);
        });
    }

    // Single feeder that pushes every work item, then closes the channel.
    child.spawn(
        [wc = shared_work_chan, work_items, cancelled_ptr](CoroScope &) {
            return send_work_items_to_channel(wc, work_items, cancelled_ptr);
        });
    co_return;
}
108!
1715

1716
// Runs the parallel Arrow scan over an explicit list of files.
//
// Enumerates work items for `files`, honoring rc's query and its byte/line
// range. It then spawns the worker pool inside a nested scope and awaits
// that scope. When there is nothing to read (no files, or every file
// pruned), sp's output channel is closed directly. Any exception is recorded
// with sp->set_error instead of propagating.
static CoroTask<void> produce_arrow_batches_for_files(
    CoroScope &scope, ArrowIteratorState *sp, std::vector<std::string> files,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::size_t max_workers) {
    try {
        std::vector<ArrowWorkItem> work_items;
        if (!files.empty()) {
            work_items = enumerate_work_items(
                files, index_dir, rc.query, max_workers, rc.start_byte,
                rc.end_byte, rc.start_line, rc.end_line);
        }
        // Nothing to read: end the output stream right here.
        if (work_items.empty()) {
            sp->channel->close();
            co_return;
        }

        auto *out_chan = sp->channel.get();
        auto *cancel_flag = &sp->cancelled;

        // work_items, index_dir and rc are captured by reference. They live
        // in this coroutine frame, which stays suspended on the nested
        // scope (presumably until every child spawned into it finishes).
        co_await scope.scope([out_chan, &work_items, &index_dir,
                              checkpoint_size, auto_build_index, &rc,
                              batch_size, normalize, cancel_flag,
                              max_workers](CoroScope &child) -> CoroTask<void> {
            co_await spawn_arrow_producers(
                child, out_chan, &work_items, &index_dir, checkpoint_size,
                auto_build_index, &rc, batch_size, normalize, cancel_flag,
                max_workers);
        });
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
146!
1751

1752
// Directory-mode entry point for the parallel Arrow scan.
//
// Scans `dir_path` for files with the ".pfw" / ".pfw.gz" extensions. The
// scanner flags are (true, false); their meaning is defined by
// PatternDirectoryScannerUtilityInput. The resulting paths are sorted and
// handed to produce_arrow_batches_for_files. Exceptions are recorded with
// sp->set_error.
static CoroTask<void> produce_arrow_batches_parallel(
    CoroScope &scope, ArrowIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility scanner;
        auto scan_input = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto entries = co_await scope.spawn(scanner, scan_input);

        std::vector<std::string> trace_files;
        trace_files.reserve(entries.size());
        for (const auto &entry : entries) {
            trace_files.push_back(entry.path.string());
        }
        // Sorted so work enumeration sees files in a stable order.
        std::sort(trace_files.begin(), trace_files.end());

        co_await produce_arrow_batches_for_files(
            scope, sp, std::move(trace_files), std::move(index_dir),
            checkpoint_size, auto_build_index, std::move(rc), batch_size,
            normalize, max_workers);
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
210!
1776

1777
// Single-file Arrow producer.
//
// Opens a TraceReader on `cfg` and pushes Arrow batches of up to
// `batch_size` rows through `producer`. When `normalize` is false, the
// reader's read_arrow path is used directly. Otherwise JSON events are read
// and converted row by row with build_arrow_row. Each batch's byte size is
// added to state->bytes_in_queue before it is sent.
//
// Cancellation (state->cancelled) stops reading. A send rejected by the
// channel ends the coroutine immediately, as in checkpoint_worker, so no
// further finish()/send is attempted after the channel refused a batch.
// `flatten_objects` is accepted for interface compatibility and is unused.
// Exceptions are recorded with state->set_error.
CoroTask<void> produce_arrow_batches(
    std::shared_ptr<ArrowIteratorState> state,
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer,
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size,
    bool flatten_objects = false, bool normalize = false) {
    (void)flatten_objects;

    auto guard = producer.guard();

    const auto is_cancelled = [st = state.get()] {
        return st->cancelled.load(std::memory_order_acquire);
    };
    // Count a batch in the queued-bytes counter. This runs before the send,
    // so the counter already includes the batch once the consumer can
    // receive it.
    const auto account = [st = state.get()](auto &batch) {
        st->bytes_in_queue.fetch_add(dftracer::utils::python::byte_size(batch),
                                     std::memory_order_acq_rel);
    };

    try {
        TraceReader reader(std::move(cfg));

        if (!normalize) {
            auto batch_gen = reader.read_arrow(rc, batch_size);
            while (auto batch_opt = co_await batch_gen.next()) {
                if (is_cancelled()) break;
                account(*batch_opt);
                if (!co_await producer.send(std::move(*batch_opt))) co_return;
            }
            co_return;
        }

        auto gen = reader.read_json(rc);
        RecordBatchBuilder builder;
        builder.reserve(batch_size);

        StringArena arena;

        while (auto opt = co_await gen.next()) {
            if (is_cancelled()) break;
            if (!build_arrow_row(builder, *opt->parser, arena, normalize))
                continue;
            if (builder.num_rows() < batch_size) continue;

            auto result = builder.finish();
            arena.clear();
            account(result);
            // Channel closed: stop here. Falling through to the final flush
            // below would finish() the builder and send again.
            if (!co_await producer.send(std::move(result))) co_return;
            if (!builder.is_schema_locked()) {
                builder.lock_schema();
            }
            builder.reset(true);
            builder.reserve(batch_size);
        }

        // Flush the trailing partial batch unless cancelled.
        if (builder.num_rows() > 0 && !is_cancelled()) {
            auto result = builder.finish();
            account(result);
            co_await producer.send(std::move(result));
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
375!
1839

1840
#endif  // DFTRACER_UTILS_ENABLE_ARROW
1841

1842
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
1843

1844
struct WriteArrowStats {
39✔
1845
    std::unordered_map<std::string, PartitionWriteStats> partitions;
1846
    int64_t total_rows = 0;
39✔
1847
    int64_t total_uncompressed_bytes = 0;
39✔
1848
};
1849

1850
struct WriteArrowResult {
52✔
1851
    WriteArrowStats stats;
1852
    std::string error;
1853
    std::uint64_t chunks_scanned = 0;
39✔
1854
    std::uint64_t chunks_skipped = 0;
39✔
1855
};
1856

1857
// Writes Arrow IPC partitions for one trace file, one partition set per view.
//
// Steps:
//  1. Use a single "all" view when `views` is empty.
//  2. Resolve the index path; when `index_path` is empty it is derived from
//     `file_path`.
//  3. Collect file metadata.
//  4. For each view:
//     - open a PartitionWriter at `output_path`, or at
//       `output_path/<view name>` when there are several views or the view
//       is not named "all";
//     - ask ViewBuilderUtility for candidate chunks;
//     - stream every candidate through ViewReaderUtility into the writer;
//     - close the writer.
//
// Every closed writer's stats are recorded under the view name and added to
// the totals, including views the builder reports as unable to match. This
// keeps the totals equal to the sum over result.stats.partitions.
//
// Returns a WriteArrowResult whose `error` is empty on success. The first
// failure returns immediately with `error` set. std::exception-derived
// exceptions are caught and reported the same way.
// NOTE(review): on those early returns an already-opened writer is not
// closed here; confirm PartitionWriter cleans up on destruction.
CoroTask<WriteArrowResult> write_arrow_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    std::vector<ViewDefinition> views, std::string output_path,
    int64_t chunk_size_bytes, IpcCompression compression,
    std::size_t event_batch_size) {
    namespace dft_internal =
        dftracer::utils::utilities::composites::dft::internal;
    WriteArrowResult result;

    // Store a view's partition stats and fold them into the totals with a
    // single map lookup.
    const auto record_partition = [&result](const std::string &view_name,
                                            PartitionWriteStats stats) {
        auto &slot = result.stats.partitions[view_name];
        slot = std::move(stats);
        result.stats.total_rows += slot.total_rows;
        result.stats.total_uncompressed_bytes +=
            slot.total_uncompressed_bytes;
    };

    try {
        if (views.empty()) {
            views.push_back(ViewDefinition().with_name("all"));
        }

        std::string resolved_index =
            index_path.empty()
                ? dft_internal::determine_index_path(file_path, "")
                : index_path;

        auto meta_input = MetadataCollectorUtilityInput::from_file(file_path)
                              .with_checkpoint_size(checkpoint_size)
                              .with_index(resolved_index);
        auto metadata = co_await MetadataCollectorUtility{}.process(meta_input);
        if (!metadata.success) {
            result.error =
                "Failed to collect metadata: " + metadata.error_message;
            co_return result;
        }

        for (const auto &view : views) {
            // A lone default view writes straight into output_path. Any
            // other configuration gets one output_path/<name> per view.
            std::string view_output = output_path;
            if (views.size() > 1 || view.name != "all") {
                view_output = output_path + "/" + view.name;
            }

            PartitionWriter writer;
            int rc_open = co_await writer.open(view_output, chunk_size_bytes,
                                               compression);
            if (rc_open != 0) {
                result.error =
                    "Failed to open partition writer for view: " + view.name;
                co_return result;
            }

            ViewBuilderInput builder_input;
            builder_input.with_view(view)
                .with_file_path(file_path)
                .with_index_path(resolved_index)
                .with_uncompressed_size(metadata.uncompressed_size)
                .with_num_checkpoints(metadata.num_checkpoints);

            auto build_output =
                co_await ViewBuilderUtility{}.process(builder_input);
            if (!build_output.success) {
                result.error = "ViewBuilder failed for view: " + view.name;
                co_return result;
            }

            result.chunks_skipped += build_output.skipped_checkpoints;

            if (!build_output.file_may_match) {
                // No candidate can match: close the writer without reading.
                auto stats = co_await writer.close();
                record_partition(view.name, std::move(stats));
                continue;
            }

            RecordBatchBuilder builder;
            bool schema_locked = false;

            for (const auto &candidate : build_output.candidates) {
                ViewReaderInput reader_input;
                reader_input.with_file_path(file_path)
                    .with_index_path(resolved_index)
                    .with_checkpoint_size(checkpoint_size)
                    .with_byte_range(candidate.start_byte, candidate.end_byte)
                    .with_checkpoint_idx(candidate.checkpoint_idx)
                    .with_event_batch_size(event_batch_size)
                    .with_view(view);
                reader_input.query = view.query;

                ViewReaderUtility reader;
                auto gen = reader.process(reader_input);
                while (auto opt = co_await gen.next()) {
                    auto arrow_batch = opt->to_arrow(builder);
                    int rc_write = co_await writer.write_batch(arrow_batch);
                    if (rc_write != 0) {
                        result.error =
                            "Failed to write batch for view: " + view.name;
                        co_return result;
                    }
                    // Lock the builder's schema once the first batch has
                    // been written (presumably so later batches of this
                    // view keep the same schema).
                    if (!schema_locked) {
                        builder.lock_schema();
                        schema_locked = true;
                    }
                    builder.reset(true);
                }
                ++result.chunks_scanned;
            }

            auto stats = co_await writer.close();
            record_partition(view.name, std::move(stats));
        }
    } catch (const std::exception &e) {
        result.error = e.what();
    }
    co_return result;
}
405!
1968

1969
// One candidate chunk of a trace file for a view, copied from a ViewBuilder
// candidate by get_view_chunks_pipeline.
//
// Members carry default initializers so a default-constructed instance is
// fully defined; the type remains an aggregate, so `{idx, start, end}`
// initialization keeps working.
struct ViewChunkInfo {
    // Checkpoint index of the candidate chunk.
    std::uint64_t checkpoint_idx = 0;
    // Byte range of the candidate chunk (as reported by the builder).
    std::size_t start_byte = 0;
    std::size_t end_byte = 0;
};
1974

1975
struct GetViewChunksResult {
12✔
1976
    std::vector<ViewChunkInfo> chunks;
1977
    std::uint64_t total_checkpoints = 0;
9✔
1978
    std::uint64_t skipped_checkpoints = 0;
9✔
1979
    bool file_may_match = false;
9✔
1980
    std::string error;
1981
};
1982

1983
// Determine which checkpoint chunks of `file_path` are candidates for `view`.
//
// Steps: collect index metadata for the file, run the ViewBuilder with that
// metadata, then copy every candidate's checkpoint index and byte range into
// the result together with the builder's counters. An empty `index_path`
// means "derive it with determine_index_path". Failures, including
// std::exception thrown along the way, are reported via `error`.
CoroTask<GetViewChunksResult> get_view_chunks_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    ViewDefinition view) {
    namespace dft_internal =
        dftracer::utils::utilities::composites::dft::internal;
    GetViewChunksResult out;

    try {
        const std::string index_file =
            !index_path.empty()
                ? index_path
                : dft_internal::determine_index_path(file_path, "");

        // Stage 1: index metadata (uncompressed size, checkpoint count).
        auto metadata_request =
            MetadataCollectorUtilityInput::from_file(file_path)
                .with_checkpoint_size(checkpoint_size)
                .with_index(index_file);
        auto metadata =
            co_await MetadataCollectorUtility{}.process(metadata_request);
        if (!metadata.success) {
            out.error = "Failed to collect metadata: " + metadata.error_message;
            co_return out;
        }

        // Stage 2: ask the view builder which checkpoints may match.
        ViewBuilderInput build_request;
        build_request.with_view(view)
            .with_file_path(file_path)
            .with_index_path(index_file)
            .with_uncompressed_size(metadata.uncompressed_size)
            .with_num_checkpoints(metadata.num_checkpoints);

        auto built = co_await ViewBuilderUtility{}.process(build_request);
        if (!built.success) {
            out.error = "ViewBuilder failed";
            co_return out;
        }

        // Stage 3: copy the builder's verdict and candidates.
        out.file_may_match = built.file_may_match;
        out.total_checkpoints = built.total_checkpoints;
        out.skipped_checkpoints = built.skipped_checkpoints;
        for (const auto &cand : built.candidates) {
            out.chunks.push_back(
                {cand.checkpoint_idx, cand.start_byte, cand.end_byte});
        }
    } catch (const std::exception &ex) {
        out.error = ex.what();
    }
    co_return out;
}
2033

2034
// Outcome of write_view_chunk_pipeline for a single chunk. `error` is empty
// on success.
struct WriteViewChunkResult {
    std::string output_file;          // destination IPC file for this chunk
    std::uint64_t events_matched{0};  // summed over reader batches
    std::uint64_t events_scanned{0};  // summed over reader batches
    int64_t rows_written{0};          // rows handed to the IPC writer
    // NOTE(review): not assigned by write_view_chunk_pipeline.
    int64_t bytes_written{0};
    std::string error;                // failure description, if any
};
2042

2043
// Read one checkpoint chunk of `file_path` through the view reader and write
// every valid Arrow batch it produces to `output_file` as an IPC stream.
//
// The reader is restricted to checkpoint `checkpoint_idx` and the byte range
// start_byte..end_byte, and filters with `view.query`. An empty `index_path`
// means "derive it with determine_index_path".
//
// Result fields:
//   events_matched / events_scanned - summed over every reader batch.
//   rows_written - rows of batches the writer accepted (a failed write is
//                  not counted).
//   error - set on open/write/close failure or on a std::exception.
CoroTask<WriteViewChunkResult> write_view_chunk_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    ViewDefinition view, std::uint64_t checkpoint_idx, std::size_t start_byte,
    std::size_t end_byte, std::string output_file, IpcCompression compression,
    std::size_t event_batch_size) {
    namespace dft_internal =
        dftracer::utils::utilities::composites::dft::internal;
    WriteViewChunkResult result;
    result.output_file = output_file;

    try {
        std::string resolved_index =
            index_path.empty()
                ? dft_internal::determine_index_path(file_path, "")
                : index_path;

        // NOTE(review): the early co_return paths below do not call
        // writer.close(); cleanup relies on IpcWriter's destructor - confirm.
        dftracer::utils::utilities::common::arrow::IpcWriter writer;
        int rc_open = co_await writer.open(output_file, compression);
        if (rc_open != 0) {
            result.error = "Failed to open output file";
            co_return result;
        }

        ViewReaderInput reader_input;
        reader_input.with_file_path(file_path)
            .with_index_path(resolved_index)
            .with_checkpoint_size(checkpoint_size)
            .with_byte_range(start_byte, end_byte)
            .with_checkpoint_idx(checkpoint_idx)
            .with_event_batch_size(event_batch_size)
            .with_view(view);
        reader_input.query = view.query;

        RecordBatchBuilder builder;
        bool schema_locked = false;

        ViewReaderUtility reader;
        auto gen = reader.process(reader_input);
        while (auto opt = co_await gen.next()) {
            result.events_matched += opt->events_matched;
            result.events_scanned += opt->events_scanned;
            auto batch = opt->to_arrow(builder);
            if (batch.valid()) {
                // Capture the row count before handing the batch to the
                // writer, and only add it once the write succeeded, so a
                // failed write does not inflate rows_written (and the totals
                // summed from it).
                const auto batch_rows = batch.num_rows();
                int rc = co_await writer.write_batch(batch);
                if (rc != 0) {
                    result.error = "Failed to write batch";
                    co_return result;
                }
                result.rows_written += batch_rows;
                // The first written batch fixes the schema for the rest of
                // the stream.
                if (!schema_locked) {
                    builder.lock_schema();
                    schema_locked = true;
                }
                builder.reset(true);
            }
        }

        int rc = co_await writer.close();
        if (rc != 0) {
            result.error = "Failed to close output file";
        }
    } catch (const std::exception &e) {
        result.error = e.what();
    }
    co_return result;
}
2109

2110
// One chunk to be written by write_view_chunks_pipeline: the checkpoint and
// byte range to read, and the IPC file the chunk's batches are written to.
//
// Numeric members carry default initializers so a default-constructed
// descriptor is fully defined; the type remains an aggregate.
struct ChunkDescriptor {
    std::uint64_t checkpoint_idx = 0;
    std::size_t start_byte = 0;
    std::size_t end_byte = 0;
    std::string output_file;
};
2116

2117
struct WriteViewChunksResult {
3✔
2118
    std::vector<WriteViewChunkResult> results;
2119
    int64_t total_rows = 0;
3✔
2120
    int64_t total_events_matched = 0;
3✔
2121
};
2122

2123
// Write every chunk in `chunks` by creating one write_view_chunk_pipeline
// task per chunk and awaiting them together with when_all, then total the
// rows and matched events they report.
//
// Per-chunk failures are not raised here; they are reported through each
// entry's `error`. An empty `chunks` yields an empty result.
CoroTask<WriteViewChunksResult> write_view_chunks_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    ViewDefinition view, std::vector<ChunkDescriptor> chunks,
    IpcCompression compression, std::size_t event_batch_size) {
    WriteViewChunksResult summary;
    if (chunks.empty()) {
        co_return summary;
    }

    std::vector<CoroTask<WriteViewChunkResult>> pending;
    pending.reserve(chunks.size());
    for (std::size_t i = 0; i < chunks.size(); ++i) {
        const ChunkDescriptor &c = chunks[i];
        pending.push_back(write_view_chunk_pipeline(
            file_path, index_path, checkpoint_size, view, c.checkpoint_idx,
            c.start_byte, c.end_byte, c.output_file, compression,
            event_batch_size));
    }

    summary.results = co_await when_all(std::move(pending));

    for (const WriteViewChunkResult &entry : summary.results) {
        summary.total_rows += entry.rows_written;
        summary.total_events_matched += entry.events_matched;
    }
    co_return summary;
}
2152

2153
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
2154

2155
// Snapshot the Python-side reader settings into a TraceReaderConfig.
//
// Throws std::runtime_error in two cases:
//   - the object was never initialized (file_path/index_dir still NULL,
//     e.g. __init__ did not run);
//   - a stored path cannot be converted to UTF-8.
// The callers visible in this file wrap this call in try/catch and turn the
// exception into a Python RuntimeError.
TraceReaderConfig build_config(TraceReaderObject *self) {
    // PyUnicode_AsUTF8 must not be handed a NULL object.
    if (self->file_path == NULL || self->index_dir == NULL) {
        throw std::runtime_error("TraceReader is not initialized");
    }
    // PyUnicode_AsUTF8 returns NULL on failure, and assigning a NULL
    // `const char *` to std::string is undefined behaviour, so check first.
    const char *path = PyUnicode_AsUTF8(self->file_path);
    if (path == NULL) {
        throw std::runtime_error("TraceReader path is not valid UTF-8");
    }
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
    if (idx == NULL) {
        throw std::runtime_error("TraceReader index_dir is not valid UTF-8");
    }

    TraceReaderConfig cfg;
    cfg.file_path = path;
    cfg.index_dir = idx;
    cfg.checkpoint_size = self->checkpoint_size;
    cfg.auto_build_index = self->auto_build_index != 0;
    return cfg;
}
2164

2165
// Runtime used to schedule this reader's producer tasks: the one supplied via
// `runtime=` in __init__ when present, otherwise the process default.
static Runtime *get_runtime(TraceReaderObject *self) {
    PyObject *explicit_rt = self->runtime_obj;
    if (explicit_rt == NULL) {
        return get_default_runtime();
    }
    RuntimeObject *wrapper = (RuntimeObject *)explicit_rt;
    return wrapper->runtime.get();
}
2171

2172
// Allocate a TraceReaderIteratorObject and placement-construct every C++
// member into an empty/zero state. Keeping this in one place guarantees all
// factories below construct the same set of members, so the iterator's
// deallocation never sees an unconstructed shared_ptr.
// Returns NULL if tp_alloc fails.
static TraceReaderIteratorObject *alloc_blank_iterator() {
    TraceReaderIteratorObject *it =
        (TraceReaderIteratorObject *)TraceReaderIteratorType.tp_alloc(
            &TraceReaderIteratorType, 0);
    if (!it) return NULL;
    new (&it->batch_state) std::shared_ptr<MemoryViewBatchIteratorState>();
    it->current_batch = NULL;
    it->batch_index = 0;
    new (&it->json_dict_state) std::shared_ptr<JsonDictIteratorState>();
    new (&it->json_dict_current_batch) std::shared_ptr<JsonDictBatch>();
    it->json_dict_index = 0;
#ifdef DFTRACER_UTILS_ENABLE_ARROW
    new (&it->arrow_state) std::shared_ptr<ArrowIteratorState>();
#endif
    return it;
}

// Wrap a memoryview-batch producer state in a new iterator (MEMORYVIEW mode).
// Returns NULL if allocation fails.
static TraceReaderIteratorObject *make_memoryview_iterator(
    std::shared_ptr<MemoryViewBatchIteratorState> state) {
    TraceReaderIteratorObject *it = alloc_blank_iterator();
    if (!it) return NULL;
    it->batch_state = std::move(state);
    it->mode = IteratorMode::MEMORYVIEW;
    return it;
}

// Wrap a JSON-dict producer state in a new iterator (JSON_DICT mode).
// Returns NULL if allocation fails.
static TraceReaderIteratorObject *make_json_dict_iterator(
    std::shared_ptr<JsonDictIteratorState> state) {
    TraceReaderIteratorObject *it = alloc_blank_iterator();
    if (!it) return NULL;
    it->json_dict_state = std::move(state);
    it->mode = IteratorMode::JSON_DICT;
    return it;
}

#ifdef DFTRACER_UTILS_ENABLE_ARROW
// Wrap an Arrow producer state in a new iterator (ARROW mode).
// Returns NULL if allocation fails.
static TraceReaderIteratorObject *make_arrow_iterator(
    std::shared_ptr<ArrowIteratorState> state) {
    TraceReaderIteratorObject *it = alloc_blank_iterator();
    if (!it) return NULL;
    it->arrow_state = std::move(state);
    it->mode = IteratorMode::ARROW;
    return it;
}
#endif
2231

2232
}  // namespace
2233

2234
// tp_dealloc: drop the references held by the reader (each may be NULL if
// __init__ never completed), in declaration order, then free the object.
static void TraceReader_dealloc(TraceReaderObject *self) {
    PyObject *held[] = {self->file_path, self->index_dir, self->runtime_obj};
    for (PyObject *ref : held) {
        Py_XDECREF(ref);
    }
    Py_TYPE(self)->tp_free((PyObject *)self);
}
2240

2241
// tp_new: allocate the reader and put every field into a known default
// state; TraceReader_init fills in the real values. Returns NULL if
// tp_alloc fails.
static PyObject *TraceReader_new(PyTypeObject *type, PyObject *args,
                                 PyObject *kwds) {
    auto *obj = (TraceReaderObject *)type->tp_alloc(type, 0);
    if (obj == NULL) {
        return NULL;
    }
    obj->file_path = NULL;
    obj->index_dir = NULL;
    obj->checkpoint_size = 32 * 1024 * 1024;  // 32 MiB default
    obj->auto_build_index = 0;
    obj->has_index = 0;
    obj->runtime_obj = NULL;
    return (PyObject *)obj;
}
2254

2255
static int TraceReader_init(TraceReaderObject *self, PyObject *args,
316✔
2256
                            PyObject *kwds) {
2257
    static const char *kwlist[] = {
2258
        "path",    "index_dir", "checkpoint_size", "auto_build_index",
2259
        "runtime", NULL};
2260

2261
    const char *file_path;
2262
    const char *index_dir = "";
316✔
2263
    std::size_t checkpoint_size = 32 * 1024 * 1024;
316✔
2264
    int auto_build_index = 0;
316✔
2265
    PyObject *runtime_arg = NULL;
316✔
2266

2267
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|snpO", (char **)kwlist,
316!
2268
                                     &file_path, &index_dir, &checkpoint_size,
2269
                                     &auto_build_index, &runtime_arg)) {
2270
        return -1;
×
2271
    }
2272

2273
    if (runtime_arg && runtime_arg != Py_None) {
316✔
2274
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
64!
2275
            // Direct C++ Runtime object
2276
            Py_INCREF(runtime_arg);
×
2277
            self->runtime_obj = runtime_arg;
×
2278
        } else {
2279
            // Python wrapper, extract _native attribute
2280
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
64!
2281
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
64!
2282
                self->runtime_obj = native;  // already incref'd by GetAttr
64✔
2283
            } else {
32✔
2284
                Py_XDECREF(native);
×
2285
                PyErr_SetString(PyExc_TypeError,
×
2286
                                "runtime must be a Runtime instance or None");
2287
                return -1;
×
2288
            }
2289
        }
2290
    }
32✔
2291

2292
    self->file_path = PyUnicode_FromString(file_path);
316!
2293
    if (!self->file_path) return -1;
316✔
2294

2295
    self->index_dir = PyUnicode_FromString(index_dir);
316!
2296
    if (!self->index_dir) {
316✔
2297
        Py_DECREF(self->file_path);
×
2298
        self->file_path = NULL;
×
2299
        return -1;
×
2300
    }
2301

2302
    self->checkpoint_size = checkpoint_size;
316✔
2303
    self->auto_build_index = auto_build_index;
316✔
2304

2305
    try {
2306
        TraceReaderConfig cfg;
316✔
2307
        cfg.file_path = file_path;
316!
2308
        cfg.index_dir = index_dir;
316!
2309
        cfg.checkpoint_size = checkpoint_size;
316✔
2310
        cfg.auto_build_index = auto_build_index != 0;
316✔
2311
        TraceReader probe(std::move(cfg));
316!
2312
        self->has_index = probe.has_index() ? 1 : 0;
316!
2313
    } catch (const std::exception &e) {
316!
2314
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2315
        Py_DECREF(self->file_path);
×
2316
        Py_DECREF(self->index_dir);
×
2317
        self->file_path = NULL;
×
2318
        self->index_dir = NULL;
×
2319
        return -1;
×
2320
    }
×
2321

2322
    return 0;
316✔
2323
}
158✔
2324

2325
// iter_lines(start_line=0, end_line=0, start_byte=0, end_byte=0,
//            buffer_size=4 MiB, query=None, memory_budget=0)
//
// Starts a background producer that reads lines into a bounded channel and
// returns a MEMORYVIEW-mode iterator over the produced batches.
//   - If the path is a directory, produce_lines_parallel is used with up to
//     rt->threads() workers.
//   - Otherwise a single produce_lines_batched task is submitted.
//
// All setup that can throw C++ exceptions sits inside one try block, so no
// C++ exception can unwind into the CPython interpreter. This covers config,
// allocation, runtime lookup, channel creation and task submission.
static PyObject *TraceReader_iter_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {"start_line",    "end_line",    "start_byte",
                                   "end_byte",      "buffer_size", "query",
                                   "memory_budget", NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;
    Py_ssize_t memory_budget = 0;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|nnnnnzn", (char **)kwlist, &start_line, &end_line,
            &start_byte, &end_byte, &buffer_size, &query_str, &memory_budget)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }
    // A negative budget would wrap to SIZE_MAX in the size_t cast below.
    if (memory_budget < 0) {
        PyErr_SetString(PyExc_ValueError, "memory_budget must be >= 0");
        return NULL;
    }

    std::shared_ptr<MemoryViewBatchIteratorState> state;
    try {
        TraceReaderConfig cfg = build_config(self);

        ReadConfig rc;
        rc.start_line = static_cast<std::size_t>(start_line);
        rc.end_line = static_cast<std::size_t>(end_line);
        rc.start_byte = static_cast<std::size_t>(start_byte);
        rc.end_byte = static_cast<std::size_t>(end_byte);
        rc.buffer_size = static_cast<std::size_t>(buffer_size);
        if (query_str) rc.query = query_str;

        state = std::make_shared<MemoryViewBatchIteratorState>();
        state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
            static_cast<std::size_t>(memory_budget));

        Runtime *rt = get_runtime(self);
        std::size_t max_workers = rt->threads();
        constexpr std::size_t LINE_BATCH_SIZE = 1024;
        std::size_t capacity = dftracer::utils::compute_channel_capacity(
            state->memory_budget_bytes,
            LINE_BATCH_SIZE * ESTIMATED_BYTES_PER_LINE, max_workers);
        state->channel =
            dftracer::utils::coro::make_channel<MemoryViewBatchData>(capacity);
        auto *sp = state.get();

        bool is_dir = fs::is_directory(cfg.file_path);
        if (is_dir) {
            auto handle = rt->scope(
                "iter_lines_parallel",
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_lines_parallel(
                        scope, sp, dir_path, index_dir, checkpoint_size,
                        auto_build_index, rc, LINE_BATCH_SIZE, max_workers);
                });
            state->task_future = handle.future;
        } else {
            auto handle = rt->submit(
                produce_lines_batched(state, state->channel->producer(), cfg,
                                      rc, LINE_BATCH_SIZE),
                "iter_lines");
            state->task_future = handle.future;
        }
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    TraceReaderIteratorObject *it = make_memoryview_iterator(std::move(state));
    return (PyObject *)it;
}
2409

2410
// iter_raw(start_line=0, end_line=0, start_byte=0, end_byte=0,
//          buffer_size=4 MiB, line_aligned=True, multi_line=True,
//          query=None, memory_budget=0)
//
// Starts a background producer that reads raw chunks into a bounded channel
// and returns a MEMORYVIEW-mode iterator over them.
//   - If the path is a directory, produce_raw_parallel is used with up to
//     rt->threads() workers.
//   - Otherwise a single produce_raw_batched task is submitted.
//
// All setup that can throw C++ exceptions is inside one try block, so no C++
// exception can unwind into the CPython interpreter.
static PyObject *TraceReader_iter_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "line_aligned",
                                   "multi_line", "query",       "memory_budget",
                                   NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    int line_aligned = 1;
    int multi_line = 1;
    const char *query_str = NULL;
    Py_ssize_t memory_budget = 0;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nnnnnppzn", (char **)kwlist,
                                     &start_line, &end_line, &start_byte,
                                     &end_byte, &buffer_size, &line_aligned,
                                     &multi_line, &query_str, &memory_budget)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }
    // A negative budget would wrap to SIZE_MAX in the size_t cast below.
    if (memory_budget < 0) {
        PyErr_SetString(PyExc_ValueError, "memory_budget must be >= 0");
        return NULL;
    }

    std::shared_ptr<MemoryViewBatchIteratorState> state;
    try {
        TraceReaderConfig cfg = build_config(self);

        ReadConfig rc;
        rc.start_line = static_cast<std::size_t>(start_line);
        rc.end_line = static_cast<std::size_t>(end_line);
        rc.start_byte = static_cast<std::size_t>(start_byte);
        rc.end_byte = static_cast<std::size_t>(end_byte);
        rc.buffer_size = static_cast<std::size_t>(buffer_size);
        rc.line_aligned = line_aligned != 0;
        rc.multi_line = multi_line != 0;
        if (query_str) rc.query = query_str;

        state = std::make_shared<MemoryViewBatchIteratorState>();
        state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
            static_cast<std::size_t>(memory_budget));

        Runtime *rt = get_runtime(self);
        std::size_t max_workers = rt->threads();
        std::size_t capacity = dftracer::utils::compute_channel_capacity(
            state->memory_budget_bytes, ESTIMATED_BYTES_PER_RAW_CHUNK,
            max_workers);
        state->channel =
            dftracer::utils::coro::make_channel<MemoryViewBatchData>(capacity);
        auto *sp = state.get();

        bool is_dir = fs::is_directory(cfg.file_path);
        if (is_dir) {
            auto handle = rt->scope(
                "iter_raw_parallel",
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_raw_parallel(
                        scope, sp, dir_path, index_dir, checkpoint_size,
                        auto_build_index, rc, max_workers);
                });
            state->task_future = handle.future;
        } else {
            auto handle = rt->submit(
                produce_raw_batched(state, state->channel->producer(), cfg, rc),
                "iter_raw");
            state->task_future = handle.future;
        }
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    TraceReaderIteratorObject *it = make_memoryview_iterator(std::move(state));
    return (PyObject *)it;
}
2497

2498
// read_lines(...): eager form of iter_lines(); accepts the same arguments and
// drains the iterator into a list. Returns NULL on any failure.
static PyObject *TraceReader_read_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    PyObject *lines_iter = TraceReader_iter_lines(self, args, kwds);
    if (lines_iter == NULL) {
        return NULL;
    }
    PyObject *collected = PySequence_List(lines_iter);
    Py_DECREF(lines_iter);
    return collected;
}
2506

2507
// iter_json(start_line=0, end_line=0, start_byte=0, end_byte=0,
//           buffer_size=4 MiB, query=None, batch_size=1024, memory_budget=0)
//
// Starts a background producer that parses events into batches of up to
// `batch_size` entries, fed through a bounded channel, and returns a
// JSON_DICT-mode iterator.
//   - If the path is a directory, produce_json_dicts_parallel is used with up
//     to rt->threads() workers.
//   - Otherwise a single produce_json_dicts task is submitted.
//
// All setup that can throw C++ exceptions is inside one try block, so no C++
// exception can unwind into the CPython interpreter.
static PyObject *TraceReader_iter_json(TraceReaderObject *self, PyObject *args,
                                       PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",      "start_byte",
                                   "end_byte",   "buffer_size",   "query",
                                   "batch_size", "memory_budget", NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;
    Py_ssize_t batch_size = 1024;
    Py_ssize_t memory_budget = 0;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nnnnnznn", (char **)kwlist,
                                     &start_line, &end_line, &start_byte,
                                     &end_byte, &buffer_size, &query_str,
                                     &batch_size, &memory_budget)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0 || batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError,
                        "range arguments must be >= 0; buffer_size and "
                        "batch_size must be > 0");
        return NULL;
    }
    // A negative budget would wrap to SIZE_MAX in the size_t cast below.
    if (memory_budget < 0) {
        PyErr_SetString(PyExc_ValueError, "memory_budget must be >= 0");
        return NULL;
    }

    std::shared_ptr<JsonDictIteratorState> state;
    try {
        TraceReaderConfig cfg = build_config(self);

        ReadConfig rc;
        rc.start_line = static_cast<std::size_t>(start_line);
        rc.end_line = static_cast<std::size_t>(end_line);
        rc.start_byte = static_cast<std::size_t>(start_byte);
        rc.end_byte = static_cast<std::size_t>(end_byte);
        rc.buffer_size = static_cast<std::size_t>(buffer_size);
        if (query_str) rc.query = query_str;

        state = std::make_shared<JsonDictIteratorState>();
        state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
            static_cast<std::size_t>(memory_budget));

        Runtime *rt = get_runtime(self);
        std::size_t max_workers = rt->threads();
        auto bs = static_cast<std::size_t>(batch_size);
        std::size_t capacity = dftracer::utils::compute_channel_capacity(
            state->memory_budget_bytes, bs * ESTIMATED_BYTES_PER_JSON_EVENT,
            max_workers);
        state->channel =
            dftracer::utils::coro::make_channel<JsonDictBatch>(capacity);
        auto *sp = state.get();

        bool is_dir = fs::is_directory(cfg.file_path);
        if (is_dir) {
            auto handle = rt->scope(
                "iter_json_parallel",
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc, bs,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_json_dicts_parallel(
                        scope, sp, dir_path, index_dir, checkpoint_size,
                        auto_build_index, rc, bs, max_workers);
                });
            state->task_future = handle.future;
        } else {
            auto handle =
                rt->submit(produce_json_dicts(state, state->channel->producer(),
                                              cfg, rc, bs),
                           "iter_json");
            state->task_future = handle.future;
        }
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    TraceReaderIteratorObject *it = make_json_dict_iterator(std::move(state));
    return (PyObject *)it;
}
2593

2594
// read_json(...): eager form of iter_json(); accepts the same arguments and
// drains the iterator into a list. Returns NULL on any failure.
static PyObject *TraceReader_read_json_py(TraceReaderObject *self,
                                          PyObject *args, PyObject *kwds) {
    PyObject *json_iter = TraceReader_iter_json(self, args, kwds);
    if (json_iter == NULL) {
        return NULL;
    }
    PyObject *collected = PySequence_List(json_iter);
    Py_DECREF(json_iter);
    return collected;
}
2602

2603
// read_raw(...): eager form of iter_raw(); accepts the same arguments and
// drains the iterator into a list. Returns NULL on any failure.
static PyObject *TraceReader_read_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    PyObject *raw_iter = TraceReader_iter_raw(self, args, kwds);
    if (raw_iter == NULL) {
        return NULL;
    }
    PyObject *collected = PySequence_List(raw_iter);
    Py_DECREF(raw_iter);
    return collected;
}
2611

2612
#ifdef DFTRACER_UTILS_ENABLE_ARROW
2613

2614
// iter_arrow(batch_size=10000, start_line=0, end_line=0, start_byte=0,
//            end_byte=0, buffer_size=4 MiB, query=None, flatten_objects=True,
//            normalize=False, memory_budget=0)
//
// Starts a background producer of Arrow record batches fed through a bounded
// channel and returns an ARROW-mode iterator. The producer depends on input:
//   - directory: produce_arrow_batches_parallel;
//   - single file with normalize: one produce_arrow_batches task;
//   - single file otherwise: produce_arrow_batches_for_files with a
//     one-element file list.
//
// All setup that can throw C++ exceptions is inside one try block, so no C++
// exception can unwind into the CPython interpreter.
static PyObject *TraceReader_iter_arrow(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {
        "batch_size", "start_line",    "end_line", "start_byte",
        "end_byte",   "buffer_size",   "query",    "flatten_objects",
        "normalize",  "memory_budget", NULL};
    Py_ssize_t batch_size = 10000;
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;
    int flatten_objects = 1;  // default: expand top-level objects
    int normalize = 0;
    Py_ssize_t memory_budget = 0;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|nnnnnnzppn", (char **)kwlist, &batch_size,
            &start_line, &end_line, &start_byte, &end_byte, &buffer_size,
            &query_str, &flatten_objects, &normalize, &memory_budget)) {
        return NULL;
    }

    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
        return NULL;
    }
    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }
    // A negative budget would wrap to SIZE_MAX in the size_t cast below.
    if (memory_budget < 0) {
        PyErr_SetString(PyExc_ValueError, "memory_budget must be >= 0");
        return NULL;
    }

    std::shared_ptr<ArrowIteratorState> state;
    try {
        TraceReaderConfig cfg = build_config(self);

        ReadConfig rc;
        rc.start_line = static_cast<std::size_t>(start_line);
        rc.end_line = static_cast<std::size_t>(end_line);
        rc.start_byte = static_cast<std::size_t>(start_byte);
        rc.end_byte = static_cast<std::size_t>(end_byte);
        rc.buffer_size = static_cast<std::size_t>(buffer_size);
        rc.flatten_objects = flatten_objects != 0;
        if (query_str) rc.query = query_str;

        state = std::make_shared<ArrowIteratorState>();
        state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
            static_cast<std::size_t>(memory_budget));

        Runtime *rt = get_runtime(self);
        std::size_t max_workers = rt->threads();
        auto bs = static_cast<std::size_t>(batch_size);
        std::size_t capacity = dftracer::utils::compute_channel_capacity(
            state->memory_budget_bytes, bs * ESTIMATED_BYTES_PER_ARROW_ROW,
            max_workers);
        state->channel =
            dftracer::utils::coro::make_channel<ArrowIteratorState::BatchType>(
                capacity);
        auto *sp = state.get();

        bool is_dir = fs::is_directory(cfg.file_path);
        if (is_dir) {
            auto handle = rt->scope(
                "iter_arrow_parallel",
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc, bs,
                 norm = normalize != 0,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_arrow_batches_parallel(
                        scope, sp, dir_path, index_dir, checkpoint_size,
                        auto_build_index, rc, bs, norm, max_workers);
                });
            state->task_future = handle.future;
        } else if (normalize) {
            auto handle = rt->submit(
                produce_arrow_batches(state, state->channel->producer(), cfg,
                                      rc, bs, flatten_objects != 0,
                                      normalize != 0),
                "iter_arrow");
            state->task_future = handle.future;
        } else {
            std::vector<std::string> files_vec{cfg.file_path};
            auto handle = rt->scope(
                "iter_arrow_parallel",
                [sp, files = std::move(files_vec), index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc, bs,
                 norm = normalize != 0,
                 max_workers](CoroScope &scope) mutable -> CoroTask<void> {
                    co_await produce_arrow_batches_for_files(
                        scope, sp, std::move(files), index_dir, checkpoint_size,
                        auto_build_index, rc, bs, norm, max_workers);
                });
            state->task_future = handle.future;
        }
    } catch (const std::exception &e) {
        PyErr_SetString(PyExc_RuntimeError, e.what());
        return NULL;
    }

    TraceReaderIteratorObject *it = make_arrow_iterator(std::move(state));
    return (PyObject *)it;
}
2725

2726
// Build ArrowIteratorState + spawn the producer task. Same plumbing as
2727
// TraceReader_iter_arrow but returns the state so callers can wrap it as
2728
// either a per-batch iterator or an ArrowArrayStream.
2729
static std::shared_ptr<ArrowIteratorState> spawn_arrow_producer(
54✔
2730
    TraceReaderObject *self, PyObject *args, PyObject *kwds) {
2731
    static const char *kwlist[] = {
2732
        "batch_size", "start_line",    "end_line", "start_byte",
2733
        "end_byte",   "buffer_size",   "query",    "flatten_objects",
2734
        "normalize",  "memory_budget", NULL};
2735
    Py_ssize_t batch_size = 10000;
54✔
2736
    Py_ssize_t start_line = 0, end_line = 0;
54✔
2737
    Py_ssize_t start_byte = 0, end_byte = 0;
54✔
2738
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
54✔
2739
    const char *query_str = NULL;
54✔
2740
    int flatten_objects = 1;  // default: expand top-level objects
54✔
2741
    int normalize = 0;
54✔
2742
    Py_ssize_t memory_budget = 0;
54✔
2743

2744
    if (!PyArg_ParseTupleAndKeywords(
54!
2745
            args, kwds, "|nnnnnnzppn", (char **)kwlist, &batch_size,
27✔
2746
            &start_line, &end_line, &start_byte, &end_byte, &buffer_size,
2747
            &query_str, &flatten_objects, &normalize, &memory_budget)) {
2748
        return nullptr;
×
2749
    }
2750

2751
    if (batch_size <= 0) {
54!
2752
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
×
2753
        return nullptr;
×
2754
    }
2755
    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
54!
2756
        buffer_size <= 0) {
54!
2757
        PyErr_SetString(
×
2758
            PyExc_ValueError,
2759
            "range arguments must be >= 0; buffer_size must be > 0");
2760
        return nullptr;
×
2761
    }
2762

2763
    TraceReaderConfig cfg;
54✔
2764
    try {
2765
        cfg = build_config(self);
54!
2766
    } catch (const std::exception &e) {
27!
2767
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2768
        return nullptr;
×
2769
    }
×
2770

2771
    ReadConfig rc;
54✔
2772
    rc.start_line = static_cast<std::size_t>(start_line);
54✔
2773
    rc.end_line = static_cast<std::size_t>(end_line);
54✔
2774
    rc.start_byte = static_cast<std::size_t>(start_byte);
54✔
2775
    rc.end_byte = static_cast<std::size_t>(end_byte);
54✔
2776
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
54✔
2777
    rc.flatten_objects = flatten_objects != 0;
54✔
2778
    if (query_str) rc.query = query_str;
54!
2779

2780
    auto state = std::make_shared<ArrowIteratorState>();
54!
2781
    state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
54!
2782
        static_cast<std::size_t>(memory_budget));
27✔
2783

2784
    Runtime *rt = get_runtime(self);
54!
2785
    std::size_t max_workers = rt->threads();
54!
2786
    auto bs = static_cast<std::size_t>(batch_size);
54✔
2787
    std::size_t capacity = dftracer::utils::compute_channel_capacity(
54!
2788
        state->memory_budget_bytes, bs * ESTIMATED_BYTES_PER_ARROW_ROW,
54✔
2789
        max_workers);
27✔
2790
    state->channel =
54✔
2791
        dftracer::utils::coro::make_channel<ArrowIteratorState::BatchType>(
81!
2792
            capacity);
54✔
2793
    auto *sp = state.get();
54✔
2794

2795
    try {
2796
        bool is_dir = fs::is_directory(cfg.file_path);
54!
2797
        if (is_dir) {
54✔
2798
            auto handle = rt->scope(
20!
2799
                "iter_arrow_parallel",
10!
2800
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
90!
2801
                 checkpoint_size = cfg.checkpoint_size,
20✔
2802
                 auto_build_index = cfg.auto_build_index, rc, bs,
30!
2803
                 norm = normalize != 0,
20✔
2804
                 max_workers](CoroScope &scope) -> CoroTask<void> {
20!
2805
                    co_await produce_arrow_batches_parallel(
80!
2806
                        scope, sp, dir_path, index_dir, checkpoint_size,
30!
2807
                        auto_build_index, rc, bs, norm, max_workers);
30!
2808
                });
60!
2809
            state->task_future = handle.future;
20!
2810
        } else {
20✔
2811
            auto handle = rt->submit(
34!
2812
                produce_arrow_batches(state, state->channel->producer(), cfg,
68!
2813
                                      rc, static_cast<std::size_t>(batch_size),
17!
2814
                                      flatten_objects != 0, normalize != 0),
17✔
2815
                "iter_arrow");
68!
2816
            state->task_future = handle.future;
34!
2817
        }
34✔
2818
    } catch (const std::exception &e) {
27!
2819
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
2820
        return nullptr;
×
2821
    }
×
2822

2823
    return state;
54✔
2824
}
54✔
2825

2826
static PyObject *TraceReader_iter_arrow_stream(TraceReaderObject *self,
                                               PyObject *args, PyObject *kwds) {
    // Start the producer; a null result means a Python exception is set.
    std::shared_ptr<ArrowIteratorState> producer_state =
        spawn_arrow_producer(self, args, kwds);
    if (producer_state == nullptr) {
        return NULL;
    }
    // Hand the state over to an ArrowArrayStream wrapper object.
    return make_arrow_batch_stream(std::move(producer_state));
}
2832

2833
static PyObject *TraceReader_read_arrow(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    // Spawn the producer, wrap it as a batch stream, then materialize the
    // stream as a table. Any step returning null leaves a Python exception
    // set, which is propagated by returning NULL.
    std::shared_ptr<ArrowIteratorState> producer_state =
        spawn_arrow_producer(self, args, kwds);
    PyObject *stream =
        producer_state ? make_arrow_batch_stream(std::move(producer_state))
                       : nullptr;
    // NOTE(review): confirm whether wrap_arrow_stream_table steals the
    // reference to `stream`; if it only borrows, `stream` leaks here.
    return stream ? dftracer::utils::python::wrap_arrow_stream_table(stream)
                  : nullptr;
}
2841

2842
#endif  // DFTRACER_UTILS_ENABLE_ARROW
2843

2844
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
2845

2846
static int parse_str_list_trace(PyObject *obj, std::vector<std::string> &out,
2847
                                const char *param_name) {
2848
    if (!obj || obj == Py_None) return 0;
×
2849
    if (!PyList_Check(obj)) {
×
2850
        PyErr_Format(PyExc_TypeError, "%s must be a list of str", param_name);
2851
        return -1;
2852
    }
2853
    Py_ssize_t n = PyList_Size(obj);
2854
    for (Py_ssize_t i = 0; i < n; i++) {
×
2855
        const char *s = PyUnicode_AsUTF8(PyList_GetItem(obj, i));
×
2856
        if (!s) return -1;
×
2857
        out.emplace_back(s);
×
2858
    }
2859
    return 0;
2860
}
2861

2862
// Write the trace, optionally split into views, to Arrow IPC file(s).
//
// Python signature:
//   write_arrow(path, views=None, chunk_size_mb=32, compression="zstd",
//               batch_size=10000) -> dict
//
// `views` is None or a list whose items are either a string ("io",
// "compute", "dlio" select presets; any other string is used as a view
// name) or a dict with a required "name" str and optional "query" str and
// "include_metadata" flag. `compression` is "none" or "zstd"
// (case-insensitive); "zstd" requires a build with DFTRACER_UTILS_ENABLE_ZSTD.
// `chunk_size_mb` must be >= 0 and `batch_size` > 0.
//
// The pipeline runs on the runtime with the GIL released. Returns
//   {"partitions": {name: {"files": [...], "rows": int, "bytes": int}},
//    "total_rows", "total_bytes", "chunks_scanned", "chunks_skipped"}
// where an empty partition name is reported as "_default". Raises
// ValueError/TypeError for bad arguments and RuntimeError when the pipeline
// fails.
static PyObject *TraceReader_write_arrow(TraceReaderObject *self,
                                         PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"path",        "views",      "chunk_size_mb",
                                   "compression", "batch_size", NULL};
    const char *path = NULL;
    PyObject *views_obj = Py_None;
    int chunk_size_mb = 32;
    const char *compression_str = "zstd";
    Py_ssize_t batch_size = 10000;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|Oisn", (char **)kwlist,
                                     &path, &views_obj, &chunk_size_mb,
                                     &compression_str, &batch_size)) {
        return NULL;
    }

    if (chunk_size_mb < 0) {
        PyErr_SetString(PyExc_ValueError, "chunk_size_mb must be >= 0");
        return NULL;
    }
    // A non-positive value would become a huge std::size_t below.
    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
        return NULL;
    }

    std::vector<ViewDefinition> views;
    if (views_obj && views_obj != Py_None) {
        if (!PyList_Check(views_obj)) {
            PyErr_SetString(PyExc_TypeError, "views must be a list or None");
            return NULL;
        }
        const Py_ssize_t n = PyList_Size(views_obj);
        for (Py_ssize_t i = 0; i < n; i++) {
            PyObject *item = PyList_GetItem(views_obj, i);  // borrowed
            if (!item) return NULL;
            ViewDefinition vd;

            if (PyUnicode_Check(item)) {
                const char *name = PyUnicode_AsUTF8(item);
                if (!name) return NULL;
                std::string name_str(name);
                if (name_str == "io") {
                    vd = ViewDefinition::io_view();
                } else if (name_str == "compute") {
                    vd = ViewDefinition::compute_view();
                } else if (name_str == "dlio") {
                    vd = ViewDefinition::dlio_view();
                } else {
                    vd.with_name(name_str);
                }
            } else if (PyDict_Check(item)) {
                PyObject *name_obj = PyDict_GetItemString(item, "name");
                if (!name_obj || !PyUnicode_Check(name_obj)) {
                    PyErr_SetString(PyExc_ValueError,
                                    "view dict must have 'name' string");
                    return NULL;
                }
                // PyUnicode_AsUTF8 can fail (e.g. lone surrogates); never
                // forward a NULL pointer into ViewDefinition.
                const char *name = PyUnicode_AsUTF8(name_obj);
                if (!name) return NULL;
                vd.with_name(name);

                PyObject *query_obj = PyDict_GetItemString(item, "query");
                if (query_obj && query_obj != Py_None) {
                    if (!PyUnicode_Check(query_obj)) {
                        PyErr_SetString(PyExc_ValueError,
                                        "view 'query' must be a string");
                        return NULL;
                    }
                    const char *query = PyUnicode_AsUTF8(query_obj);
                    if (!query) return NULL;
                    vd.with_query(query);
                }

                PyObject *meta_obj =
                    PyDict_GetItemString(item, "include_metadata");
                if (meta_obj && meta_obj != Py_None) {
                    // PyObject_IsTrue returns -1 (exception set) on error;
                    // that must not be read as "true".
                    const int truthy = PyObject_IsTrue(meta_obj);
                    if (truthy < 0) return NULL;
                    vd.with_include_metadata(truthy != 0);
                }
            } else {
                PyErr_SetString(PyExc_TypeError,
                                "views list must contain strings or dicts");
                return NULL;
            }
            views.push_back(std::move(vd));
        }
    }

    // "s" format guarantees compression_str is non-NULL.
    IpcCompression compression = IpcCompression::ZSTD;
    std::string comp_lower(compression_str);
    for (char &c : comp_lower) {
        // tolower on a negative char is undefined; go through unsigned char.
        c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
    }
    if (comp_lower == "none") {
        compression = IpcCompression::NONE;
    } else if (comp_lower == "zstd") {
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
        compression = IpcCompression::ZSTD;
#else
        PyErr_SetString(
            PyExc_ValueError,
            "ZSTD compression not available (built without ZSTD)");
        return NULL;
#endif
    } else {
        PyErr_Format(PyExc_ValueError,
                     "Unknown compression: %s (use 'none' or 'zstd')",
                     compression_str);
        return NULL;
    }

    int64_t chunk_size_bytes =
        static_cast<int64_t>(chunk_size_mb) * 1024 * 1024;

    const char *file_path_utf8 = PyUnicode_AsUTF8(self->file_path);
    if (!file_path_utf8) return NULL;
    std::string file_path(file_path_utf8);
    // None or "" index_dir both leave index_path empty; a conversion error
    // is reported rather than left pending on a successful return.
    std::string index_path;
    if (self->index_dir && self->index_dir != Py_None) {
        const char *idx = PyUnicode_AsUTF8(self->index_dir);
        if (!idx) return NULL;
        index_path = idx;
    }
    std::size_t checkpoint_size = self->checkpoint_size;

    std::string output_path(path);
    WriteArrowResult result;
    bool failed = false;
    std::string error_msg;

    // No Python API calls and no escaping exceptions between these macros.
    Py_BEGIN_ALLOW_THREADS
    try {
        Runtime *rt = get_runtime(self);
        result =
            rt->submit(write_arrow_pipeline(
                           file_path, index_path, checkpoint_size,
                           std::move(views), output_path, chunk_size_bytes,
                           compression, static_cast<std::size_t>(batch_size)),
                       "write_arrow")
                .get();
    } catch (const std::exception &e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "write_arrow failed with an unknown error";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }
    if (!result.error.empty()) {
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
        return NULL;
    }

    // PyDict_SetItemString does NOT steal references: insert `value`, then
    // drop our reference. A NULL `value` (failed allocation, error already
    // set) is reported as failure.
    auto set_owned = [](PyObject *target, const char *key,
                        PyObject *value) -> bool {
        if (!value) return false;
        const int rc = PyDict_SetItemString(target, key, value);
        Py_DECREF(value);
        return rc == 0;
    };

    // Build {"files": [...], "rows": ..., "bytes": ...} for one partition.
    // Returns a new reference, or NULL with a Python error set.
    auto build_partition = [&set_owned](const auto &stats) -> PyObject * {
        PyObject *files_list = PyList_New(0);
        if (!files_list) return NULL;
        for (const auto &f : stats.files) {
            PyObject *file_str = PyUnicode_FromString(f.c_str());
            if (!file_str || PyList_Append(files_list, file_str) < 0) {
                Py_XDECREF(file_str);
                Py_DECREF(files_list);
                return NULL;
            }
            Py_DECREF(file_str);
        }
        PyObject *partition_dict = PyDict_New();
        if (!partition_dict) {
            Py_DECREF(files_list);
            return NULL;
        }
        if (!set_owned(partition_dict, "files", files_list) ||
            !set_owned(partition_dict, "rows",
                       PyLong_FromLongLong(stats.total_rows)) ||
            !set_owned(partition_dict, "bytes",
                       PyLong_FromLongLong(stats.total_uncompressed_bytes))) {
            Py_DECREF(partition_dict);
            return NULL;
        }
        return partition_dict;
    };

    PyObject *dict = PyDict_New();
    if (!dict) return NULL;

    PyObject *partitions_dict = PyDict_New();
    if (!partitions_dict) {
        Py_DECREF(dict);
        return NULL;
    }

    for (const auto &[partition_name, partition_stats] :
         result.stats.partitions) {
        PyObject *partition_dict = build_partition(partition_stats);
        if (!partition_dict) {
            Py_DECREF(partitions_dict);
            Py_DECREF(dict);
            return NULL;
        }
        PyObject *key = partition_name.empty()
                            ? PyUnicode_FromString("_default")
                            : PyUnicode_FromString(partition_name.c_str());
        const int rc =
            key ? PyDict_SetItem(partitions_dict, key, partition_dict) : -1;
        Py_XDECREF(key);
        Py_DECREF(partition_dict);
        if (rc < 0) {
            Py_DECREF(partitions_dict);
            Py_DECREF(dict);
            return NULL;
        }
    }

    if (!set_owned(dict, "partitions", partitions_dict) ||
        !set_owned(dict, "total_rows",
                   PyLong_FromLongLong(result.stats.total_rows)) ||
        !set_owned(dict, "total_bytes",
                   PyLong_FromLongLong(
                       result.stats.total_uncompressed_bytes)) ||
        !set_owned(dict, "chunks_scanned",
                   PyLong_FromUnsignedLongLong(result.chunks_scanned)) ||
        !set_owned(dict, "chunks_skipped",
                   PyLong_FromUnsignedLongLong(result.chunks_skipped))) {
        Py_DECREF(dict);
        return NULL;
    }

    return dict;
}
3073

3074
// List the checkpoint chunks of the trace that may contain events for a view.
//
// Python signature: get_view_chunks(view=None) -> dict
//
// `view` is None (default ViewDefinition), a string ("io", "compute",
// "dlio" select presets; any other string is used as a view name), or a
// dict with optional "name" and "query" strings (non-string values there
// are ignored). get_view_chunks_pipeline runs with the GIL released.
//
// Returns
//   {"chunks": [{"checkpoint_idx", "start_byte", "end_byte"}, ...],
//    "total_checkpoints", "skipped_checkpoints", "file_may_match"}.
// Raises TypeError for an invalid `view` and RuntimeError when the pipeline
// fails.
static PyObject *TraceReader_get_view_chunks(TraceReaderObject *self,
                                             PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"view", NULL};
    PyObject *view_obj = Py_None;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
                                     &view_obj)) {
        return NULL;
    }

    ViewDefinition view;
    if (view_obj && view_obj != Py_None) {
        if (PyUnicode_Check(view_obj)) {
            const char *name = PyUnicode_AsUTF8(view_obj);
            if (!name) return NULL;
            std::string name_str(name);
            if (name_str == "io") {
                view = ViewDefinition::io_view();
            } else if (name_str == "compute") {
                view = ViewDefinition::compute_view();
            } else if (name_str == "dlio") {
                view = ViewDefinition::dlio_view();
            } else {
                view.with_name(name_str);
            }
        } else if (PyDict_Check(view_obj)) {
            PyObject *name_obj = PyDict_GetItemString(view_obj, "name");
            if (name_obj && PyUnicode_Check(name_obj)) {
                // PyUnicode_AsUTF8 can fail (e.g. lone surrogates); never
                // forward a NULL pointer into ViewDefinition.
                const char *name = PyUnicode_AsUTF8(name_obj);
                if (!name) return NULL;
                view.with_name(name);
            }
            PyObject *query_obj = PyDict_GetItemString(view_obj, "query");
            if (query_obj && query_obj != Py_None &&
                PyUnicode_Check(query_obj)) {
                const char *query = PyUnicode_AsUTF8(query_obj);
                if (!query) return NULL;
                view.with_query(query);
            }
        } else {
            PyErr_SetString(PyExc_TypeError, "view must be a string or dict");
            return NULL;
        }
    }

    const char *file_path_utf8 = PyUnicode_AsUTF8(self->file_path);
    if (!file_path_utf8) return NULL;
    std::string file_path(file_path_utf8);
    // None or "" index_dir both leave index_path empty; a conversion error
    // is reported rather than left pending on a successful return.
    std::string index_path;
    if (self->index_dir && self->index_dir != Py_None) {
        const char *idx = PyUnicode_AsUTF8(self->index_dir);
        if (!idx) return NULL;
        index_path = idx;
    }
    std::size_t checkpoint_size = self->checkpoint_size;

    GetViewChunksResult result;
    bool failed = false;
    std::string error_msg;

    // No Python API calls and no escaping exceptions between these macros.
    Py_BEGIN_ALLOW_THREADS
    try {
        Runtime *rt = get_runtime(self);
        result = rt->submit(get_view_chunks_pipeline(file_path, index_path,
                                                     checkpoint_size, view),
                            "get_view_chunks")
                     .get();
    } catch (const std::exception &e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "get_view_chunks failed with an unknown error";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }
    if (!result.error.empty()) {
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
        return NULL;
    }

    // PyDict_SetItemString does NOT steal references: insert `value`, then
    // drop our reference. A NULL `value` (failed allocation, error already
    // set) is reported as failure.
    auto set_owned = [](PyObject *target, const char *key,
                        PyObject *value) -> bool {
        if (!value) return false;
        const int rc = PyDict_SetItemString(target, key, value);
        Py_DECREF(value);
        return rc == 0;
    };

    PyObject *dict = PyDict_New();
    if (!dict) return NULL;

    PyObject *chunks_list =
        PyList_New(static_cast<Py_ssize_t>(result.chunks.size()));
    if (!chunks_list) {
        Py_DECREF(dict);
        return NULL;
    }

    for (std::size_t i = 0; i < result.chunks.size(); ++i) {
        const auto &chunk = result.chunks[i];
        PyObject *chunk_dict = PyDict_New();
        const bool ok =
            chunk_dict &&
            set_owned(chunk_dict, "checkpoint_idx",
                      PyLong_FromUnsignedLongLong(chunk.checkpoint_idx)) &&
            set_owned(chunk_dict, "start_byte",
                      PyLong_FromSize_t(chunk.start_byte)) &&
            set_owned(chunk_dict, "end_byte",
                      PyLong_FromSize_t(chunk.end_byte));
        if (!ok) {
            Py_XDECREF(chunk_dict);
            Py_DECREF(chunks_list);
            Py_DECREF(dict);
            return NULL;
        }
        // PyList_SetItem steals the reference to chunk_dict.
        PyList_SetItem(chunks_list, static_cast<Py_ssize_t>(i), chunk_dict);
    }

    if (!set_owned(dict, "chunks", chunks_list) ||
        !set_owned(dict, "total_checkpoints",
                   PyLong_FromUnsignedLongLong(result.total_checkpoints)) ||
        !set_owned(dict, "skipped_checkpoints",
                   PyLong_FromUnsignedLongLong(result.skipped_checkpoints)) ||
        !set_owned(dict, "file_may_match",
                   PyBool_FromLong(result.file_may_match ? 1 : 0))) {
        Py_DECREF(dict);
        return NULL;
    }

    return dict;
}
3185

3186
// Write one checkpoint chunk of the trace, filtered by a view, to an Arrow
// IPC file.
//
// Python signature:
//   write_view_chunk(output_file, checkpoint_idx, start_byte, end_byte,
//                    view=None, compression="zstd", batch_size=10000) -> dict
//
// `start_byte`/`end_byte` must be >= 0 and `batch_size` > 0. `compression`
// is "none" or "zstd" (case-insensitive); anything else raises ValueError.
// `view` is None, a string ("io", "compute", "dlio" select presets; any
// other string is used as a view name), or a dict with optional "name" and
// "query" strings; any other type raises TypeError. The pipeline runs with
// the GIL released.
//
// Returns {"output_file", "events_matched", "events_scanned",
// "rows_written", "bytes_written"}; raises RuntimeError on pipeline failure.
static PyObject *TraceReader_write_view_chunk(TraceReaderObject *self,
                                              PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {
        "output_file", "checkpoint_idx", "start_byte", "end_byte",
        "view",        "compression",    "batch_size", NULL};
    const char *output_file = NULL;
    unsigned long long checkpoint_idx = 0;
    Py_ssize_t start_byte = 0;
    Py_ssize_t end_byte = 0;
    PyObject *view_obj = Py_None;
    const char *compression_str = "zstd";
    Py_ssize_t batch_size = 10000;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "sKnn|Osn", (char **)kwlist,
                                     &output_file, &checkpoint_idx, &start_byte,
                                     &end_byte, &view_obj, &compression_str,
                                     &batch_size)) {
        return NULL;
    }

    // Negative values would become huge std::size_t offsets below.
    if (start_byte < 0 || end_byte < 0) {
        PyErr_SetString(PyExc_ValueError,
                        "start_byte and end_byte must be >= 0");
        return NULL;
    }
    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
        return NULL;
    }

    // "s" format guarantees compression_str is non-NULL.
    IpcCompression compression = IpcCompression::ZSTD;
    std::string comp_lower(compression_str);
    for (char &c : comp_lower) {
        // tolower on a negative char is undefined; go through unsigned char.
        c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
    }
    if (comp_lower == "none") {
        compression = IpcCompression::NONE;
    } else if (comp_lower == "zstd") {
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
        compression = IpcCompression::ZSTD;
#else
        PyErr_SetString(PyExc_ValueError, "ZSTD compression not available");
        return NULL;
#endif
    } else {
        // Previously an unknown value silently fell back to zstd.
        PyErr_Format(PyExc_ValueError,
                     "Unknown compression: %s (use 'none' or 'zstd')",
                     compression_str);
        return NULL;
    }

    ViewDefinition view;
    if (view_obj && view_obj != Py_None) {
        if (PyUnicode_Check(view_obj)) {
            const char *name = PyUnicode_AsUTF8(view_obj);
            if (!name) return NULL;
            std::string name_str(name);
            if (name_str == "io") {
                view = ViewDefinition::io_view();
            } else if (name_str == "compute") {
                view = ViewDefinition::compute_view();
            } else if (name_str == "dlio") {
                view = ViewDefinition::dlio_view();
            } else {
                view.with_name(name_str);
            }
        } else if (PyDict_Check(view_obj)) {
            PyObject *name_obj = PyDict_GetItemString(view_obj, "name");
            if (name_obj && PyUnicode_Check(name_obj)) {
                // PyUnicode_AsUTF8 can fail (e.g. lone surrogates); never
                // forward a NULL pointer into ViewDefinition.
                const char *name = PyUnicode_AsUTF8(name_obj);
                if (!name) return NULL;
                view.with_name(name);
            }
            PyObject *query_obj = PyDict_GetItemString(view_obj, "query");
            if (query_obj && query_obj != Py_None &&
                PyUnicode_Check(query_obj)) {
                const char *query = PyUnicode_AsUTF8(query_obj);
                if (!query) return NULL;
                view.with_query(query);
            }
        } else {
            // Previously any other type silently selected the default view.
            PyErr_SetString(PyExc_TypeError, "view must be a string or dict");
            return NULL;
        }
    }

    const char *file_path_utf8 = PyUnicode_AsUTF8(self->file_path);
    if (!file_path_utf8) return NULL;
    std::string file_path(file_path_utf8);
    // None or "" index_dir both leave index_path empty; a conversion error
    // is reported rather than left pending on a successful return.
    std::string index_path;
    if (self->index_dir && self->index_dir != Py_None) {
        const char *idx = PyUnicode_AsUTF8(self->index_dir);
        if (!idx) return NULL;
        index_path = idx;
    }
    std::size_t checkpoint_size = self->checkpoint_size;

    WriteViewChunkResult result;
    bool failed = false;
    std::string error_msg;

    // No Python API calls and no escaping exceptions between these macros.
    Py_BEGIN_ALLOW_THREADS
    try {
        Runtime *rt = get_runtime(self);
        result =
            rt->submit(write_view_chunk_pipeline(
                           file_path, index_path, checkpoint_size, view,
                           checkpoint_idx, static_cast<std::size_t>(start_byte),
                           static_cast<std::size_t>(end_byte),
                           std::string(output_file), compression,
                           static_cast<std::size_t>(batch_size)),
                       "write_view_chunk")
                .get();
    } catch (const std::exception &e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "write_view_chunk failed with an unknown error";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }
    if (!result.error.empty()) {
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
        return NULL;
    }

    // PyDict_SetItemString does NOT steal references: insert `value`, then
    // drop our reference. A NULL `value` (failed allocation, error already
    // set) is reported as failure.
    auto set_owned = [](PyObject *target, const char *key,
                        PyObject *value) -> bool {
        if (!value) return false;
        const int rc = PyDict_SetItemString(target, key, value);
        Py_DECREF(value);
        return rc == 0;
    };

    PyObject *dict = PyDict_New();
    if (!dict) return NULL;

    if (!set_owned(dict, "output_file",
                   PyUnicode_FromString(result.output_file.c_str())) ||
        !set_owned(dict, "events_matched",
                   PyLong_FromUnsignedLongLong(result.events_matched)) ||
        !set_owned(dict, "events_scanned",
                   PyLong_FromUnsignedLongLong(result.events_scanned)) ||
        !set_owned(dict, "rows_written",
                   PyLong_FromLongLong(result.rows_written)) ||
        !set_owned(dict, "bytes_written",
                   PyLong_FromLongLong(result.bytes_written))) {
        Py_DECREF(dict);
        return NULL;
    }

    return dict;
}
3303

3304
// Write several checkpoint chunks of the trace, filtered by a view, to Arrow
// IPC files inside `output_dir`.
//
// Python signature:
//   write_view_chunks(chunks, output_dir, view=None, compression="zstd",
//                     batch_size=10000) -> dict
//
// `chunks` is a list of dicts, each with non-negative int keys
// "checkpoint_idx", "start_byte" and "end_byte" (e.g. the entries returned
// by get_view_chunks). Each chunk is written to
// "<output_dir>/chunk-<checkpoint_idx, zero-padded to 5>.arrow".
// `compression` is "none" or "zstd" (case-insensitive); "zstd" requires a
// build with DFTRACER_UTILS_ENABLE_ZSTD. `view` is None, a string ("io",
// "compute", "dlio" select presets; any other string is used as a view
// name), or a dict with optional "name" and "query" strings; any other type
// raises TypeError. `batch_size` must be > 0.
//
// Returns {"results": [{"output_file", "rows_written", "events_matched"[,
// "error"]}, ...], "total_rows", "total_events_matched"}; raises
// RuntimeError when the pipeline itself throws.
static PyObject *TraceReader_write_view_chunks(TraceReaderObject *self,
                                               PyObject *args, PyObject *kwds) {
    static const char *kwlist[] = {"chunks",      "output_dir", "view",
                                   "compression", "batch_size", NULL};
    PyObject *chunks_list = NULL;
    const char *output_dir = NULL;
    PyObject *view_obj = Py_None;
    const char *compression_str = "zstd";
    Py_ssize_t batch_size = 10000;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "Os|Osn", (char **)kwlist,
                                     &chunks_list, &output_dir, &view_obj,
                                     &compression_str, &batch_size)) {
        return NULL;
    }

    if (!PyList_Check(chunks_list)) {
        PyErr_SetString(PyExc_TypeError, "chunks must be a list");
        return NULL;
    }
    // A non-positive value would become a huge std::size_t below.
    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
        return NULL;
    }

    // Case-insensitive, and honours the ZSTD build flag, like the sibling
    // write_arrow / write_view_chunk bindings.
    IpcCompression compression = IpcCompression::ZSTD;
    std::string comp_lower(compression_str);
    for (char &c : comp_lower) {
        // tolower on a negative char is undefined; go through unsigned char.
        c = static_cast<char>(std::tolower(static_cast<unsigned char>(c)));
    }
    if (comp_lower == "none") {
        compression = IpcCompression::NONE;
    } else if (comp_lower == "zstd") {
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
        compression = IpcCompression::ZSTD;
#else
        PyErr_SetString(PyExc_ValueError, "ZSTD compression not available");
        return NULL;
#endif
    } else {
        PyErr_SetString(PyExc_ValueError,
                        "compression must be 'zstd' or 'none'");
        return NULL;
    }

    ViewDefinition view;
    if (view_obj && view_obj != Py_None) {
        if (PyUnicode_Check(view_obj)) {
            const char *name = PyUnicode_AsUTF8(view_obj);
            if (!name) return NULL;
            std::string name_str(name);
            if (name_str == "io") {
                view = ViewDefinition::io_view();
            } else if (name_str == "compute") {
                view = ViewDefinition::compute_view();
            } else if (name_str == "dlio") {
                view = ViewDefinition::dlio_view();
            } else {
                view.with_name(name_str);
            }
        } else if (PyDict_Check(view_obj)) {
            PyObject *name_obj = PyDict_GetItemString(view_obj, "name");
            if (name_obj && PyUnicode_Check(name_obj)) {
                // PyUnicode_AsUTF8 can fail (e.g. lone surrogates); never
                // forward a NULL pointer into ViewDefinition.
                const char *name = PyUnicode_AsUTF8(name_obj);
                if (!name) return NULL;
                view.with_name(name);
            }
            PyObject *query_obj = PyDict_GetItemString(view_obj, "query");
            if (query_obj && query_obj != Py_None &&
                PyUnicode_Check(query_obj)) {
                const char *query = PyUnicode_AsUTF8(query_obj);
                if (!query) return NULL;
                view.with_query(query);
            }
        } else {
            // Previously any other type silently selected the default view.
            PyErr_SetString(PyExc_TypeError, "view must be a string or dict");
            return NULL;
        }
    }

    // PyLong_AsUnsignedLongLong reports failure (non-int or negative value)
    // by returning (unsigned long long)-1 with an exception set.
    auto as_ull = [](PyObject *obj, unsigned long long &out) -> bool {
        out = PyLong_AsUnsignedLongLong(obj);
        return !(out == static_cast<unsigned long long>(-1) &&
                 PyErr_Occurred());
    };

    std::vector<ChunkDescriptor> chunks;
    const Py_ssize_t num_chunks = PyList_Size(chunks_list);
    chunks.reserve(static_cast<std::size_t>(num_chunks));

    for (Py_ssize_t i = 0; i < num_chunks; i++) {
        PyObject *chunk_dict = PyList_GetItem(chunks_list, i);  // borrowed
        if (!chunk_dict) return NULL;
        if (!PyDict_Check(chunk_dict)) {
            PyErr_SetString(PyExc_TypeError, "each chunk must be a dict");
            return NULL;
        }

        PyObject *cp_idx = PyDict_GetItemString(chunk_dict, "checkpoint_idx");
        PyObject *start = PyDict_GetItemString(chunk_dict, "start_byte");
        PyObject *end = PyDict_GetItemString(chunk_dict, "end_byte");

        if (!cp_idx || !start || !end) {
            PyErr_SetString(
                PyExc_KeyError,
                "chunk must have checkpoint_idx, start_byte, end_byte");
            return NULL;
        }

        unsigned long long cp_val = 0;
        unsigned long long start_val = 0;
        unsigned long long end_val = 0;
        if (!as_ull(cp_idx, cp_val) || !as_ull(start, start_val) ||
            !as_ull(end, end_val)) {
            return NULL;
        }

        ChunkDescriptor desc;
        desc.checkpoint_idx = static_cast<std::uint64_t>(cp_val);
        desc.start_byte = static_cast<std::size_t>(start_val);
        desc.end_byte = static_cast<std::size_t>(end_val);

        char filename[64];
        snprintf(filename, sizeof(filename), "chunk-%05llu.arrow",
                 (unsigned long long)desc.checkpoint_idx);
        desc.output_file = std::string(output_dir) + "/" + filename;

        chunks.push_back(std::move(desc));
    }

    const char *file_path_utf8 = PyUnicode_AsUTF8(self->file_path);
    if (!file_path_utf8) return NULL;
    std::string file_path(file_path_utf8);
    // None or "" index_dir both leave index_path empty; a conversion error
    // is reported rather than left pending on a successful return.
    std::string index_path;
    if (self->index_dir && self->index_dir != Py_None) {
        const char *idx = PyUnicode_AsUTF8(self->index_dir);
        if (!idx) return NULL;
        index_path = idx;
    }
    std::size_t checkpoint_size = self->checkpoint_size;

    WriteViewChunksResult result;
    bool failed = false;
    std::string error_msg;

    // No Python API calls and no escaping exceptions between these macros.
    Py_BEGIN_ALLOW_THREADS
    try {
        Runtime *rt = get_runtime(self);
        result = rt->submit(write_view_chunks_pipeline(
                                file_path, index_path, checkpoint_size, view,
                                std::move(chunks), compression,
                                static_cast<std::size_t>(batch_size)),
                            "write_view_chunks")
                     .get();
    } catch (const std::exception &e) {
        failed = true;
        error_msg = e.what();
    } catch (...) {
        failed = true;
        error_msg = "write_view_chunks failed with an unknown error";
    }
    Py_END_ALLOW_THREADS

    if (failed) {
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
        return NULL;
    }

    // PyDict_SetItemString does NOT steal references: insert `value`, then
    // drop our reference. A NULL `value` (failed allocation, error already
    // set) is reported as failure.
    auto set_owned = [](PyObject *target, const char *key,
                        PyObject *value) -> bool {
        if (!value) return false;
        const int rc = PyDict_SetItemString(target, key, value);
        Py_DECREF(value);
        return rc == 0;
    };

    PyObject *dict = PyDict_New();
    if (!dict) return NULL;

    PyObject *results_list =
        PyList_New(static_cast<Py_ssize_t>(result.results.size()));
    if (!results_list) {
        Py_DECREF(dict);
        return NULL;
    }

    for (std::size_t i = 0; i < result.results.size(); i++) {
        const auto &r = result.results[i];
        PyObject *item = PyDict_New();
        const bool ok =
            item &&
            set_owned(item, "output_file",
                      PyUnicode_FromString(r.output_file.c_str())) &&
            set_owned(item, "rows_written",
                      PyLong_FromLongLong(r.rows_written)) &&
            set_owned(item, "events_matched",
                      PyLong_FromUnsignedLongLong(r.events_matched)) &&
            // Per-chunk errors are reported in the item, not raised.
            (r.error.empty() ||
             set_owned(item, "error", PyUnicode_FromString(r.error.c_str())));
        if (!ok) {
            Py_XDECREF(item);
            Py_DECREF(results_list);
            Py_DECREF(dict);
            return NULL;
        }
        // PyList_SetItem steals the reference to item.
        PyList_SetItem(results_list, static_cast<Py_ssize_t>(i), item);
    }

    if (!set_owned(dict, "results", results_list) ||
        !set_owned(dict, "total_rows",
                   PyLong_FromLongLong(result.total_rows)) ||
        !set_owned(dict, "total_events_matched",
                   PyLong_FromLongLong(result.total_events_matched))) {
        Py_DECREF(dict);
        return NULL;
    }

    return dict;
}
3470

3471
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
3472

3473
static PyObject *TraceReader_enter(TraceReaderObject *self,
116!
3474
                                   PyObject *Py_UNUSED(ignored)) {
3475
    Py_INCREF(self);
58✔
3476
    return (PyObject *)self;
116✔
3477
}
3478

3479
static PyObject *TraceReader_exit(TraceReaderObject *self, PyObject *args) {
114✔
3480
    Py_RETURN_NONE;
114✔
3481
}
3482

3483
static PyObject *TraceReader_get_file_path(TraceReaderObject *self,
14✔
3484
                                           void *closure) {
3485
    Py_INCREF(self->file_path);
14!
3486
    return self->file_path;
14✔
3487
}
3488

3489
static PyObject *TraceReader_get_index_dir(TraceReaderObject *self,
6✔
3490
                                           void *closure) {
3491
    Py_INCREF(self->index_dir);
6✔
3492
    return self->index_dir;
6✔
3493
}
3494

3495
static PyObject *TraceReader_get_has_index(TraceReaderObject *self,
12✔
3496
                                           void *closure) {
3497
    return PyBool_FromLong(self->has_index);
12✔
3498
}
3499

3500
static PyObject *TraceReader_get_num_lines_prop(TraceReaderObject *self,
8✔
3501
                                                void *closure) {
3502
    try {
3503
        TraceReaderConfig cfg = build_config(self);
8!
3504
        TraceReader reader(std::move(cfg));
8!
3505
        std::size_t n = reader.get_num_lines();
8!
3506
        if (n > 0) return PyLong_FromSize_t(n);
8!
3507
    } catch (...) {
8!
3508
    }
×
3509
    PyObject *empty_args = PyTuple_New(0);
8✔
3510
    if (!empty_args) return NULL;
8✔
3511
    PyObject *list = TraceReader_read_lines(self, empty_args, NULL);
8✔
3512
    Py_DECREF(empty_args);
4✔
3513
    if (!list) return NULL;
8✔
3514
    Py_ssize_t n = PyList_GET_SIZE(list);
8✔
3515
    Py_DECREF(list);
4✔
3516
    return PyLong_FromSsize_t(n);
8✔
3517
}
4✔
3518

3519
static PyObject *TraceReader_get_max_bytes(TraceReaderObject *self,
24✔
3520
                                           PyObject *Py_UNUSED(ignored)) {
3521
    try {
3522
        TraceReaderConfig cfg = build_config(self);
24!
3523
        TraceReader reader(std::move(cfg));
24!
3524
        return PyLong_FromSize_t(reader.get_max_bytes());
24!
3525
    } catch (const std::exception &e) {
24!
3526
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
3527
        return NULL;
×
3528
    }
×
3529
}
12✔
3530

3531
static PyObject *TraceReader_get_num_lines(TraceReaderObject *self,
6✔
3532
                                           PyObject *Py_UNUSED(ignored)) {
3533
    try {
3534
        TraceReaderConfig cfg = build_config(self);
6!
3535
        TraceReader reader(std::move(cfg));
6!
3536
        return PyLong_FromSize_t(reader.get_num_lines());
6!
3537
    } catch (const std::exception &e) {
6!
3538
        PyErr_SetString(PyExc_RuntimeError, e.what());
×
3539
        return NULL;
×
3540
    }
×
3541
}
3✔
3542

3543
static PyMethodDef TraceReader_methods[] = {
3544
    {"iter_lines", (PyCFunction)TraceReader_iter_lines,
3545
     METH_VARARGS | METH_KEYWORDS,
3546
     "Return an iterator over decoded lines.\n"
3547
     "\n"
3548
     "Args:\n"
3549
     "    start_line (int): First line (0 = beginning).\n"
3550
     "    end_line (int): Last line (0 = end of file).\n"
3551
     "    start_byte (int): First byte offset (0 = beginning).\n"
3552
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3553
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
3554
    {"iter_raw", (PyCFunction)TraceReader_iter_raw,
3555
     METH_VARARGS | METH_KEYWORDS,
3556
     "Return an iterator over raw byte chunks.\n"
3557
     "\n"
3558
     "Args:\n"
3559
     "    start_line (int): First line (0 = beginning).\n"
3560
     "    end_line (int): Last line (0 = end of file).\n"
3561
     "    start_byte (int): First byte offset (0 = beginning).\n"
3562
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3563
     "    buffer_size (int): Internal read buffer size in bytes.\n"
3564
     "    line_aligned (bool): Align chunks to line boundaries.\n"
3565
     "    multi_line (bool): Allow multiple lines per chunk.\n"},
3566
    {"read_lines", (PyCFunction)TraceReader_read_lines,
3567
     METH_VARARGS | METH_KEYWORDS,
3568
     "Read all lines and return as list.\n"
3569
     "\n"
3570
     "Args:\n"
3571
     "    start_line (int): First line (0 = beginning).\n"
3572
     "    end_line (int): Last line (0 = end of file).\n"
3573
     "    start_byte (int): First byte offset (0 = beginning).\n"
3574
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3575
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
3576
    {"iter_json", (PyCFunction)TraceReader_iter_json,
3577
     METH_VARARGS | METH_KEYWORDS,
3578
     "Return an iterator over parsed JSON events as Python dicts.\n"
3579
     "\n"
3580
     "Each event is parsed once in C++ (single-pass simdjson ondemand)\n"
3581
     "and yielded as a Python dict. No double-parsing overhead.\n"
3582
     "\n"
3583
     "Args:\n"
3584
     "    start_line (int): First line (0 = beginning).\n"
3585
     "    end_line (int): Last line (0 = end of file).\n"
3586
     "    start_byte (int): First byte offset (0 = beginning).\n"
3587
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3588
     "    buffer_size (int): Internal read buffer size in bytes.\n"
3589
     "    query (str): Optional query filter.\n"
3590
     "    batch_size (int): Events per internal batch (default 1024).\n"},
3591
    {"read_json", (PyCFunction)TraceReader_read_json_py,
3592
     METH_VARARGS | METH_KEYWORDS,
3593
     "Read all events as parsed Python dicts (list).\n"
3594
     "\n"
3595
     "Equivalent to list(iter_json(...)).\n"},
3596
    {"read_raw", (PyCFunction)TraceReader_read_raw,
3597
     METH_VARARGS | METH_KEYWORDS,
3598
     "Read all raw chunks and return as list.\n"
3599
     "\n"
3600
     "Args:\n"
3601
     "    start_line (int): First line (0 = beginning).\n"
3602
     "    end_line (int): Last line (0 = end of file).\n"
3603
     "    start_byte (int): First byte offset (0 = beginning).\n"
3604
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3605
     "    buffer_size (int): Internal read buffer size in bytes.\n"
3606
     "    line_aligned (bool): Align chunks to line boundaries.\n"
3607
     "    multi_line (bool): Allow multiple lines per chunk.\n"},
3608
#ifdef DFTRACER_UTILS_ENABLE_ARROW
3609
    {"iter_arrow", (PyCFunction)TraceReader_iter_arrow,
3610
     METH_VARARGS | METH_KEYWORDS,
3611
     "Return an iterator over Arrow record batches.\n"
3612
     "\n"
3613
     "Args:\n"
3614
     "    batch_size (int): Maximum rows per Arrow batch.\n"
3615
     "    start_line (int): First line (0 = beginning).\n"
3616
     "    end_line (int): Last line (0 = end of file).\n"
3617
     "    start_byte (int): First byte offset (0 = beginning).\n"
3618
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3619
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
3620
    {"iter_arrow_stream", (PyCFunction)TraceReader_iter_arrow_stream,
3621
     METH_VARARGS | METH_KEYWORDS,
3622
     "Return an _ArrowBatchStream that exposes Arrow record batches\n"
3623
     "via the Arrow C Data Interface stream protocol\n"
3624
     "(__arrow_c_stream__). PyArrow can drain the producer channel\n"
3625
     "with a single call, without per-batch Python iteration.\n"},
3626
    {"read_arrow", (PyCFunction)TraceReader_read_arrow,
3627
     METH_VARARGS | METH_KEYWORDS,
3628
     "Read all events as a materialized ArrowTable.\n"
3629
     "\n"
3630
     "Args:\n"
3631
     "    batch_size (int): Maximum rows per Arrow batch.\n"
3632
     "    start_line (int): First line (0 = beginning).\n"
3633
     "    end_line (int): Last line (0 = end of file).\n"
3634
     "    start_byte (int): First byte offset (0 = beginning).\n"
3635
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3636
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
3637
#endif
3638
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
3639
    {"write_arrow", (PyCFunction)TraceReader_write_arrow,
3640
     METH_VARARGS | METH_KEYWORDS,
3641
     "Write trace data to partitioned Arrow IPC files.\n"
3642
     "\n"
3643
     "Args:\n"
3644
     "    path (str): Output directory path.\n"
3645
     "    partition_by (list[str] or None): Column names to partition by.\n"
3646
     "    num_buckets (int): Number of hash buckets (0 = no bucketing).\n"
3647
     "    chunk_size_mb (int): Max uncompressed MB per file (default 32).\n"
3648
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
3649
     "    batch_size (int): Rows per internal batch (default 10000).\n"
3650
     "    normalize (bool): Use normalized schema (default False).\n"
3651
     "\n"
3652
     "Returns:\n"
3653
     "    dict: Statistics including partitions, total_rows, total_bytes.\n"},
3654
    {"get_view_chunks", (PyCFunction)TraceReader_get_view_chunks,
3655
     METH_VARARGS | METH_KEYWORDS,
3656
     "Get candidate chunks for a view after bloom filter pruning.\n"
3657
     "\n"
3658
     "Args:\n"
3659
     "    view (str or dict): View name ('io', 'compute', 'dlio') or\n"
3660
     "                        dict with 'name' and optional 'query'.\n"
3661
     "\n"
3662
     "Returns:\n"
3663
     "    dict: chunks list, total_checkpoints, skipped_checkpoints.\n"},
3664
    {"write_view_chunk", (PyCFunction)TraceReader_write_view_chunk,
3665
     METH_VARARGS | METH_KEYWORDS,
3666
     "Write a single chunk to an Arrow IPC file.\n"
3667
     "\n"
3668
     "Args:\n"
3669
     "    output_file (str): Path to output Arrow IPC file.\n"
3670
     "    checkpoint_idx (int): Checkpoint index.\n"
3671
     "    start_byte (int): Start byte offset.\n"
3672
     "    end_byte (int): End byte offset.\n"
3673
     "    view (str or dict): View definition.\n"
3674
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
3675
     "    batch_size (int): Events per batch (default 10000).\n"
3676
     "\n"
3677
     "Returns:\n"
3678
     "    dict: output_file, events_matched, rows_written, bytes_written.\n"},
3679
    {"write_view_chunks", (PyCFunction)TraceReader_write_view_chunks,
3680
     METH_VARARGS | METH_KEYWORDS,
3681
     "Write multiple chunks to Arrow IPC files in parallel.\n"
3682
     "\n"
3683
     "All chunks are processed concurrently on the Runtime thread pool.\n"
3684
     "\n"
3685
     "Args:\n"
3686
     "    chunks (list): List of dicts with checkpoint_idx, start_byte, "
3687
     "end_byte.\n"
3688
     "    output_dir (str): Directory for output Arrow IPC files.\n"
3689
     "    view (str or dict): View definition.\n"
3690
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
3691
     "    batch_size (int): Events per batch (default 10000).\n"
3692
     "\n"
3693
     "Returns:\n"
3694
     "    dict: results list, total_rows, total_events_matched.\n"},
3695
#endif
3696
    {"get_max_bytes", (PyCFunction)TraceReader_get_max_bytes, METH_NOARGS,
3697
     "Get the maximum byte position (0 if unknown for compressed\n"
3698
     "files without index)."},
3699
    {"get_num_lines", (PyCFunction)TraceReader_get_num_lines, METH_NOARGS,
3700
     "Get the total number of lines (0 if unknown for files without\n"
3701
     "index)."},
3702
    {"__enter__", (PyCFunction)TraceReader_enter, METH_NOARGS,
3703
     "Enter the runtime context for the with statement."},
3704
    {"__exit__", (PyCFunction)TraceReader_exit, METH_VARARGS,
3705
     "Exit the runtime context for the with statement.\n"
3706
     "\n"
3707
     "TraceReader does not own the shared RocksDB instance for an index path;\n"
3708
     "any shared DB lifetime remains manager-owned on the native side."},
3709
    {NULL}};
3710

3711
static PyGetSetDef TraceReader_getsetters[] = {
3712
    {"path", (getter)TraceReader_get_file_path, NULL,
3713
     "Path to the trace file or directory", NULL},
3714
    {"index_dir", (getter)TraceReader_get_index_dir, NULL,
3715
     "Directory for index files", NULL},
3716
    {"has_index", (getter)TraceReader_get_has_index, NULL,
3717
     "True if a checkpoint index was found", NULL},
3718
    {"num_lines", (getter)TraceReader_get_num_lines_prop, NULL,
3719
     "Total line count (reads all lines if needed)", NULL},
3720
    {NULL}};
3721

3722
PyTypeObject TraceReaderType = {
3723
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext.TraceReader",
3724
    sizeof(TraceReaderObject),                /* tp_basicsize */
3725
    0,                                        /* tp_itemsize */
3726
    (destructor)TraceReader_dealloc,          /* tp_dealloc */
3727
    0,                                        /* tp_vectorcall_offset */
3728
    0,                                        /* tp_getattr */
3729
    0,                                        /* tp_setattr */
3730
    0,                                        /* tp_as_async */
3731
    0,                                        /* tp_repr */
3732
    0,                                        /* tp_as_number */
3733
    0,                                        /* tp_as_sequence */
3734
    0,                                        /* tp_as_mapping */
3735
    0,                                        /* tp_hash */
3736
    0,                                        /* tp_call */
3737
    0,                                        /* tp_str */
3738
    0,                                        /* tp_getattro */
3739
    0,                                        /* tp_setattro */
3740
    0,                                        /* tp_as_buffer */
3741
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
3742
    "TraceReader(file_path: str, index_dir: str = '',\n"
3743
    "            checkpoint_size: int = 33554432,\n"
3744
    "            auto_build_index: bool = False,\n"
3745
    "            runtime: Runtime | None = None)\n"
3746
    "--\n"
3747
    "\n"
3748
    "Smart trace file reader that auto-selects sequential or indexed\n"
3749
    "reading based on whether a ``.dftindex`` store exists.\n"
3750
    "\n"
3751
    "Args:\n"
3752
    "    file_path (str): Path to the trace file (.pfw.gz or plain "
3753
    "text).\n"
3754
    "    index_dir (str): Directory to search for ``.dftindex`` "
3755
    "stores.\n"
3756
    "        Empty string (default) searches next to the trace file.\n"
3757
    "    checkpoint_size (int): Checkpoint interval in bytes for index\n"
3758
    "        building (default 32 MB).\n"
3759
    "    auto_build_index (bool): If True, automatically build an "
3760
    "index\n"
3761
    "        when none exists.\n"
3762
    "    runtime (Runtime or None): Runtime instance for thread pool "
3763
    "control.\n"
3764
    "        If None, uses the default global Runtime.\n"
3765
    "\n"
3766
    "Raises:\n"
3767
    "    RuntimeError: If *file_path* does not exist or cannot be "
3768
    "opened.\n",                /* tp_doc */
3769
    0,                          /* tp_traverse */
3770
    0,                          /* tp_clear */
3771
    0,                          /* tp_richcompare */
3772
    0,                          /* tp_weaklistoffset */
3773
    0,                          /* tp_iter */
3774
    0,                          /* tp_iternext */
3775
    TraceReader_methods,        /* tp_methods */
3776
    0,                          /* tp_members */
3777
    TraceReader_getsetters,     /* tp_getset */
3778
    0,                          /* tp_base */
3779
    0,                          /* tp_dict */
3780
    0,                          /* tp_descr_get */
3781
    0,                          /* tp_descr_set */
3782
    0,                          /* tp_dictoffset */
3783
    (initproc)TraceReader_init, /* tp_init */
3784
    0,                          /* tp_alloc */
3785
    TraceReader_new,            /* tp_new */
3786
};
3787

3788
int init_trace_reader(PyObject *m) {
2✔
3789
    if (PyType_Ready(&TraceReaderType) < 0) return -1;
2✔
3790

3791
    Py_INCREF(&TraceReaderType);
1✔
3792
    if (PyModule_AddObject(m, "TraceReader", (PyObject *)&TraceReaderType) <
2✔
3793
        0) {
3794
        Py_DECREF(&TraceReaderType);
3795
        Py_DECREF(m);
3796
        return -1;
×
3797
    }
3798

3799
    return 0;
2✔
3800
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc