• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 28496595030

01 Jul 2026 05:50AM UTC coverage: 50.727% (-1.6%) from 52.278%
28496595030

Pull #83

github

web-flow
Merge 8f1ff4df5 into 2efed6649
Pull Request #83: refactor and improve code QoL

31872 of 80367 branches covered (39.66%)

Branch coverage included in aggregate %.

770 of 1591 new or added lines in 85 files covered. (48.4%)

5070 existing lines in 182 files now uncovered.

32742 of 47009 relevant lines covered (69.65%)

9887.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.36
/src/dftracer/utils/python/trace_reader.cpp
1
#define PY_SSIZE_T_CLEAN
2
#include <Python.h>
3
#include <dftracer/utils/core/common/config.h>
4
#include <dftracer/utils/core/common/constants.h>
5
#include <dftracer/utils/core/common/filesystem.h>
6
#include <dftracer/utils/core/common/memory_budget.h>
7
#include <dftracer/utils/core/coro/channel.h>
8
#include <dftracer/utils/core/coro/task.h>
9
#include <dftracer/utils/core/coro/when_all.h>
10
#include <dftracer/utils/core/tasks/coro_scope.h>
11
#include <dftracer/utils/core/utils/string.h>
12
#include <dftracer/utils/python/arrow_helpers.h>
13
#include <dftracer/utils/python/batch_byte_size.h>
14
#include <dftracer/utils/python/json.h>
15
#include <dftracer/utils/python/py_dict_helpers.h>
16
#include <dftracer/utils/python/py_errors.h>
17
#include <dftracer/utils/python/py_type_helpers.h>
18
#include <dftracer/utils/python/runtime.h>
19
#include <dftracer/utils/python/trace_reader.h>
20
#include <dftracer/utils/python/trace_reader_iterator.h>
21
#include <dftracer/utils/utilities/common/query/query.h>
22
#include <dftracer/utils/utilities/composites/dft/indexing/chunk_pruner_utility.h>
23
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
24
#include <dftracer/utils/utilities/filesystem/pattern_directory_scanner_utility.h>
25
#include <dftracer/utils/utilities/indexer/index_database.h>
26
#include <dftracer/utils/utilities/indexer/internal/helpers.h>
27
#include <dftracer/utils/utilities/reader/trace_reader.h>
28

29
#include <algorithm>
30
#include <cctype>
31
#include <cstddef>
32
#include <cstdio>
33
#include <cstring>
34
#include <exception>
35
#include <memory>
36
#include <string>
37
#include <unordered_map>
38
#include <vector>
39
#ifdef DFTRACER_UTILS_ENABLE_ARROW
40
#include <dftracer/utils/python/arrow_stream_capsule.h>
41
#include <dftracer/utils/utilities/common/arrow/column_builder.h>
42
#include <dftracer/utils/utilities/common/json/parser.h>
43
#endif
44
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
45
#include <dftracer/utils/utilities/common/arrow/ipc_writer.h>
46
#include <dftracer/utils/utilities/common/arrow/partition_writer.h>
47
#include <dftracer/utils/utilities/composites/dft/internal/utils.h>
48
#include <dftracer/utils/utilities/composites/dft/metadata_collector_utility.h>
49
#include <dftracer/utils/utilities/composites/dft/views/view_builder_utility.h>
50
#include <dftracer/utils/utilities/composites/dft/views/view_definition.h>
51
#include <dftracer/utils/utilities/composites/dft/views/view_reader_utility.h>
52
#endif
53

54
namespace {
55

56
using dftracer::utils::CoroScope;
57
using dftracer::utils::Runtime;
58
using dftracer::utils::coro::CoroTask;
59
using dftracer::utils::coro::when_all;
60
using dftracer::utils::utilities::filesystem::PatternDirectoryScannerUtility;
61
using dftracer::utils::utilities::filesystem::
62
    PatternDirectoryScannerUtilityInput;
63
using dftracer::utils::utilities::reader::ReadConfig;
64
using dftracer::utils::utilities::reader::TraceReader;
65
using dftracer::utils::utilities::reader::TraceReaderConfig;
66
#ifdef DFTRACER_UTILS_ENABLE_ARROW
67
using dftracer::utils::utilities::common::arrow::ColumnType;
68
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
69
using dftracer::utils::utilities::common::json::JsonParser;
70
using dftracer::utils::utilities::common::json::JsonValueHelper;
71
#endif
72
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
73
using dftracer::utils::utilities::common::arrow::IpcCompression;
74
using dftracer::utils::utilities::common::arrow::PartitionWriter;
75
using dftracer::utils::utilities::common::arrow::PartitionWriteStats;
76
using dftracer::utils::utilities::composites::dft::MetadataCollectorUtility;
77
using dftracer::utils::utilities::composites::dft::
78
    MetadataCollectorUtilityInput;
79
using dftracer::utils::utilities::composites::dft::views::ViewBuilderInput;
80
using dftracer::utils::utilities::composites::dft::views::ViewBuilderUtility;
81
using dftracer::utils::utilities::composites::dft::views::ViewDefinition;
82
using dftracer::utils::utilities::composites::dft::views::ViewReaderInput;
83
using dftracer::utils::utilities::composites::dft::views::ViewReaderUtility;
84
#endif
85

86
using dftracer::utils::python::MemoryViewBatchData;
87
using dftracer::utils::python::MemoryViewBatchIteratorState;
88

89
/// Single-file producer for the line-batch iterator.
///
/// Reads the file described by `cfg` with TraceReader::read_lines(rc) and
/// packs up to `batch_size` lines into one MemoryViewBatchData: the line bytes
/// are concatenated in `buffer`, and `offsets`/`lengths` locate each line.
/// Full batches are sent on `producer` as they fill. A trailing partial batch
/// is sent at the end unless cancellation was requested.
///
/// Before each send, the batch's byte_size() is added to
/// `state->bytes_in_queue`. If the channel rejects the batch, that amount is
/// subtracted again, because the batch never reaches the consumer. The
/// producer then stops; there is no second attempt to flush a moved-from
/// batch. (Assumes the consumer only decrements bytes_in_queue for batches
/// it actually receives — TODO confirm.)
///
/// Stops early when `state->cancelled` is set. Any exception is recorded via
/// `state->set_error()`. `guard` is held for the whole coroutine so the
/// producer is released on every exit path.
CoroTask<void> produce_lines_batched(
    std::shared_ptr<MemoryViewBatchIteratorState> state,
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer,
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size) {
    auto guard = producer.guard();
    try {
        TraceReader reader(std::move(cfg));
        auto gen = reader.read_lines(rc);
        MemoryViewBatchData batch;
        std::size_t count = 0;

        while (auto opt = co_await gen.next()) {
            if (state->cancelled.load(std::memory_order_acquire)) break;
            auto sv = opt->content;
            Py_ssize_t offset = static_cast<Py_ssize_t>(batch.buffer.size());
            batch.buffer.insert(batch.buffer.end(), sv.begin(), sv.end());
            batch.offsets.push_back(offset);
            batch.lengths.push_back(static_cast<Py_ssize_t>(sv.size()));
            ++count;

            if (count >= batch_size) {
                auto batch_bytes = dftracer::utils::python::byte_size(batch);
                state->bytes_in_queue.fetch_add(batch_bytes,
                                                std::memory_order_acq_rel);
                if (!co_await producer.send(std::move(batch))) {
                    // Rejected: the consumer will never account for it.
                    state->bytes_in_queue.fetch_sub(batch_bytes,
                                                    std::memory_order_acq_rel);
                    // Return directly; a `break` would fall into the final
                    // flush with `count` still set and a moved-from batch.
                    co_return;
                }
                batch = MemoryViewBatchData{};
                count = 0;
            }
        }
        if (count > 0 && !state->cancelled.load(std::memory_order_acquire)) {
            auto batch_bytes = dftracer::utils::python::byte_size(batch);
            state->bytes_in_queue.fetch_add(batch_bytes,
                                            std::memory_order_acq_rel);
            if (!co_await producer.send(std::move(batch))) {
                state->bytes_in_queue.fetch_sub(batch_bytes,
                                                std::memory_order_acq_rel);
            }
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
6,369✔
128

129
/// Single-file producer for the raw-chunk iterator.
///
/// Reads the file described by `cfg` with TraceReader::read_raw(rc). Each
/// chunk is copied into its own MemoryViewBatchData holding a single entry
/// (offset 0, length = chunk size) and sent on `producer`.
///
/// The batch's byte_size() is added to `state->bytes_in_queue` before the
/// send and subtracted again if the channel rejects it. The rejected batch
/// never reaches the consumer, so leaving it counted would inflate the
/// counter. (Assumes the consumer decrements only for batches it receives —
/// TODO confirm.)
///
/// Stops on cancellation or on a rejected send. Exceptions are recorded via
/// `state->set_error()`. `guard` releases the producer on every exit path.
CoroTask<void> produce_raw_batched(
    std::shared_ptr<MemoryViewBatchIteratorState> state,
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer,
    TraceReaderConfig cfg, ReadConfig rc) {
    auto guard = producer.guard();
    try {
        TraceReader reader(std::move(cfg));
        auto gen = reader.read_raw(rc);
        while (auto opt = co_await gen.next()) {
            if (state->cancelled.load(std::memory_order_acquire)) break;
            MemoryViewBatchData batch;
            batch.buffer.assign(opt->data(), opt->data() + opt->size());
            batch.offsets.push_back(0);
            batch.lengths.push_back(static_cast<Py_ssize_t>(opt->size()));
            auto batch_bytes = dftracer::utils::python::byte_size(batch);
            state->bytes_in_queue.fetch_add(batch_bytes,
                                            std::memory_order_acq_rel);
            if (!co_await producer.send(std::move(batch))) {
                // Rejected: undo the accounting for a batch nobody will read.
                state->bytes_in_queue.fetch_sub(batch_bytes,
                                                std::memory_order_acq_rel);
                break;
            }
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
1,286✔
152

153
using dftracer::utils::utilities::common::json::JsonParser;
154
using dftracer::utils::utilities::common::json::JsonValueHelper;
155

156
// Rough per-item size estimates in bytes, one per kind of iterator payload.
// (Their consumers are not visible in this chunk — presumably used for
// memory-budget sizing; confirm.)
static constexpr std::size_t ESTIMATED_BYTES_PER_LINE = 256;
static constexpr std::size_t ESTIMATED_BYTES_PER_RAW_CHUNK =
    std::size_t{4} << 20;  // 4 MiB
static constexpr std::size_t ESTIMATED_BYTES_PER_JSON_EVENT = 512;
static constexpr std::size_t ESTIMATED_BYTES_PER_ARROW_ROW = 1024;
160

161
static void insert_simdjson_value(ArgsMap &map, std::string_view key,
2,159✔
162
                                  simdjson::ondemand::value val) {
163
    auto type = val.type();
2,159✔
164
    if (type.error()) return;
2,159!
165
    switch (type.value_unsafe()) {
2,159!
166
        case simdjson::ondemand::json_type::string: {
167
            auto r = val.get_string();
960✔
168
            if (!r.error()) map.insert(key, std::string(r.value_unsafe()));
960!
169
            break;
960✔
170
        }
171
        case simdjson::ondemand::json_type::number: {
172
            auto ri = val.get_int64();
1,199✔
173
            if (!ri.error()) {
1,199!
174
                auto v = ri.value_unsafe();
1,199✔
175
                if (v >= 0)
1,199!
176
                    map.insert(key, static_cast<std::uint64_t>(v));
1,199!
177
                else
178
                    map.insert(key, v);
×
179
            } else {
1,199✔
180
                auto rd = val.get_double();
×
181
                if (!rd.error()) map.insert(key, rd.value_unsafe());
×
182
            }
183
            break;
1,199✔
184
        }
185
        case simdjson::ondemand::json_type::boolean: {
186
            auto r = val.get_bool();
×
187
            if (!r.error()) map.insert(key, r.value_unsafe());
×
188
            break;
×
189
        }
190
        default:
191
            break;
×
192
    }
193
}
2,159✔
194

195
/// Populate `ev` from the JSON record currently held by `parser`.
///
/// Marks `ev.top` valid. Every top-level field other than "args" is inserted
/// into `ev.top` via insert_simdjson_value().
///
/// If "args" is an object, `ev.args` is marked valid and each member whose
/// key and value read cleanly is inserted into `ev.args`; unreadable members
/// are skipped. A non-object "args" is ignored entirely.
static void parse_json_to_event(JsonParser &parser, JsonDictEvent &ev) {
    ev.top.set_valid(true);

    auto fill_args = [&ev](simdjson::ondemand::value &args_val) {
        auto args_obj = args_val.get_object();
        if (args_obj.error()) return;
        ev.args.set_valid(true);
        for (auto member : args_obj.value_unsafe()) {
            if (member.error()) continue;
            auto member_key = member.unescaped_key();
            if (member_key.error()) continue;
            auto member_val = member.value();
            if (member_val.error()) continue;
            insert_simdjson_value(ev.args, member_key.value_unsafe(),
                                  member_val.value_unsafe());
        }
    };

    parser.for_each_field(
        [&](std::string_view key, simdjson::ondemand::value val) {
            if (key != "args") {
                insert_simdjson_value(ev.top, key, val);
                return;
            }
            fill_args(val);
        });
}
239✔
218

219
/// Single-file producer for the JSON-dict iterator.
///
/// Reads the file described by `cfg` with TraceReader::read_json(rc) and
/// converts each record with parse_json_to_event(). Up to `batch_size`
/// events are collected into one JsonDictBatch, which is sent on `producer`.
/// A trailing partial batch is sent unless cancellation was requested.
///
/// The batch's byte_size() is added to `state->bytes_in_queue` before each
/// send and subtracted again if the channel rejects the batch, since it
/// never reaches the consumer. (Assumes the consumer decrements only for
/// batches it receives — TODO confirm.) After a rejected send the coroutine
/// returns directly rather than relying on the moved-from batch being empty
/// to skip the final flush.
///
/// Stops early on `state->cancelled`. Exceptions are recorded via
/// `state->set_error()`. `guard` releases the producer on every exit path.
CoroTask<void> produce_json_dicts(
    std::shared_ptr<JsonDictIteratorState> state,
    dftracer::utils::coro::ChannelProducer<JsonDictBatch> producer,
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size) {
    auto guard = producer.guard();
    try {
        TraceReader reader(std::move(cfg));
        auto gen = reader.read_json(rc);
        JsonDictBatch batch;
        batch.events.reserve(batch_size);

        while (auto opt = co_await gen.next()) {
            if (state->cancelled.load(std::memory_order_acquire)) break;

            JsonDictEvent ev;
            parse_json_to_event(*opt->parser, ev);
            batch.events.push_back(std::move(ev));

            if (batch.events.size() >= batch_size) {
                auto batch_bytes = dftracer::utils::python::byte_size(batch);
                state->bytes_in_queue.fetch_add(batch_bytes,
                                                std::memory_order_acq_rel);
                if (!co_await producer.send(std::move(batch))) {
                    // Rejected: undo the accounting and stop producing.
                    state->bytes_in_queue.fetch_sub(batch_bytes,
                                                    std::memory_order_acq_rel);
                    co_return;
                }
                batch = JsonDictBatch{};
                batch.events.reserve(batch_size);
            }
        }
        if (!batch.events.empty() &&
            !state->cancelled.load(std::memory_order_acquire)) {
            auto batch_bytes = dftracer::utils::python::byte_size(batch);
            state->bytes_in_queue.fetch_add(batch_bytes,
                                            std::memory_order_acq_rel);
            if (!co_await producer.send(std::move(batch))) {
                state->bytes_in_queue.fetch_sub(batch_bytes,
                                                std::memory_order_acq_rel);
            }
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
193✔
257

258
/// Feed every path in `*files`, in order, into `file_chan`.
///
/// Stops early when `*cancelled` is set or when the channel rejects a send.
/// On every path out of the loop, `file_chan` is closed, presumably so the
/// workers' receive() loops terminate. `files` and `cancelled` are borrowed
/// and must outlive this coroutine.
static CoroTask<void> send_files_to_channel(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    const std::vector<std::string> *files, std::atomic<bool> *cancelled) {
    for (const std::string &path : *files) {
        const bool stop = cancelled->load(std::memory_order_acquire);
        // Short-circuit: no send is attempted once cancellation is seen.
        if (stop || !co_await file_chan->send(path)) break;
    }
    file_chan->close();
    co_return;
}
96✔
268

269
/// Worker coroutine for the parallel JSON-dict reader.
///
/// Repeatedly takes a path from `file_chan` and reads that file with
/// TraceReader::read_json(rc), using `index_dir`, `checkpoint_size` and
/// `auto_build_index`. Each record is converted with parse_json_to_event(),
/// and JsonDictBatch objects of up to `batch_size` events are forwarded to
/// `out_chan`. A partial batch is flushed at the end of each file, so a
/// batch never spans two files.
///
/// Returns when `file_chan` yields no more paths, when `*cancelled` is
/// observed, or when `out_chan` rejects a send.
///
/// NOTE(review): unlike produce_json_dicts(), this worker does not add to
/// the iterator state's bytes_in_queue and has no try/catch; reader errors
/// propagate to the enclosing CoroScope. Also, if every worker returns early
/// while send_files_to_channel() is blocked on a full `file_chan`, nothing
/// receives again. Confirm the channel wakes that send. Both need confirming.
static CoroTask<void> json_dict_file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<JsonDictBatch> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<JsonDictBatch> producer(out_chan);
    auto guard = producer.guard();

    const auto is_cancelled = [cancelled] {
        return cancelled->load(std::memory_order_acquire);
    };

    for (;;) {
        auto next_path = co_await file_chan->receive();
        if (!next_path) break;
        if (is_cancelled()) co_return;

        TraceReaderConfig file_cfg;
        file_cfg.file_path = std::move(*next_path);
        file_cfg.index_dir = index_dir;
        file_cfg.checkpoint_size = checkpoint_size;
        file_cfg.auto_build_index = auto_build_index;

        TraceReader reader(std::move(file_cfg));
        auto records = reader.read_json(rc);
        JsonDictBatch pending;
        pending.events.reserve(batch_size);

        for (;;) {
            auto record = co_await records.next();
            if (!record) break;
            if (is_cancelled()) co_return;

            JsonDictEvent event;
            parse_json_to_event(*record->parser, event);
            pending.events.push_back(std::move(event));
            if (pending.events.size() < batch_size) continue;

            if (!co_await producer.send(std::move(pending))) co_return;
            pending = JsonDictBatch{};
            pending.events.reserve(batch_size);
        }

        if (!pending.events.empty()) {
            if (!co_await producer.send(std::move(pending))) co_return;
        }
    }
    co_return;
}
1,449✔
307

308
/// Launch the worker pool used by produce_json_dicts_parallel().
///
/// Creates a file-path channel with capacity `num_workers` and spawns
/// `num_workers` json_dict_file_worker() coroutines into `child`. Each worker
/// gets its own copy of `*index_dir` and `*rc`. One send_files_to_channel()
/// feeder is also spawned; it pushes every path in `*files` and then closes
/// the channel.
///
/// `files`, `out_chan` and `cancelled_ptr` are borrowed by the spawned
/// coroutines and must outlive the `child` scope.
///
/// `num_workers` is min(files->size(), max_workers), but never less than 1.
/// With zero workers nothing would receive from the file channel and no
/// ChannelProducer would ever attach to `out_chan`, which could leave the
/// feeder and the consuming iterator waiting indefinitely.
static CoroTask<void> spawn_json_dict_producers(
    CoroScope &child, dftracer::utils::coro::Channel<JsonDictBatch> *out_chan,
    const std::vector<std::string> *files, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::size_t batch_size, std::atomic<bool> *cancelled_ptr,
    std::size_t max_workers) {
    const std::size_t num_workers =
        std::max<std::size_t>(1, std::min(files->size(), max_workers));
    auto file_chan =
        dftracer::utils::coro::make_channel<std::string>(num_workers);

    for (std::size_t i = 0; i < num_workers; ++i) {
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc, batch_size,
                     cancelled_ptr](CoroScope &) {
            return json_dict_file_worker(fc, out_chan, idx, checkpoint_size,
                                         auto_build_index, r, batch_size,
                                         cancelled_ptr);
        });
    }

    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
        return send_files_to_channel(fc, files, cancelled_ptr);
    });
    co_return;
}
5✔
333

334
/// Directory-mode producer for the JSON-dict iterator.
///
/// Scans `dir_path` with PatternDirectoryScannerUtility for `.pfw` and
/// `.pfw.gz` entries and sorts the resulting paths. It then reads them
/// concurrently via spawn_json_dict_producers(), with `max_workers` as the
/// worker limit, writing JsonDictBatch objects into `sp->channel`.
///
/// When no file matches, `sp->channel` is closed immediately. Any exception
/// is recorded with `sp->set_error()`.
///
/// NOTE(review): the catch block does not close `sp->channel`; if the scan
/// throws before any producer attaches, confirm that set_error() (or the
/// consumer) wakes a waiting reader.
static CoroTask<void> produce_json_dicts_parallel(
    CoroScope &scope, JsonDictIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility scanner;
        auto scan_input = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto entries = co_await scope.spawn(scanner, scan_input);

        std::vector<std::string> files;
        files.reserve(entries.size());
        for (auto &e : entries) files.push_back(e.path.string());
        std::sort(files.begin(), files.end());

        if (files.empty()) {
            sp->channel->close();
            co_return;
        }

        auto *chan_ptr = sp->channel.get();
        auto *cancelled_ptr = &sp->cancelled;

        // The lambda is deliberately NOT a coroutine. It only builds and
        // returns the spawn_json_dict_producers() task, whose arguments are
        // copied into that task's own frame at the call. Correctness
        // therefore does not depend on how long scope.scope() keeps this
        // closure alive (C++ Core Guidelines CP.51). The `files`, `index_dir`
        // and `rc` locals referenced here live in this frame, which stays
        // suspended until the scope completes.
        co_await scope.scope([chan_ptr, &files, &index_dir, checkpoint_size,
                              auto_build_index, &rc, batch_size, cancelled_ptr,
                              max_workers](CoroScope &child) -> CoroTask<void> {
            return spawn_json_dict_producers(
                child, chan_ptr, &files, &index_dir, checkpoint_size,
                auto_build_index, &rc, batch_size, cancelled_ptr, max_workers);
        });
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
66✔
368

369
/// Worker coroutine for the parallel line-batch reader.
///
/// Repeatedly takes a path from `file_chan` and reads that file with
/// TraceReader::read_lines(rc), using `index_dir`, `checkpoint_size` and
/// `auto_build_index`. Up to `batch_size` lines are packed into one
/// MemoryViewBatchData (bytes concatenated in `buffer`, per-line
/// `offsets`/`lengths`) and forwarded to `out_chan`. A partial batch is
/// flushed at the end of each file.
///
/// Returns when `file_chan` yields no more paths, when `*cancelled` is
/// observed, or when `out_chan` rejects a send.
///
/// NOTE(review): unlike produce_lines_batched(), this worker does not add to
/// the iterator state's bytes_in_queue and has no try/catch. Also, if every
/// worker returns early while send_files_to_channel() is blocked on a full
/// `file_chan`, nothing receives again. Confirm both are intended.
static CoroTask<void> lines_file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer(
        out_chan);
    auto guard = producer.guard();

    const auto is_cancelled = [cancelled] {
        return cancelled->load(std::memory_order_acquire);
    };

    for (;;) {
        auto next_path = co_await file_chan->receive();
        if (!next_path) break;
        if (is_cancelled()) co_return;

        TraceReaderConfig file_cfg;
        file_cfg.file_path = std::move(*next_path);
        file_cfg.index_dir = index_dir;
        file_cfg.checkpoint_size = checkpoint_size;
        file_cfg.auto_build_index = auto_build_index;

        TraceReader reader(std::move(file_cfg));
        auto lines = reader.read_lines(rc);
        MemoryViewBatchData pending;
        std::size_t pending_lines = 0;

        for (;;) {
            auto line = co_await lines.next();
            if (!line) break;
            if (is_cancelled()) co_return;

            const auto text = line->content;
            const auto start =
                static_cast<Py_ssize_t>(pending.buffer.size());
            pending.buffer.insert(pending.buffer.end(), text.begin(),
                                  text.end());
            pending.offsets.push_back(start);
            pending.lengths.push_back(static_cast<Py_ssize_t>(text.size()));
            ++pending_lines;
            if (pending_lines < batch_size) continue;

            if (!co_await producer.send(std::move(pending))) co_return;
            pending = MemoryViewBatchData{};
            pending_lines = 0;
        }

        if (pending_lines > 0) {
            if (!co_await producer.send(std::move(pending))) co_return;
        }
    }
    co_return;
}
814✔
411

412
/// Launch the worker pool used by produce_lines_parallel().
///
/// Creates a file-path channel with capacity `num_workers` and spawns
/// `num_workers` lines_file_worker() coroutines into `child`. Each worker
/// gets its own copy of `*index_dir` and `*rc`. One send_files_to_channel()
/// feeder is also spawned; it pushes every path in `*files` and then closes
/// the channel.
///
/// `files`, `out_chan` and `cancelled_ptr` are borrowed by the spawned
/// coroutines and must outlive the `child` scope.
///
/// `num_workers` is min(files->size(), max_workers), but never less than 1.
/// With zero workers nothing would receive from the file channel and no
/// ChannelProducer would ever attach to `out_chan`, which could leave the
/// feeder and the consuming iterator waiting indefinitely.
static CoroTask<void> spawn_lines_producers(
    CoroScope &child,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    const std::vector<std::string> *files, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::size_t batch_size, std::atomic<bool> *cancelled_ptr,
    std::size_t max_workers) {
    const std::size_t num_workers =
        std::max<std::size_t>(1, std::min(files->size(), max_workers));
    auto file_chan =
        dftracer::utils::coro::make_channel<std::string>(num_workers);

    for (std::size_t i = 0; i < num_workers; ++i) {
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc, batch_size,
                     cancelled_ptr](CoroScope &) {
            return lines_file_worker(fc, out_chan, idx, checkpoint_size,
                                     auto_build_index, r, batch_size,
                                     cancelled_ptr);
        });
    }

    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
        return send_files_to_channel(fc, files, cancelled_ptr);
    });
    co_return;
}
2✔
438

439
/// Directory-mode producer for the line-batch iterator.
///
/// Scans `dir_path` with PatternDirectoryScannerUtility for `.pfw` and
/// `.pfw.gz` entries and sorts the resulting paths. It then reads them
/// concurrently via spawn_lines_producers(), with `max_workers` as the worker
/// limit, writing MemoryViewBatchData batches of up to `batch_size` lines
/// into `sp->channel`.
///
/// When no file matches, `sp->channel` is closed immediately. Any exception
/// is recorded with `sp->set_error()`.
///
/// NOTE(review): the catch block does not close `sp->channel`; if the scan
/// throws before any producer attaches, confirm that set_error() (or the
/// consumer) wakes a waiting reader.
static CoroTask<void> produce_lines_parallel(
    CoroScope &scope, MemoryViewBatchIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility scanner;
        auto scan_input = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto entries = co_await scope.spawn(scanner, scan_input);

        std::vector<std::string> files;
        files.reserve(entries.size());
        for (auto &e : entries) files.push_back(e.path.string());
        std::sort(files.begin(), files.end());

        if (files.empty()) {
            sp->channel->close();
            co_return;
        }

        auto *chan_ptr = sp->channel.get();
        auto *cancelled_ptr = &sp->cancelled;

        // The lambda is deliberately NOT a coroutine. It only builds and
        // returns the spawn_lines_producers() task, whose arguments are
        // copied into that task's own frame at the call. Correctness
        // therefore does not depend on how long scope.scope() keeps this
        // closure alive (C++ Core Guidelines CP.51). The `files`, `index_dir`
        // and `rc` locals referenced here live in this frame, which stays
        // suspended until the scope completes.
        co_await scope.scope([chan_ptr, &files, &index_dir, checkpoint_size,
                              auto_build_index, &rc, batch_size, cancelled_ptr,
                              max_workers](CoroScope &child) -> CoroTask<void> {
            return spawn_lines_producers(
                child, chan_ptr, &files, &index_dir, checkpoint_size,
                auto_build_index, &rc, batch_size, cancelled_ptr, max_workers);
        });
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
30✔
473

474
/// Worker coroutine for the parallel raw-chunk reader.
///
/// Repeatedly takes a path from `file_chan` and reads that file with
/// TraceReader::read_raw(rc), using `index_dir`, `checkpoint_size` and
/// `auto_build_index`. Each chunk is copied into its own single-entry
/// MemoryViewBatchData (offset 0, length = chunk size) and forwarded to
/// `out_chan`.
///
/// Returns when `file_chan` yields no more paths, when `*cancelled` is
/// observed, or when `out_chan` rejects a send.
///
/// NOTE(review): unlike produce_raw_batched(), this worker does not add to
/// the iterator state's bytes_in_queue and has no try/catch. Also, if every
/// worker returns early while send_files_to_channel() is blocked on a full
/// `file_chan`, nothing receives again. Confirm both are intended.
static CoroTask<void> raw_file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<MemoryViewBatchData> producer(
        out_chan);
    auto guard = producer.guard();

    const auto is_cancelled = [cancelled] {
        return cancelled->load(std::memory_order_acquire);
    };

    for (;;) {
        auto next_path = co_await file_chan->receive();
        if (!next_path) break;
        if (is_cancelled()) co_return;

        TraceReaderConfig file_cfg;
        file_cfg.file_path = std::move(*next_path);
        file_cfg.index_dir = index_dir;
        file_cfg.checkpoint_size = checkpoint_size;
        file_cfg.auto_build_index = auto_build_index;

        TraceReader reader(std::move(file_cfg));
        auto chunks = reader.read_raw(rc);
        for (;;) {
            auto chunk = co_await chunks.next();
            if (!chunk) break;
            if (is_cancelled()) co_return;

            MemoryViewBatchData single;
            single.buffer.assign(chunk->data(), chunk->data() + chunk->size());
            single.offsets.push_back(0);
            single.lengths.push_back(static_cast<Py_ssize_t>(chunk->size()));
            if (!co_await producer.send(std::move(single))) co_return;
        }
    }
    co_return;
}
504✔
504

505
/// Launch the worker pool used by produce_raw_parallel().
///
/// Creates a file-path channel with capacity `num_workers` and spawns
/// `num_workers` raw_file_worker() coroutines into `child`. Each worker gets
/// its own copy of `*index_dir` and `*rc`. One send_files_to_channel() feeder
/// is also spawned; it pushes every path in `*files` and then closes the
/// channel.
///
/// `files`, `out_chan` and `cancelled_ptr` are borrowed by the spawned
/// coroutines and must outlive the `child` scope.
///
/// `num_workers` is min(files->size(), max_workers), but never less than 1.
/// With zero workers nothing would receive from the file channel and no
/// ChannelProducer would ever attach to `out_chan`, which could leave the
/// feeder and the consuming iterator waiting indefinitely.
static CoroTask<void> spawn_raw_producers(
    CoroScope &child,
    dftracer::utils::coro::Channel<MemoryViewBatchData> *out_chan,
    const std::vector<std::string> *files, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::atomic<bool> *cancelled_ptr, std::size_t max_workers) {
    const std::size_t num_workers =
        std::max<std::size_t>(1, std::min(files->size(), max_workers));
    auto file_chan =
        dftracer::utils::coro::make_channel<std::string>(num_workers);

    for (std::size_t i = 0; i < num_workers; ++i) {
        child.spawn([out_chan, fc = file_chan, idx = *index_dir,
                     checkpoint_size, auto_build_index, r = *rc,
                     cancelled_ptr](CoroScope &) {
            return raw_file_worker(fc, out_chan, idx, checkpoint_size,
                                   auto_build_index, r, cancelled_ptr);
        });
    }

    child.spawn([fc = file_chan, files, cancelled_ptr](CoroScope &) {
        return send_files_to_channel(fc, files, cancelled_ptr);
    });
    co_return;
}
1✔
529

530
/// Directory-mode producer for the raw-chunk iterator.
///
/// Scans `dir_path` with PatternDirectoryScannerUtility for `.pfw` and
/// `.pfw.gz` entries and sorts the resulting paths. It then reads them
/// concurrently via spawn_raw_producers(), with `max_workers` as the worker
/// limit, writing one MemoryViewBatchData per raw chunk into `sp->channel`.
///
/// When no file matches, `sp->channel` is closed immediately. Any exception
/// is recorded with `sp->set_error()`.
///
/// NOTE(review): the catch block does not close `sp->channel`; if the scan
/// throws before any producer attaches, confirm that set_error() (or the
/// consumer) wakes a waiting reader.
static CoroTask<void> produce_raw_parallel(
    CoroScope &scope, MemoryViewBatchIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility scanner;
        auto scan_input = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto entries = co_await scope.spawn(scanner, scan_input);

        std::vector<std::string> files;
        files.reserve(entries.size());
        for (auto &e : entries) files.push_back(e.path.string());
        std::sort(files.begin(), files.end());

        if (files.empty()) {
            sp->channel->close();
            co_return;
        }

        auto *chan_ptr = sp->channel.get();
        auto *cancelled_ptr = &sp->cancelled;

        // The lambda is deliberately NOT a coroutine. It only builds and
        // returns the spawn_raw_producers() task, whose arguments are copied
        // into that task's own frame at the call. Correctness therefore does
        // not depend on how long scope.scope() keeps this closure alive
        // (C++ Core Guidelines CP.51). The `files`, `index_dir` and `rc`
        // locals referenced here live in this frame, which stays suspended
        // until the scope completes.
        co_await scope.scope([chan_ptr, &files, &index_dir, checkpoint_size,
                              auto_build_index, &rc, cancelled_ptr,
                              max_workers](CoroScope &child) -> CoroTask<void> {
            return spawn_raw_producers(child, chan_ptr, &files, &index_dir,
                                       checkpoint_size, auto_build_index, &rc,
                                       cancelled_ptr, max_workers);
        });
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
18✔
564

565
#ifdef DFTRACER_UTILS_ENABLE_ARROW
566

567
using dftracer::utils::utilities::common::arrow::ArrowExportResult;
568
using dftracer::utils::utilities::common::arrow::ColumnType;
569
using dftracer::utils::utilities::common::arrow::RecordBatchBuilder;
570

571
// Bump arena for string_views that must survive until builder.finish().
//
// push() copies bytes into the current 64 KiB block. A string longer than
// that gets a dedicated block of exactly its size. push() returns a view of
// the copy.
//
// Views stay valid while the arena lives and until clear(). Growing `blocks`
// moves the inner std::vector<char> objects, but a moved vector keeps its
// heap buffer, so existing views are unaffected.
//
// clear() frees every block except the first and rewinds to its start:
// views handed out earlier dangle (extra blocks) or get overwritten by later
// pushes (first block).
struct StringArena {
    static constexpr std::size_t BLOCK_SIZE = 64 * 1024;
    std::vector<std::vector<char>> blocks;
    std::size_t pos = 0;

    StringArena() { blocks.emplace_back(BLOCK_SIZE); }

    std::string_view push(const char *data, std::size_t len) {
        // pos <= blocks.back().size() always holds, so this comparison cannot
        // overflow the way `pos + len > size` can for a huge `len`.
        if (len > blocks.back().size() - pos) {
            blocks.emplace_back(std::max(BLOCK_SIZE, len));
            pos = 0;
        }
        char *dst = blocks.back().data() + pos;
        // memcpy with a null source is undefined even for 0 bytes, and an
        // empty string_view may legitimately carry data() == nullptr.
        if (len != 0) std::memcpy(dst, data, len);
        pos += len;
        return {dst, len};
    }

    void clear() {
        if (blocks.size() > 1) blocks.resize(1);
        pos = 0;
    }
};
595

596
// --- Row type constants (must match Python TYPE_* constants) ---
// Unscoped on purpose so the enumerators convert implicitly to integers.
// The fixed std::int8_t base keeps each value one byte; spelled with std::
// because <cstdint> only guarantees the std-qualified name, matching the
// std::int64_t used elsewhere in this file.
enum RowType : std::int8_t {
    ROW_EVENT = 0,
    ROW_FILE_HASH = 1,
    ROW_HOST_HASH = 2,
    ROW_STRING_HASH = 3,
    ROW_METADATA = 4,
    ROW_PROC_METADATA = 5,
    ROW_PROFILE = 6,
    ROW_SYSTEM = 7,
};
607

608
// --- IO category constants (must match Python IOCategory values) ---
// Unscoped so get_io_cat() can return the enumerators as plain integers.
// The base is spelled std::int8_t because <cstdint> only guarantees the
// std-qualified name. There is no 0 value; unmatched names map to IO_OTHER.
enum IOCat : std::int8_t {
    IO_READ = 1,
    IO_WRITE = 2,
    IO_METADATA = 3,
    IO_PCTL = 4,
    IO_IPC = 5,
    IO_OTHER = 6,
    IO_SYNC = 7,
};
618

619
static int8_t get_io_cat(std::string_view func) {
×
620
    using namespace dftracer::utils::utilities::composites::dft::internal;
621
    for (auto op : posix_ops::READ)
×
622
        if (op == func) return IO_READ;
×
623
    for (auto op : posix_ops::WRITE)
×
624
        if (op == func) return IO_WRITE;
×
625
    for (auto op : posix_ops::SYNC)
×
626
        if (op == func) return IO_SYNC;
×
627
    for (auto op : posix_ops::PCTL)
×
628
        if (op == func) return IO_PCTL;
×
629
    for (auto op : posix_ops::IPC)
×
630
        if (op == func) return IO_IPC;
×
631
    for (auto op : posix_ops::METADATA)
×
632
        if (op == func) return IO_METADATA;
×
633
    return IO_OTHER;
×
UNCOV
634
}
×
635

636
// ASCII case-insensitive equality of `a` and the NUL-terminated `b`.
//
// Both sides are folded with std::tolower, so the result no longer depends
// on `b` being written in lowercase. The previous version lowercased only
// `a`, so any uppercase letter in `b` could never match.
static bool str_iequal(std::string_view a, const char *b) {
    const std::string_view rhs(b);
    return std::equal(a.begin(), a.end(), rhs.begin(), rhs.end(),
                      [](char x, char y) {
                          return std::tolower(static_cast<unsigned char>(x)) ==
                                 std::tolower(static_cast<unsigned char>(y));
                      });
}
×
646

647
// True when `needle` occurs in `s`, ignoring ASCII case on the `s` side only.
//
// Only characters from `s` are lowercased; `needle` must already be
// lowercase, so an uppercase letter in `needle` can never match. An empty
// needle matches any `s`, including an empty one.
static bool str_contains_lower(std::string_view s, const char *needle) {
    const std::string_view pattern(needle);
    if (pattern.empty()) return true;
    const auto hit = std::search(
        s.begin(), s.end(), pattern.begin(), pattern.end(),
        [](char hay, char pat) {
            return std::tolower(static_cast<unsigned char>(hay)) ==
                   static_cast<unsigned char>(pat);
        });
    return hit != s.end();
}
×
663

664
// Fields extracted from a row's "args" object in a single pass.
// Each optional is left empty unless the key was present and its value read
// as the expected type.
// NOTE(review): the string_view members are non-owning; confirm the storage
// they point into outlives the ParsedArgs that holds them.
struct ParsedArgs {
    std::optional<std::string_view> name;
    std::optional<std::string_view> value;
    std::optional<std::string_view> hhash;
    std::optional<std::string_view> fhash;

    std::optional<std::int64_t> epoch;
    std::optional<std::int64_t> step;
    std::optional<std::int64_t> size_sum;
    std::optional<std::int64_t> ret;
    std::optional<std::int64_t> offset;
    std::optional<std::int64_t> image_idx;
    std::optional<std::int64_t> image_size;

    // Remaining integer / floating-point args, kept for profile/sys columns.
    std::unordered_map<std::string, std::int64_t> int_map;
    std::unordered_map<std::string, double> float_map;
};
673

NEW
674
// Walk the row's "args" object once and sort each field into a ParsedArgs.
//  - Well-known string keys (name/value/hhash/fhash) and integer keys
//    (epoch/step/size_sum/ret/offset/image_idx/image_size) go to their
//    dedicated optional.  A value of the wrong JSON type is dropped.
//  - Any other key is kept in int_map when it reads as int64, otherwise in
//    float_map when it reads as a double.
// The parser is rewound first; presumably earlier top-level lookups on the
// same document have advanced it (TODO confirm against JsonParser).
static ParsedArgs parse_row_args(JsonParser &parser) {
    using SVH = JsonValueHelper;

    // Key -> destination member tables; scanned in order, first hit wins.
    struct StringSlot {
        std::string_view key;
        std::optional<std::string_view> ParsedArgs::*member;
    };
    struct IntSlot {
        std::string_view key;
        std::optional<std::int64_t> ParsedArgs::*member;
    };
    static constexpr StringSlot kStringSlots[] = {
        {"name", &ParsedArgs::name},
        {"value", &ParsedArgs::value},
        {"hhash", &ParsedArgs::hhash},
        {"fhash", &ParsedArgs::fhash},
    };
    static constexpr IntSlot kIntSlots[] = {
        {"epoch", &ParsedArgs::epoch},
        {"step", &ParsedArgs::step},
        {"size_sum", &ParsedArgs::size_sum},
        {"ret", &ParsedArgs::ret},
        {"offset", &ParsedArgs::offset},
        {"image_idx", &ParsedArgs::image_idx},
        {"image_size", &ParsedArgs::image_size},
    };

    ParsedArgs out;
    parser.rewind();
    parser.for_each_field(
        "args", [&](std::string_view key, simdjson::ondemand::value val) {
            for (const auto &slot : kStringSlots) {
                if (slot.key != key) continue;
                if (auto s = SVH::get_string(val)) out.*slot.member = s;
                return;
            }
            for (const auto &slot : kIntSlots) {
                if (slot.key != key) continue;
                if (auto i = SVH::get_int64(val)) out.*slot.member = i;
                return;
            }
            // Unrecognized key: keep numeric values for profile/sys columns.
            if (auto i = SVH::get_int64(val)) {
                out.int_map[std::string(key)] = *i;
            } else if (auto d = SVH::get_double(val)) {
                out.float_map[std::string(key)] = *d;
            }
        });
    return out;
}
×
712

NEW
713
static void append_profile_columns(
×
714
    RecordBatchBuilder &builder,
715
    const std::unordered_map<std::string, std::int64_t> &int_map) {
716
    static const char *profile_keys[] = {
717
        "count",      "count_max",  "count_min",  "count_sum",  "dft_cnt",
718
        "dur",        "dur_max",    "dur_min",    "dur_sum",    "epoch",
719
        "flags",      "offset",     "offset_max", "offset_min", "offset_sum",
720
        "ret",        "ret_max",    "ret_min",    "ret_sum",    "whence",
721
        "whence_max", "whence_min", "whence_sum", nullptr};
NEW
722
    for (const char **pk = profile_keys; *pk; ++pk) {
×
NEW
723
        auto it = int_map.find(*pk);
×
NEW
724
        if (it != int_map.end()) {
×
NEW
725
            auto idx = builder.add_or_get_column(*pk, ColumnType::INT64);
×
NEW
726
            builder.append_int64(idx, it->second);
×
NEW
727
        }
×
NEW
728
    }
×
NEW
729
}
×
730

NEW
731
static void append_system_columns(
×
732
    RecordBatchBuilder &builder,
733
    const std::unordered_map<std::string, double> &float_map) {
734
    static const char *sys_keys[] = {
735
        "user_pct", "system_pct",  "iowait_pct",   "idle_pct",
736
        "irq_pct",  "softirq_pct", "MemAvailable", "MemFree",
737
        "Cached",   "Dirty",       "Active",       nullptr};
NEW
738
    for (const char **sk = sys_keys; *sk; ++sk) {
×
NEW
739
        auto it = float_map.find(*sk);
×
NEW
740
        if (it != float_map.end()) {
×
NEW
741
            auto idx = builder.add_or_get_column(*sk, ColumnType::DOUBLE);
×
NEW
742
            builder.append_double(idx, it->second);
×
NEW
743
        }
×
NEW
744
    }
×
NEW
745
}
×
746

747
// Normalize a raw JSON row (parsed with simdjson) into the semantic
748
// output schema.  Appends one row to `builder` with the full set of output
749
// columns.  Returns false if the row should be skipped (no valid name).
NEW
750
static bool normalize_row(RecordBatchBuilder &builder, StringArena &arena,
×
751
                          JsonParser &parser) {
752
    // --- Extract top-level fields ---
NEW
753
    auto ph = parser.get_string("ph").value_or(std::string_view{});
×
NEW
754
    auto name_sv = parser.get_string("name").value_or(std::string_view{});
×
NEW
755
    auto cat_sv = parser.get_string("cat").value_or(std::string_view{});
×
NEW
756
    auto pid_opt = parser.get_int64("pid");
×
NEW
757
    auto tid_opt = parser.get_int64("tid");
×
NEW
758
    auto ts_opt = parser.get_int64("ts");
×
NEW
759
    auto dur_opt = parser.get_int64("dur");
×
760

NEW
761
    ParsedArgs args = parse_row_args(parser);
×
762

763
    // --- Type classification ---
764
    bool is_M = (ph == "M");
×
765
    bool is_C = (ph == "C");
×
766
    bool is_event = !is_M && !is_C;
×
767

768
    int8_t row_type = ROW_EVENT;
×
769
    if (is_M) {
×
770
        if (name_sv == "FH")
×
771
            row_type = ROW_FILE_HASH;
×
772
        else if (name_sv == "HH")
×
773
            row_type = ROW_HOST_HASH;
×
774
        else if (name_sv == "SH")
×
775
            row_type = ROW_STRING_HASH;
×
776
        else if (name_sv == "PR")
×
777
            row_type = ROW_PROC_METADATA;
×
778
        else
779
            row_type = ROW_METADATA;
×
780
    } else if (is_C) {
×
781
        row_type = str_iequal(cat_sv, "sys") ? ROW_SYSTEM : ROW_PROFILE;
×
UNCOV
782
    }
×
783
    bool is_hash = (row_type >= ROW_FILE_HASH && row_type <= ROW_STRING_HASH) ||
×
UNCOV
784
                   row_type == ROW_PROC_METADATA;
×
785
    bool is_profile = (row_type == ROW_PROFILE);
×
786
    bool is_sys = (row_type == ROW_SYSTEM);
×
787

788
    // Name: metadata rows use args.name if available
789
    std::string_view out_name = name_sv;
×
NEW
790
    if (is_M && args.name && !args.name->empty()) {
×
NEW
791
        out_name = *args.name;
×
792
    }
×
793
    if (out_name.empty()) return false;  // skip rows without name
×
794

795
    // --- Declare all output columns ---
796
    auto ci_type = builder.add_or_get_column("type", ColumnType::INT64);
×
797
    auto ci_cat = builder.add_or_get_column("cat", ColumnType::STRING);
×
798
    auto ci_name = builder.add_or_get_column("name", ColumnType::STRING);
×
799
    auto ci_pid = builder.add_or_get_column("pid", ColumnType::INT64);
×
800
    auto ci_tid = builder.add_or_get_column("tid", ColumnType::INT64);
×
801
    auto ci_hash = builder.add_or_get_column("hash", ColumnType::STRING);
×
802
    auto ci_value = builder.add_or_get_column("value", ColumnType::STRING);
×
UNCOV
803
    auto ci_host_hash =
×
804
        builder.add_or_get_column("host_hash", ColumnType::STRING);
×
UNCOV
805
    auto ci_file_hash =
×
806
        builder.add_or_get_column("file_hash", ColumnType::STRING);
×
807
    auto ci_epoch = builder.add_or_get_column("epoch", ColumnType::INT64);
×
808
    auto ci_step = builder.add_or_get_column("step", ColumnType::INT64);
×
809
    auto ci_ts = builder.add_or_get_column("ts", ColumnType::INT64);
×
810
    auto ci_dur = builder.add_or_get_column("dur", ColumnType::INT64);
×
811
    auto ci_te = builder.add_or_get_column("te", ColumnType::INT64);
×
UNCOV
812
    [[maybe_unused]] auto ci_trange =
×
813
        builder.add_or_get_column("trange", ColumnType::INT64);
×
814
    auto ci_io_cat = builder.add_or_get_column("io_cat", ColumnType::INT64);
×
815
    auto ci_size = builder.add_or_get_column("size", ColumnType::INT64);
×
816
    auto ci_offset = builder.add_or_get_column("offset", ColumnType::INT64);
×
817
    auto ci_image_id = builder.add_or_get_column("image_id", ColumnType::INT64);
×
818

819
    // --- Populate core columns ---
820
    builder.append_int64(ci_type, row_type);
×
821

822
    // cat (lowercased) - write into arena
823
    if (!cat_sv.empty()) {
×
824
        char lbuf[256];
825
        std::size_t clen = std::min(cat_sv.size(), sizeof(lbuf));
×
826
        for (std::size_t i = 0; i < clen; ++i)
×
827
            lbuf[i] = static_cast<char>(
×
828
                std::tolower(static_cast<unsigned char>(cat_sv[i])));
×
829
        builder.append_string(ci_cat, arena.push(lbuf, clen));
×
UNCOV
830
    } else {
×
831
        builder.append_null(ci_cat);
×
832
    }
833

834
    builder.append_string(ci_name, out_name);
×
835

836
    if (pid_opt) builder.append_int64(ci_pid, *pid_opt);
×
837
    if (tid_opt) builder.append_int64(ci_tid, *tid_opt);
×
838

839
    // hash / value
NEW
840
    if (is_hash && args.value && !args.value->empty())
×
NEW
841
        builder.append_string(ci_hash, *args.value);
×
NEW
842
    if (row_type == ROW_METADATA && args.value && !args.value->empty())
×
NEW
843
        builder.append_string(ci_value, *args.value);
×
844

845
    // host_hash / file_hash
NEW
846
    if (args.hhash && !args.hhash->empty())
×
NEW
847
        builder.append_string(ci_host_hash, *args.hhash);
×
NEW
848
    if (args.fhash && !args.fhash->empty())
×
NEW
849
        builder.append_string(ci_file_hash, *args.fhash);
×
850

851
    // epoch / step
NEW
852
    if (args.epoch && *args.epoch >= 0)
×
NEW
853
        builder.append_int64(ci_epoch, *args.epoch);
×
NEW
854
    if (args.step && *args.step >= 0) builder.append_int64(ci_step, *args.step);
×
855

856
    // --- Temporal ---
857
    bool has_ts = (is_event || is_C) && ts_opt.has_value();
×
858
    bool has_dur = dur_opt.has_value();
×
859
    int64_t ts_val = 0, dur_val = 0;
×
860
    if (has_ts) {
×
861
        ts_val = *ts_opt;
×
862
        builder.append_int64(ci_ts, ts_val);
×
UNCOV
863
    }
×
864
    if (is_event && has_ts && has_dur) {
×
865
        dur_val = *dur_opt;
×
866
        builder.append_int64(ci_dur, dur_val);
×
867
        builder.append_int64(ci_te, ts_val + dur_val);
×
UNCOV
868
    }
×
869

870
    // --- IO columns (events only) ---
871
    if (is_event) {
×
UNCOV
872
        bool is_posix_stdio =
×
873
            str_iequal(cat_sv, "posix") || str_iequal(cat_sv, "stdio");
×
874
        int8_t io_cat = IO_OTHER;
×
875

876
        // size priority: size_sum > POSIX ret > image_size
NEW
877
        if (args.size_sum) {
×
NEW
878
            builder.append_int64(ci_size, *args.size_sum);
×
879
            if (is_posix_stdio) io_cat = get_io_cat(out_name);
×
880
        } else if (is_posix_stdio) {
×
881
            io_cat = get_io_cat(out_name);
×
NEW
882
            if (args.ret && *args.ret > 0 &&
×
883
                (io_cat == IO_READ || io_cat == IO_WRITE))
×
NEW
884
                builder.append_int64(ci_size, *args.ret);
×
NEW
885
            if (args.offset && *args.offset >= 0)
×
NEW
886
                builder.append_int64(ci_offset, *args.offset);
×
887
        } else {
×
NEW
888
            if (args.image_idx && *args.image_idx > 0)
×
NEW
889
                builder.append_int64(ci_image_id, *args.image_idx);
×
NEW
890
            if (args.image_size && *args.image_size > 0 &&
×
891
                !str_contains_lower(out_name, "open"))
×
NEW
892
                builder.append_int64(ci_size, *args.image_size);
×
893
        }
894
        builder.append_int64(ci_io_cat, io_cat);
×
UNCOV
895
    }
×
896

897
    // --- Profile columns ---
898
    if (is_profile) {
×
UNCOV
899
        bool is_posix_stdio =
×
900
            str_iequal(cat_sv, "posix") || str_iequal(cat_sv, "stdio");
×
901
        int8_t io_cat = is_posix_stdio ? get_io_cat(out_name) : IO_OTHER;
×
902
        builder.append_int64(ci_io_cat, io_cat);
×
NEW
903
        append_profile_columns(builder, args.int_map);
×
UNCOV
904
    }
×
905

906
    // --- System columns ---
907
    if (is_sys) {
×
NEW
908
        append_system_columns(builder, args.float_map);
×
UNCOV
909
    }
×
910

911
    builder.end_row();
×
912
    return true;
×
913
}
×
914

915
// Flatten a simdjson object into "prefix.key" columns using native types.
916
// On type mismatch (same key, different type across rows), appends null.
917
static void flatten_object_into(RecordBatchBuilder &builder, StringArena &arena,
918
                                std::string_view prefix,
919
                                simdjson::ondemand::object obj) {
920
    using SVH = JsonValueHelper;
921
    char key_buf[512];
922

923
    for (auto field : obj) {
924
        if (field.error()) continue;
925

926
        auto key_result = field.unescaped_key();
927
        if (key_result.error()) continue;
928
        std::string_view sk = key_result.value_unsafe();
929

930
        auto val_result = field.value();
931
        if (val_result.error()) continue;
932
        auto sub_val = val_result.value_unsafe();
933

934
        std::size_t needed = prefix.size() + 1 + sk.size();
935
        if (needed >= sizeof(key_buf)) continue;
936
        std::memcpy(key_buf, prefix.data(), prefix.size());
937
        key_buf[prefix.size()] = '.';
938
        std::memcpy(key_buf + prefix.size() + 1, sk.data(), sk.size());
939
        std::string_view full_key(key_buf, needed);
940

941
        auto type_result = sub_val.type();
942
        if (type_result.error()) continue;
943
        auto json_type = type_result.value_unsafe();
944

945
        switch (json_type) {
946
            case simdjson::ondemand::json_type::number: {
947
                auto num_result = sub_val.get_number();
948
                if (num_result.error()) break;
949
                auto num = num_result.value_unsafe();
950
                if (num.is_int64()) {
951
                    auto idx =
952
                        builder.add_or_get_column(full_key, ColumnType::INT64);
953
                    if (builder.column_type(idx) == ColumnType::INT64)
954
                        builder.append_int64(idx, num.get_int64());
955
                    else
956
                        builder.append_null(idx);
957
                } else if (num.is_uint64()) {
958
                    auto idx =
959
                        builder.add_or_get_column(full_key, ColumnType::UINT64);
960
                    if (builder.column_type(idx) == ColumnType::UINT64)
961
                        builder.append_uint64(idx, num.get_uint64());
962
                    else
963
                        builder.append_null(idx);
964
                } else {
965
                    auto idx =
966
                        builder.add_or_get_column(full_key, ColumnType::DOUBLE);
967
                    if (builder.column_type(idx) == ColumnType::DOUBLE)
968
                        builder.append_double(idx, num.get_double());
969
                    else
970
                        builder.append_null(idx);
971
                }
972
                break;
973
            }
974
            case simdjson::ondemand::json_type::string: {
975
                auto str_result = sub_val.get_string();
976
                if (str_result.error()) break;
977
                auto str = str_result.value_unsafe();
978
                auto idx =
979
                    builder.add_or_get_column(full_key, ColumnType::STRING);
980
                if (builder.column_type(idx) == ColumnType::STRING)
981
                    builder.append_string(idx, str);
982
                else
983
                    builder.append_null(idx);
984
                break;
985
            }
986
            case simdjson::ondemand::json_type::boolean: {
987
                auto bool_result = sub_val.get_bool();
988
                if (bool_result.error()) break;
989
                auto b = bool_result.value_unsafe();
990
                auto idx =
991
                    builder.add_or_get_column(full_key, ColumnType::BOOL);
992
                if (builder.column_type(idx) == ColumnType::BOOL)
993
                    builder.append_bool(idx, b);
994
                else
995
                    builder.append_null(idx);
996
                break;
997
            }
998
            case simdjson::ondemand::json_type::null: {
999
                auto existing = builder.find_column(full_key);
1000
                if (existing) builder.append_null(*existing);
1001
                break;
1002
            }
1003
            case simdjson::ondemand::json_type::object:
1004
            case simdjson::ondemand::json_type::array: {
1005
                // Serialize nested object/array to JSON string
1006
                auto json_str = SVH::to_json_string(sub_val);
1007
                auto idx =
1008
                    builder.add_or_get_column(full_key, ColumnType::STRING);
1009
                if (json_str) {
1010
                    builder.append_string(
1011
                        idx, arena.push(json_str->data(), json_str->size()));
1012
                } else {
1013
                    builder.append_null(idx);
1014
                }
1015
                break;
1016
            }
1017
            default:
1018
                break;
1019
        }
1020
    }
1021
}
1022

1023
static bool build_arrow_row(RecordBatchBuilder &builder, JsonParser &parser,
×
1024
                            StringArena &arena, bool normalize) {
1025
    if (normalize) return normalize_row(builder, arena, parser);
×
1026

1027
    using SVH = JsonValueHelper;
1028
    parser.for_each_field([&](std::string_view key_sv,
×
1029
                              simdjson::ondemand::value val) {
1030
        auto type_result = val.type();
×
1031
        if (type_result.error()) return;
×
1032
        auto json_type = type_result.value_unsafe();
×
1033
        switch (json_type) {
×
1034
            case simdjson::ondemand::json_type::number: {
1035
                auto num_result = val.get_number();
×
1036
                if (num_result.error()) break;
×
1037
                auto num = num_result.value_unsafe();
×
1038
                if (num.is_int64()) {
×
UNCOV
1039
                    std::size_t idx =
×
1040
                        builder.add_or_get_column(key_sv, ColumnType::INT64);
×
1041
                    builder.append_int64(idx, num.get_int64());
×
1042
                } else if (num.is_uint64()) {
×
UNCOV
1043
                    std::size_t idx =
×
1044
                        builder.add_or_get_column(key_sv, ColumnType::UINT64);
×
1045
                    builder.append_uint64(idx, num.get_uint64());
×
UNCOV
1046
                } else {
×
UNCOV
1047
                    std::size_t idx =
×
1048
                        builder.add_or_get_column(key_sv, ColumnType::DOUBLE);
×
1049
                    builder.append_double(idx, num.get_double());
×
1050
                }
1051
                break;
×
1052
            }
1053
            case simdjson::ondemand::json_type::string: {
1054
                auto str_result = val.get_string();
×
1055
                if (str_result.error()) break;
×
1056
                auto str = str_result.value_unsafe();
×
UNCOV
1057
                std::size_t idx =
×
1058
                    builder.add_or_get_column(key_sv, ColumnType::STRING);
×
1059
                builder.append_string(idx, str);
×
1060
                break;
×
1061
            }
1062
            case simdjson::ondemand::json_type::boolean: {
1063
                auto bool_result = val.get_bool();
×
1064
                if (bool_result.error()) break;
×
1065
                auto b = bool_result.value_unsafe();
×
UNCOV
1066
                std::size_t idx =
×
1067
                    builder.add_or_get_column(key_sv, ColumnType::BOOL);
×
1068
                builder.append_bool(idx, b);
×
1069
                break;
×
1070
            }
1071
            case simdjson::ondemand::json_type::null: {
1072
                auto existing = builder.find_column(key_sv);
×
1073
                if (existing) builder.append_null(*existing);
×
1074
                break;
×
1075
            }
1076
            case simdjson::ondemand::json_type::object:
1077
            case simdjson::ondemand::json_type::array: {
1078
                auto json_str = SVH::to_json_string(val);
×
UNCOV
1079
                std::size_t idx =
×
1080
                    builder.add_or_get_column(key_sv, ColumnType::STRING);
×
1081
                if (json_str) {
×
1082
                    builder.append_string(
×
1083
                        idx, arena.push(json_str->data(), json_str->size()));
×
UNCOV
1084
                } else {
×
1085
                    builder.append_null(idx);
×
1086
                }
1087
                break;
1088
            }
×
1089
            default:
1090
                break;
×
1091
        }
UNCOV
1092
    });
×
1093
    builder.end_row();
×
1094
    return true;
×
UNCOV
1095
}
×
1096

1097
static bool process_json_line(RecordBatchBuilder &builder, JsonParser &parser,
1098
                              StringArena &arena, std::string_view content,
1099
                              bool normalize) {
1100
    const char *trimmed;
1101
    std::size_t trimmed_length;
1102
    if (!dftracer::utils::json_trim_and_validate_with_comma(
1103
            content.data(), content.size(), trimmed, trimmed_length))
1104
        return false;
1105
    if (!parser.parse(std::string_view(trimmed, trimmed_length))) return false;
1106
    return build_arrow_row(builder, parser, arena, normalize);
1107
}
1108

1109
// Coroutine: stream one trace file into `chan` as Arrow record batches.
// Without `normalize`, batches come straight from TraceReader::read_arrow.
// With `normalize`, rows are read as JSON and re-shaped by build_arrow_row;
// a batch is flushed every `batch_size` rows, and the schema is locked after
// the first flush.  The coroutine stops early when `*cancelled` becomes true
// or when a send is rejected.
static CoroTask<void> produce_arrow_for_file(
    dftracer::utils::coro::Channel<ArrowExportResult> *chan,
    std::string file_path, std::string index_dir, std::size_t checkpoint_size,
    bool auto_build_index, ReadConfig rc, std::size_t batch_size,
    bool normalize, std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer(chan);
    // Held for the whole coroutine; presumably marks this producer finished
    // on the channel when destroyed (TODO confirm in coro/channel.h).
    auto guard = producer.guard();
    const auto stop_requested = [cancelled] {
        return cancelled->load(std::memory_order_acquire);
    };

    TraceReaderConfig cfg;
    cfg.file_path = std::move(file_path);
    cfg.index_dir = std::move(index_dir);
    cfg.checkpoint_size = checkpoint_size;
    cfg.auto_build_index = auto_build_index;
    TraceReader reader(std::move(cfg));

    if (!normalize) {
        // Raw schema: TraceReader builds the Arrow batches itself.
        auto batches = reader.read_arrow(rc, batch_size);
        while (auto batch = co_await batches.next()) {
            if (stop_requested()) co_return;
            if (!co_await producer.send(std::move(*batch))) co_return;
        }
        co_return;
    }

    // Normalized schema: derive columns row by row from the JSON.
    auto rows = reader.read_json(rc);
    RecordBatchBuilder builder;
    builder.reserve(batch_size);
    StringArena arena;

    while (auto row = co_await rows.next()) {
        if (stop_requested()) co_return;
        if (!build_arrow_row(builder, *row->parser, arena, normalize)) continue;
        if (builder.num_rows() < batch_size) continue;

        auto full_batch = builder.finish();
        arena.clear();
        if (!co_await producer.send(std::move(full_batch))) co_return;
        if (!builder.is_schema_locked()) builder.lock_schema();
        builder.reset(true);
        builder.reserve(batch_size);
    }
    // Flush the final partial batch.
    if (builder.num_rows() > 0) {
        co_await producer.send(builder.finish());
    }
    co_return;
}
1159

1160
// Worker coroutine: receive file paths from `file_chan` until it yields no
// more, and stream each file into `out_chan` as Arrow record batches.  The
// per-file logic mirrors produce_arrow_for_file:
//  - without `normalize`, batches come from TraceReader::read_arrow;
//  - with `normalize`, rows pass through build_arrow_row and are flushed
//    every `batch_size` rows.
// Stops when `*cancelled` becomes true or when a send is rejected.
static CoroTask<void> file_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<std::string>> file_chan,
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer(
        out_chan);
    // Held for the whole coroutine; presumably marks this producer finished
    // on the channel when destroyed (TODO confirm in coro/channel.h).
    auto guard = producer.guard();
    const auto stop_requested = [cancelled] {
        return cancelled->load(std::memory_order_acquire);
    };

    while (auto next_path = co_await file_chan->receive()) {
        if (stop_requested()) co_return;

        TraceReaderConfig cfg;
        cfg.file_path = std::move(*next_path);
        cfg.index_dir = index_dir;
        cfg.checkpoint_size = checkpoint_size;
        cfg.auto_build_index = auto_build_index;
        TraceReader reader(std::move(cfg));

        if (!normalize) {
            auto batches = reader.read_arrow(rc, batch_size);
            while (auto batch = co_await batches.next()) {
                if (stop_requested()) co_return;
                if (!co_await producer.send(std::move(*batch))) co_return;
            }
            continue;
        }

        auto rows = reader.read_json(rc);
        RecordBatchBuilder builder;
        builder.reserve(batch_size);
        StringArena arena;

        while (auto row = co_await rows.next()) {
            if (stop_requested()) co_return;
            const bool appended =
                build_arrow_row(builder, *row->parser, arena, normalize);
            if (!appended || builder.num_rows() < batch_size) continue;

            auto full_batch = builder.finish();
            arena.clear();
            if (!co_await producer.send(std::move(full_batch))) co_return;
            if (!builder.is_schema_locked()) builder.lock_schema();
            builder.reset(true);
            builder.reserve(batch_size);
        }
        // Flush this file's final partial batch.
        if (builder.num_rows() > 0) {
            if (!co_await producer.send(builder.finish())) co_return;
        }
    }
    co_return;
}
1213

1214
// Extract AND-of-EQ leaves from a Query AST. Returns nullopt if the predicate
1215
// shape is anything else (NE, range ops, IN, NOT, OR), in which case the
1216
// uniform-match shortcut does not apply.
1217
static std::optional<std::vector<std::pair<std::string, std::string>>>
1218
extract_eq_leaves(
×
1219
    const dftracer::utils::utilities::common::query::QueryNode &node) {
1220
    namespace q_ns = dftracer::utils::utilities::common::query;
1221
    using LeafVec = std::vector<std::pair<std::string, std::string>>;
1222

1223
    auto literal_to_string = [](const q_ns::LiteralNode &lit) -> std::string {
×
UNCOV
1224
        return std::visit(
×
1225
            [](auto &&v) -> std::string {
×
1226
                using T = std::decay_t<decltype(v)>;
1227
                if constexpr (std::is_same_v<T, std::string>)
1228
                    return v;
×
1229
                else if constexpr (std::is_same_v<T, bool>)
1230
                    return v ? "true" : "false";
×
1231
                else if constexpr (std::is_same_v<T, int64_t>)
1232
                    return std::to_string(v);
×
1233
                else if constexpr (std::is_same_v<T, uint64_t>)
1234
                    return std::to_string(v);
×
1235
                else if constexpr (std::is_same_v<T, double>)
1236
                    return std::to_string(v);
×
1237
                else
1238
                    return {};
1239
            },
1240
            lit.value);
×
1241
    };
1242

UNCOV
1243
    return std::visit(
×
1244
        [&](const auto &n) -> std::optional<LeafVec> {
×
1245
            using T = std::decay_t<decltype(n)>;
1246
            if constexpr (std::is_same_v<T, q_ns::CompareNode>) {
1247
                if (n.op != q_ns::CompareOp::EQ) return std::nullopt;
×
1248
                return LeafVec{{n.field.path, literal_to_string(n.value)}};
×
1249
            } else if constexpr (std::is_same_v<T, q_ns::AndNode>) {
1250
                auto l = extract_eq_leaves(*n.left);
×
1251
                if (!l) return std::nullopt;
×
1252
                auto r = extract_eq_leaves(*n.right);
×
1253
                if (!r) return std::nullopt;
×
1254
                l->insert(l->end(), r->begin(), r->end());
×
1255
                return l;
×
1256
            } else {
×
1257
                return std::nullopt;
×
1258
            }
UNCOV
1259
        },
×
1260
        node.data);
×
1261
}
1262

1263
// True iff every checkpoint in `chunk_idxs` has dim_stats min == max == literal
1264
// for every leaf. Empty leaves -> false (no shortcut). Missing dim_stats for
1265
// any (chunk, leaf) -> false (we don't know, play safe).
1266
static bool all_chunks_uniform_match(
×
1267
    const dftracer::utils::utilities::indexer::IndexDatabase &db, int fid,
1268
    const std::vector<std::pair<std::string, std::string>> &leaves,
1269
    const std::vector<std::uint64_t> &chunk_idxs) {
1270
    if (leaves.empty() || chunk_idxs.empty()) return false;
×
1271
    namespace indexing = dftracer::utils::utilities::composites::dft::indexing;
1272

1273
    for (const auto &[dim, val] : leaves) {
×
1274
        auto rows = db.query_chunk_dimension_stats_for_dimension(fid, dim);
×
1275
        if (rows.empty()) return false;
×
1276
        std::unordered_map<std::uint64_t,
1277
                           const indexing::ChunkDimensionStatsResult *>
1278
            by_ckpt;
×
1279
        by_ckpt.reserve(rows.size());
×
1280
        for (const auto &r : rows) by_ckpt.emplace(r.checkpoint_idx, &r);
×
1281
        for (auto cidx : chunk_idxs) {
×
1282
            auto it = by_ckpt.find(cidx);
×
1283
            if (it == by_ckpt.end()) return false;
×
1284
            const auto &ds = *it->second;
×
1285
            if (ds.min_value != val || ds.max_value != val) return false;
×
1286
        }
1287
    }
×
1288
    return true;
×
UNCOV
1289
}
×
1290

1291
// Work unit for checkpoint-level parallelism: one or more consecutive
// checkpoints of a single file.  Decompressing a gzip stream is sequential,
// so cutting a file at checkpoint-aligned byte offsets is what lets several
// workers share the decode work for one file.
struct ArrowWorkItem {
    std::string file_path;

    // Byte range covered by this unit, and whether each edge sits on a
    // checkpoint boundary.
    std::size_t start_byte{0};
    std::size_t end_byte{0};
    bool start_at_checkpoint{false};
    bool end_at_checkpoint{false};

    // Set when every kept chunk in this range is uniform-matching: dim_stats
    // min == max == the predicate literal for every AND-of-EQ leaf.  Per-event
    // predicate evaluation can then be skipped.
    bool chunk_prune_only{false};

    // Optional line range; 0 means "no line constraint".  When set it takes
    // precedence over the byte range: the worker issues a LINE_RANGE read and
    // the gzip stream maps lines to bytes through the checkpoint index.
    std::size_t start_line{0};
    std::size_t end_line{0};
};
1312

1313
// User-supplied byte/line clip applied to every emitted work item.
// A line clip is active as soon as either line bound is non-zero.  A byte
// clip is active only when end_byte is strictly greater than start_byte.
struct ClipRange {
    std::size_t start_byte = 0;
    std::size_t end_byte = 0;
    std::size_t start_line = 0;
    std::size_t end_line = 0;

    bool has_line_clip() const { return start_line != 0 || end_line != 0; }
    bool has_byte_clip() const { return start_byte < end_byte; }
};
1320

1321
// Geometry of the pruner chunks for one file, derived from its gzip recovery
1322
// points. Pruner chunk_idx is 0-indexed over uncompressed slices: recovery
1323
// point ckpts[k] sits at the START of chunk (k+1); chunk 0 has no recovery
1324
// point at its start (decoded from gzip stream start). Total chunks =
1325
// ckpts.size()+1.
1326
struct ChunkGeometry {
1327
    const std::vector<dftracer::utils::utilities::indexer::IndexerCheckpoint>
1328
        &ckpts;
1329

1330
    std::size_t total_chunks() const { return ckpts.size() + 1; }
4✔
1331
    std::size_t start_byte(std::uint64_t cidx) const {
6✔
1332
        if (cidx == 0) return 0;
6✔
1333
        return ckpts[cidx - 1].uc_offset;
4✔
1334
    }
6✔
1335
    std::size_t end_byte(std::uint64_t cidx) const {
6✔
1336
        if (cidx == 0) return ckpts.empty() ? 0 : ckpts[0].uc_offset;
6!
1337
        std::size_t k = cidx - 1;
4✔
1338
        return ckpts[k].uc_offset + ckpts[k].uc_size;
4✔
1339
    }
6✔
1340
    // Chunk 0 covers everything before the first recovery point; chunk k>=1
1341
    // spans recovery point (k-1).
1342
    std::size_t first_line(std::uint64_t cidx) const {
6✔
1343
        if (cidx == 0) return 1;
6✔
1344
        return ckpts[cidx - 1].first_line_num;
4✔
1345
    }
6✔
1346
    std::size_t last_line(std::uint64_t cidx) const {
6✔
1347
        if (cidx == 0) {
6✔
1348
            if (ckpts.empty()) return SIZE_MAX;
2!
1349
            return ckpts[0].first_line_num > 0 ? ckpts[0].first_line_num - 1
2!
1350
                                               : 0;
1351
        }
1352
        return ckpts[cidx - 1].last_line_num;
4✔
1353
    }
6✔
1354
};
1355

1356
// Select the chunk indices to scan for one file. Returns empty when the file
1357
// is fully pruned or no chunk overlaps the line clip (caller skips the file).
1358
static std::vector<std::uint64_t> select_kept_chunks(
4✔
1359
    const ChunkGeometry &geo, bool has_query,
1360
    const dftracer::utils::utilities::composites::dft::indexing::
1361
        ChunkPrunerOutput &pr,
1362
    const ClipRange &clip) {
1363
    const std::size_t total_chunks = geo.total_chunks();
4✔
1364
    std::vector<std::uint64_t> keep_chunks;
4✔
1365
    keep_chunks.reserve(total_chunks);
4!
1366
    if (has_query) {
4!
NEW
1367
        if (pr.success && !pr.file_may_match) {
×
NEW
1368
            return keep_chunks;  // whole file pruned
×
1369
        }
NEW
1370
        if (pr.success && !pr.candidate_checkpoints.empty() &&
×
NEW
1371
            pr.candidate_checkpoints.size() < pr.total_checkpoints) {
×
NEW
1372
            for (auto cidx : pr.candidate_checkpoints) {
×
NEW
1373
                if (cidx < total_chunks) keep_chunks.push_back(cidx);
×
1374
            }
NEW
1375
            std::sort(keep_chunks.begin(), keep_chunks.end());
×
NEW
1376
            keep_chunks.erase(
×
NEW
1377
                std::unique(keep_chunks.begin(), keep_chunks.end()),
×
NEW
1378
                keep_chunks.end());
×
NEW
1379
        } else {
×
NEW
1380
            for (std::uint64_t c = 0; c < total_chunks; ++c)
×
NEW
1381
                keep_chunks.push_back(c);
×
1382
        }
NEW
1383
    } else {
×
1384
        for (std::uint64_t c = 0; c < total_chunks; ++c)
12✔
1385
            keep_chunks.push_back(c);
8!
1386
    }
1387

1388
    // Intersect with the user's line range so workers only touch chunks that
1389
    // actually overlap it. Each work item carries the sub-line-range;
1390
    // LINE_RANGE on the read maps it back to bytes via the same checkpoint
1391
    // table the gzip stream uses.
1392
    if (clip.has_line_clip()) {
4!
1393
        std::size_t lo = clip.start_line > 0 ? clip.start_line : 1;
2!
1394
        std::size_t hi = clip.end_line > 0 ? clip.end_line : SIZE_MAX;
2!
1395
        std::vector<std::uint64_t> filtered;
2✔
1396
        filtered.reserve(keep_chunks.size());
2!
1397
        for (auto c : keep_chunks) {
6✔
1398
            std::size_t cf = geo.first_line(c);
4!
1399
            std::size_t cl = geo.last_line(c);
4!
1400
            if (cl < lo || cf > hi) continue;
4!
1401
            filtered.push_back(c);
2!
1402
        }
1403
        keep_chunks = std::move(filtered);
2✔
1404
    }
2✔
1405
    return keep_chunks;
4✔
1406
}
4!
1407

1408
// Group the kept chunks into contiguous work ranges (one per worker slot) and
// append the resulting work items.
//
// @param items            output vector; new work items are appended
// @param fp               file path stamped on every emitted item
// @param geo              chunk geometry used to map chunk indices to
//                         byte offsets and line numbers
// @param keep_chunks      chunk indices to cover; the gap check below
//                         assumes ascending order
// @param file_pure_match  copied into each item's chunk_prune_only
// @param max_workers      target number of ranges (0 is treated as 1)
// @param clip             caller clip. When it has a line clip, items are
//                         line ranges; otherwise a byte clip, if present,
//                         narrows the byte ranges.
static void emit_work_items(std::vector<ArrowWorkItem> &items,
                            const std::string &fp, const ChunkGeometry &geo,
                            const std::vector<std::uint64_t> &keep_chunks,
                            bool file_pure_match, std::size_t max_workers,
                            const ClipRange &clip) {
    std::size_t target_ranges = std::max<std::size_t>(1, max_workers);
    // Chunks per range: ceil(kept / target_ranges), and at least one.
    std::size_t per_range = std::max<std::size_t>(
        1, (keep_chunks.size() + target_ranges - 1) / target_ranges);

    std::size_t group_start = 0;
    while (group_start < keep_chunks.size()) {
        // Take up to per_range chunks, but stop early at a gap in chunk
        // indices so each group covers one contiguous byte span.
        std::size_t group_end = group_start;
        std::size_t emitted = 0;
        while (group_end < keep_chunks.size() && emitted < per_range) {
            if (group_end > group_start &&
                keep_chunks[group_end] != keep_chunks[group_end - 1] + 1) {
                break;
            }
            ++group_end;
            ++emitted;
        }
        std::uint64_t scidx = keep_chunks[group_start];
        std::uint64_t ecidx = keep_chunks[group_end - 1];
        std::size_t start_byte = geo.start_byte(scidx);
        std::size_t end_byte = geo.end_byte(ecidx);
        // start_at_checkpoint: a gzip recovery point sits at start_byte (true
        // for any cidx>=1; false for the implicit chunk 0 which decodes from
        // stream start).
        bool start_at_checkpoint = (scidx >= 1);
        // NOTE(review): true whenever more kept chunks follow this group,
        // even across a gap. It is false for the last group even when a
        // recovery point exists at end_byte. Confirm this is the intended
        // (conservative?) meaning for the reader.
        bool end_at_checkpoint = (group_end < keep_chunks.size());
        if (clip.has_line_clip()) {
            // Line-clip mode: emit the group as a line range clamped to the
            // user's [lo, hi]; the byte fields stay at their defaults.
            std::size_t lo = clip.start_line > 0 ? clip.start_line : 1;
            std::size_t hi = clip.end_line > 0 ? clip.end_line : SIZE_MAX;
            std::size_t cluster_first = geo.first_line(scidx);
            std::size_t cluster_last = geo.last_line(ecidx);
            std::size_t item_start = std::max<std::size_t>(lo, cluster_first);
            std::size_t item_end = std::min<std::size_t>(hi, cluster_last);
            if (item_start > item_end) {
                // Group does not overlap the requested lines.
                group_start = group_end;
                continue;
            }
            ArrowWorkItem item;
            item.file_path = fp;
            item.chunk_prune_only = file_pure_match;
            item.start_line = item_start;
            item.end_line = item_end;
            items.push_back(std::move(item));
            group_start = group_end;
            continue;
        }
        if (clip.has_byte_clip()) {
            // Narrowing either edge to the clip moves it off the recovery
            // point, so the matching *_at_checkpoint flag is cleared.
            if (start_byte < clip.start_byte) {
                start_byte = clip.start_byte;
                start_at_checkpoint = false;
            }
            if (end_byte > clip.end_byte) {
                end_byte = clip.end_byte;
                end_at_checkpoint = false;
            }
            if (start_byte >= end_byte) {
                // Group lies entirely outside the byte clip.
                group_start = group_end;
                continue;
            }
        }
        // Positional aggregate init: file_path, start_byte, end_byte,
        // start_at_checkpoint, end_at_checkpoint, chunk_prune_only.
        items.push_back({fp, start_byte, end_byte, start_at_checkpoint,
                         end_at_checkpoint, file_pure_match});
        group_start = group_end;
    }
}
4✔
1479

1480
static std::vector<ArrowWorkItem> enumerate_work_items(
35✔
1481
    const std::vector<std::string> &files, const std::string &index_dir,
1482
    const std::string &query_str, std::size_t max_workers,
1483
    std::size_t clip_start_byte = 0, std::size_t clip_end_byte = 0,
1484
    std::size_t clip_start_line = 0, std::size_t clip_end_line = 0) {
1485
    namespace dft_internal =
1486
        dftracer::utils::utilities::composites::dft::internal;
1487
    namespace indexer_ns = dftracer::utils::utilities::indexer;
1488
    namespace indexing = dftracer::utils::utilities::composites::dft::indexing;
1489

1490
    const ClipRange clip{clip_start_byte, clip_end_byte, clip_start_line,
70✔
1491
                         clip_end_line};
35✔
1492

1493
    std::vector<ArrowWorkItem> items;
35✔
1494
    items.reserve(files.size() * 4);
35!
1495

1496
    auto push_unsplit = [&](const std::string &fp) {
92✔
1497
        ArrowWorkItem item;
57✔
1498
        item.file_path = fp;
57!
1499
        item.start_line = clip.start_line;
57✔
1500
        item.end_line = clip.end_line;
57✔
1501
        items.push_back(std::move(item));
57!
1502
    };
57✔
1503

1504
    // Parse the query once. Pruner input copies a Query, so we keep the
1505
    // parsed form around to feed each ChunkPrunerInput without re-parsing.
1506
    std::optional<dftracer::utils::utilities::common::query::Query> parsed;
35✔
1507
    if (!query_str.empty()) {
35✔
1508
        auto r = dftracer::utils::utilities::common::query::Query::from_string(
3!
1509
            query_str);
3✔
1510
        if (r) parsed = std::move(*r);
3!
1511
    }
3✔
1512

1513
    // All files in a directory-mode scan share the same `.dftindex` root.
1514
    // Group files by their resolved index path so we can open the RocksDB
1515
    // once per index and reuse it to prune every file against that handle.
1516
    std::unordered_map<std::string, std::vector<std::size_t>> by_index;
35✔
1517
    for (std::size_t i = 0; i < files.size(); ++i) {
96✔
1518
        std::string index_path =
1519
            dft_internal::determine_index_path(files[i], index_dir);
60✔
1520
        by_index[index_path].push_back(i);
61!
1521
    }
61✔
1522

1523
    for (auto &entry : by_index) {
75!
1524
        const auto &index_path = entry.first;
39✔
1525
        const auto &file_idxs = entry.second;
39✔
1526
        if (!fs::exists(index_path)) {
39!
1527
            for (auto i : file_idxs) push_unsplit(files[i]);
78!
1528
            continue;
31✔
1529
        }
1530
        std::unique_ptr<indexer_ns::IndexDatabase> idx_db;
8✔
1531
        try {
1532
            idx_db = std::make_unique<indexer_ns::IndexDatabase>(
8!
1533
                index_path,
8✔
1534
                dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadOnly);
8✔
1535
        } catch (...) {
8✔
1536
            for (auto i : file_idxs) push_unsplit(files[i]);
×
1537
            continue;
1538
        }
×
1539

1540
        // Resolve fid + checkpoints per file (cheap queries).
1541
        struct FileCtx {
1542
            std::size_t file_idx;
1543
            int fid;
1544
            std::vector<indexer_ns::IndexerCheckpoint> ckpts;
1545
        };
1546
        std::vector<FileCtx> file_ctxs;
8✔
1547
        file_ctxs.reserve(file_idxs.size());
8!
1548
        for (auto i : file_idxs) {
22✔
1549
            FileCtx fc;
14✔
1550
            fc.file_idx = i;
14✔
1551
            fc.fid = idx_db->get_file_info_id(
14!
1552
                indexer_ns::internal::get_logical_path(files[i]));
14!
1553
            if (fc.fid < 0) {
14!
1554
                push_unsplit(files[i]);
×
1555
                continue;
×
1556
            }
1557
            fc.ckpts = idx_db->query_checkpoints(fc.fid);
14!
1558
            if (fc.ckpts.empty()) {
14✔
1559
                push_unsplit(files[i]);
10!
1560
                continue;
10✔
1561
            }
1562
            std::sort(fc.ckpts.begin(), fc.ckpts.end(),
4!
1563
                      [](const auto &a, const auto &b) {
×
1564
                          return a.first_line_num < b.first_line_num;
×
1565
                      });
1566
            file_ctxs.push_back(std::move(fc));
4!
1567
        }
14✔
1568

1569
        // Batch-prune all files against the shared index: dim_stats and
1570
        // chunk_statistics are loaded in one RocksDB scan each instead of
1571
        // one scan per file.
1572
        std::vector<indexing::ChunkPrunerOutput> pruner_outs(file_ctxs.size());
8!
1573
        if (parsed && !file_ctxs.empty()) {
8!
1574
            indexing::ChunkPrunerBatchInput batch_in;
×
1575
            batch_in.index_path = index_path;
×
1576
            batch_in.external_db = idx_db.get();
×
1577
            batch_in.items.reserve(file_ctxs.size());
×
1578
            for (auto &fc : file_ctxs) {
×
1579
                batch_in.items.push_back({files[fc.file_idx], *parsed});
×
1580
            }
1581
            indexing::ChunkPrunerUtility pruner;
×
1582
            auto batch_out = pruner.process_batch(batch_in);
×
1583
            if (batch_out.success) {
×
1584
                pruner_outs = std::move(batch_out.outputs);
×
UNCOV
1585
            }
×
1586
        }
×
1587

1588
        // For AND-of-EQ predicates, precompute uniform-match leaves once.
1589
        // Per-file pure_match is checked inline below and lets workers skip
1590
        // per-event predicate eval on chunks where dim_stats min == max ==
1591
        // literal for every leaf.
1592
        std::optional<std::vector<std::pair<std::string, std::string>>>
1593
            eq_leaves;
8✔
1594
        if (parsed) eq_leaves = extract_eq_leaves(parsed->root());
8!
1595

1596
        for (std::size_t fc_idx = 0; fc_idx < file_ctxs.size(); ++fc_idx) {
12✔
1597
            auto &fc = file_ctxs[fc_idx];
4✔
1598
            const auto &fp = files[fc.file_idx];
4✔
1599

1600
            ChunkGeometry geo{fc.ckpts};
4✔
1601
            std::vector<std::uint64_t> keep_chunks = select_kept_chunks(
4!
1602
                geo, parsed.has_value(), pruner_outs[fc_idx], clip);
4✔
1603
            if (keep_chunks.empty()) continue;
4!
1604

1605
            // All-or-nothing per file: if every kept chunk is uniform-matching
1606
            // for every leaf, every work item from this file gets the
1607
            // chunk_prune_only fast path. Mixed files fall back to per-event
1608
            // eval to stay safe.
1609
            bool file_pure_match = false;
4✔
1610
            if (eq_leaves && !eq_leaves->empty() && idx_db) {
4!
1611
                file_pure_match = all_chunks_uniform_match(
×
1612
                    *idx_db, fc.fid, *eq_leaves, keep_chunks);
×
UNCOV
1613
            }
×
1614

1615
            emit_work_items(items, fp, geo, keep_chunks, file_pure_match,
8!
1616
                            max_workers, clip);
4✔
1617
        }
4!
1618
    }
8!
1619
    return items;
35✔
1620
}
37!
1621

1622
// Feeder coroutine for the worker pool. Pushes each work item into `chan`
// in order. It stops early when `cancelled` is set or when a send is
// rejected, and always closes the channel before finishing. Closing
// presumably lets the workers' receive() loops end; confirm against the
// Channel implementation.
static CoroTask<void> send_work_items_to_channel(
    std::shared_ptr<dftracer::utils::coro::Channel<ArrowWorkItem>> chan,
    const std::vector<ArrowWorkItem> *items, std::atomic<bool> *cancelled) {
    for (auto pos = items->begin(); pos != items->end(); ++pos) {
        const bool stop_requested = cancelled->load(std::memory_order_acquire);
        if (stop_requested) break;
        const bool accepted = co_await chan->send(*pos);
        if (!accepted) break;
    }
    chan->close();
    co_return;
}
248✔
1632

1633
// Worker coroutine of the parallel Arrow reader. Receives ArrowWorkItems
// from `work_chan` until it yields no more items. Each item is read through
// a per-file TraceReader, and the resulting Arrow batches are sent to
// `out_chan`.
//
// Per item:
//   * Line-range items (start_line or end_line non-zero) read with
//     LINE_RANGE, and the byte fields of the ReadConfig are reset.
//   * Other items read the item's byte range and checkpoint flags.
//   * Pruning was already done at enumeration time, so skip_pruning is
//     forced on. chunk_prune_only is forwarded from the item.
//   * If !normalize, batches come straight from read_arrow(). Otherwise
//     JSON rows are built into RecordBatchBuilder batches of up to
//     `batch_size` rows.
//
// Returns early when `cancelled` is observed or a send to `out_chan` fails.
// NOTE(review): `guard` presumably signals this producer's completion to
// `out_chan` when the coroutine exits; confirm in ChannelProducer.
// NOTE(review): unlike produce_arrow_batches, this path never adds to
// ArrowIteratorState::bytes_in_queue (it has no state pointer). Confirm the
// consumer's byte accounting does not depend on it.
static CoroTask<void> checkpoint_worker(
    std::shared_ptr<dftracer::utils::coro::Channel<ArrowWorkItem>> work_chan,
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::atomic<bool> *cancelled) {
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer(
        out_chan);
    auto guard = producer.guard();

    // Cache readers keyed by file path so we don't re-probe the same file
    // when successive work items land on it. Entries are never evicted, so
    // a worker holds one TraceReader per distinct file it has processed.
    std::unordered_map<std::string, std::shared_ptr<TraceReader>> readers;

    while (auto item = co_await work_chan->receive()) {
        if (cancelled->load(std::memory_order_acquire)) co_return;

        auto &reader_ptr = readers[item->file_path];
        if (!reader_ptr) {
            TraceReaderConfig cfg;
            cfg.file_path = item->file_path;
            cfg.index_dir = index_dir;
            cfg.checkpoint_size = checkpoint_size;
            cfg.auto_build_index = auto_build_index;
            reader_ptr = std::make_shared<TraceReader>(std::move(cfg));
        }

        // Start from the shared config and overlay this item's range.
        ReadConfig local_rc = rc;
        if (item->start_line > 0 || item->end_line > 0) {
            // Line-range work items: the read drives off LINE_RANGE; the
            // gzip stream resolves it back to byte offsets via checkpoints.
            local_rc.start_line = item->start_line;
            local_rc.end_line = item->end_line;
            local_rc.start_byte = 0;
            local_rc.end_byte = 0;
            local_rc.start_at_checkpoint = false;
            local_rc.end_at_checkpoint = false;
        } else {
            local_rc.start_byte = item->start_byte;
            local_rc.end_byte = item->end_byte;
            local_rc.start_at_checkpoint = item->start_at_checkpoint;
            local_rc.end_at_checkpoint = item->end_at_checkpoint;
        }
        // Pruning already happened at enumeration time; avoid the per-
        // work-item RocksDB opens that would otherwise dwarf the actual
        // read cost at directory scale (256 files * N ranges).
        local_rc.skip_pruning = true;
        // chunks pre-classified as uniform-matching skip per-event eval.
        if (item->chunk_prune_only) local_rc.chunk_prune_only = true;

        if (!normalize) {
            // Fast path: forward reader-produced Arrow batches unchanged.
            auto batch_gen = reader_ptr->read_arrow(local_rc, batch_size);
            while (auto batch_opt = co_await batch_gen.next()) {
                if (cancelled->load(std::memory_order_acquire)) co_return;
                if (!co_await producer.send(std::move(*batch_opt))) co_return;
            }
            continue;
        }

        // Normalize path: build Arrow rows from parsed JSON events.
        auto gen = reader_ptr->read_json(local_rc);
        RecordBatchBuilder builder;
        builder.reserve(batch_size);
        StringArena arena;

        while (auto opt = co_await gen.next()) {
            if (cancelled->load(std::memory_order_acquire)) co_return;
            if (!build_arrow_row(builder, *opt->parser, arena, normalize))
                continue;
            if (builder.num_rows() >= batch_size) {
                auto result = builder.finish();
                arena.clear();
                if (!co_await producer.send(std::move(result))) co_return;
                // Lock the schema after the first emitted batch.
                if (!builder.is_schema_locked()) builder.lock_schema();
                builder.reset(true);
                builder.reserve(batch_size);
            }
        }
        // Flush the partial trailing batch for this item.
        if (builder.num_rows() > 0) {
            if (!co_await producer.send(builder.finish())) co_return;
        }
    }
    co_return;
}
1,175✔
1716

1717
// Launch the parallel read pipeline inside `child`:
//   * N checkpoint_worker coroutines, where N = min(items, max_workers)
//     and at least 1, all pulling from one shared work channel;
//   * one feeder coroutine that pushes every work item into that channel.
// The pointer arguments must outlive the spawned coroutines. index_dir and
// rc are copied into each worker; work_items and cancelled_ptr are not.
static CoroTask<void> spawn_arrow_producers(
    CoroScope &child,
    dftracer::utils::coro::Channel<ArrowExportResult> *out_chan,
    const std::vector<ArrowWorkItem> *work_items, const std::string *index_dir,
    std::size_t checkpoint_size, bool auto_build_index, const ReadConfig *rc,
    std::size_t batch_size, bool normalize, std::atomic<bool> *cancelled_ptr,
    std::size_t max_workers) {
    const std::size_t capped = std::min(work_items->size(), max_workers);
    const std::size_t worker_count = (capped > 0) ? capped : 1;

    // Shared work queue. make_channel receives the worker count; presumably
    // that is its buffer size, so confirm in make_channel.
    auto shared_work =
        dftracer::utils::coro::make_channel<ArrowWorkItem>(worker_count);

    std::size_t launched = 0;
    while (launched < worker_count) {
        child.spawn([out_chan, wc = shared_work, dir = *index_dir,
                     checkpoint_size, auto_build_index, cfg = *rc, batch_size,
                     normalize, cancelled_ptr](CoroScope &) {
            return checkpoint_worker(wc, out_chan, dir, checkpoint_size,
                                     auto_build_index, cfg, batch_size,
                                     normalize, cancelled_ptr);
        });
        ++launched;
    }

    // Feeder: sends each item, then closes the work channel.
    child.spawn([wc = shared_work, work_items, cancelled_ptr](CoroScope &) {
        return send_work_items_to_channel(wc, work_items, cancelled_ptr);
    });
    co_return;
}
36✔
1744

1745
// Drive the parallel Arrow export for an explicit list of files.
//
// Enumerates work items (pruning and clipping with rc.query and rc's
// byte/line bounds). It then runs the worker pool inside a child scope that
// writes into sp->channel. Here the channel is closed directly only when
// there are no files or no work items; otherwise closing is left to the
// workers. Any exception is recorded through sp->set_error().
static CoroTask<void> produce_arrow_batches_for_files(
    CoroScope &scope, ArrowIteratorState *sp, std::vector<std::string> files,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::size_t max_workers) {
    try {
        if (files.empty()) {
            sp->channel->close();
            co_return;
        }

        auto work_items = enumerate_work_items(
            files, index_dir, rc.query, max_workers, rc.start_byte, rc.end_byte,
            rc.start_line, rc.end_line);
        if (work_items.empty()) {
            // Everything was pruned or clipped away.
            sp->channel->close();
            co_return;
        }

        auto *chan_ptr = sp->channel.get();
        auto *cancelled_ptr = &sp->cancelled;

        // work_items, index_dir and rc live in this coroutine frame and are
        // captured / passed by address. This relies on scope.scope() not
        // completing until every spawned child has finished. Confirm in
        // CoroScope.
        co_await scope.scope([chan_ptr, &work_items, &index_dir,
                              checkpoint_size, auto_build_index, &rc,
                              batch_size, normalize, cancelled_ptr,
                              max_workers](CoroScope &child) -> CoroTask<void> {
            co_await spawn_arrow_producers(
                child, chan_ptr, &work_items, &index_dir, checkpoint_size,
                auto_build_index, &rc, batch_size, normalize, cancelled_ptr,
                max_workers);
        });
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
72✔
1780

1781
// Directory-mode entry point of the parallel Arrow export. The directory
// scan looks for `.pfw` / `.pfw.gz` files and is run through
// scope.spawn(). The discovered paths are sorted and handed to
// produce_arrow_batches_for_files. Exceptions are recorded through
// sp->set_error().
static CoroTask<void> produce_arrow_batches_parallel(
    CoroScope &scope, ArrowIteratorState *sp, std::string dir_path,
    std::string index_dir, std::size_t checkpoint_size, bool auto_build_index,
    ReadConfig rc, std::size_t batch_size, bool normalize,
    std::size_t max_workers) {
    try {
        PatternDirectoryScannerUtility dir_scanner;
        auto request = PatternDirectoryScannerUtilityInput(
            dir_path, {".pfw", ".pfw.gz"}, true, false);
        auto found = co_await scope.spawn(dir_scanner, request);

        std::vector<std::string> trace_files;
        trace_files.reserve(found.size());
        for (auto &hit : found) {
            trace_files.push_back(hit.path.string());
        }
        // Sort so the file list handed downstream is in a stable order.
        std::sort(trace_files.begin(), trace_files.end());

        co_await produce_arrow_batches_for_files(
            scope, sp, std::move(trace_files), std::move(index_dir),
            checkpoint_size, auto_build_index, std::move(rc), batch_size,
            normalize, max_workers);
    } catch (...) {
        sp->set_error(std::current_exception());
    }
}
180✔
1805

1806
// Single-reader Arrow export: reads one TraceReader built from `cfg` with
// `rc` and sends each batch through `producer`.
//
// * !normalize: forward batches from read_arrow() unchanged.
// * normalize: build rows from read_json() into RecordBatchBuilder batches
//   of up to `batch_size` rows.
// Each sent batch's byte_size() is added to state->bytes_in_queue before
// sending. Sending stops when state->cancelled is set or a send fails.
// Exceptions are recorded through state->set_error().
//
// @param flatten_objects  accepted for interface compatibility; unused.
// NOTE(review): bytes are added to bytes_in_queue before the send. If the
// send fails, that amount is never consumed. Confirm the counter is not
// relied on after a failed send.
CoroTask<void> produce_arrow_batches(
    std::shared_ptr<ArrowIteratorState> state,
    dftracer::utils::coro::ChannelProducer<ArrowExportResult> producer,
    TraceReaderConfig cfg, ReadConfig rc, std::size_t batch_size,
    bool flatten_objects = false, bool normalize = false) {
    (void)flatten_objects;

    auto guard = producer.guard();
    try {
        TraceReader reader(std::move(cfg));

        if (!normalize) {
            auto batch_gen = reader.read_arrow(rc, batch_size);
            while (auto batch_opt = co_await batch_gen.next()) {
                if (state->cancelled.load(std::memory_order_acquire)) break;
                auto result_bytes =
                    dftracer::utils::python::byte_size(*batch_opt);
                state->bytes_in_queue.fetch_add(result_bytes,
                                                std::memory_order_acq_rel);
                if (!co_await producer.send(std::move(*batch_opt))) break;
            }
            co_return;
        }

        auto gen = reader.read_json(rc);
        RecordBatchBuilder builder;
        builder.reserve(batch_size);

        StringArena arena;

        while (auto opt = co_await gen.next()) {
            if (state->cancelled.load(std::memory_order_acquire)) break;
            // Rows that build_arrow_row rejects are skipped.
            if (!build_arrow_row(builder, *opt->parser, arena, normalize))
                continue;

            if (builder.num_rows() >= batch_size) {
                auto result = builder.finish();
                arena.clear();
                auto result_bytes = dftracer::utils::python::byte_size(result);
                state->bytes_in_queue.fetch_add(result_bytes,
                                                std::memory_order_acq_rel);
                if (!co_await producer.send(std::move(result))) break;
                // Lock the schema after the first emitted batch.
                if (!builder.is_schema_locked()) {
                    builder.lock_schema();
                }
                builder.reset(true);
                builder.reserve(batch_size);
            }
        }

        // Flush the trailing partial batch unless cancelled.
        // NOTE(review): this also runs after a failed send broke the loop.
        if (builder.num_rows() > 0 &&
            !state->cancelled.load(std::memory_order_acquire)) {
            auto result = builder.finish();
            auto result_bytes = dftracer::utils::python::byte_size(result);
            state->bytes_in_queue.fetch_add(result_bytes,
                                            std::memory_order_acq_rel);
            co_await producer.send(std::move(result));
        }
    } catch (...) {
        state->set_error(std::current_exception());
    }
}
341!
1868

1869
#endif  // DFTRACER_UTILS_ENABLE_ARROW
1870

1871
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
1872

1873
struct WriteArrowStats {
39✔
1874
    std::unordered_map<std::string, PartitionWriteStats> partitions;
1875
    int64_t total_rows = 0;
39✔
1876
    int64_t total_uncompressed_bytes = 0;
39✔
1877
};
1878

1879
struct WriteArrowResult {
52✔
1880
    WriteArrowStats stats;
1881
    std::string error;
1882
    std::uint64_t chunks_scanned = 0;
39✔
1883
    std::uint64_t chunks_skipped = 0;
39✔
1884
};
1885

1886
// Write one trace file as Arrow IPC partitions, one partition set per view.
//
// Steps:
//   1. Default to a single view named "all" when `views` is empty.
//   2. Resolve the index path (derived from file_path when index_path is
//      empty) and collect file metadata.
//   3. For each view, open a PartitionWriter. With more than one view, or a
//      view not named "all", the output goes to output_path/<view.name>.
//   4. Let ViewBuilderUtility pick candidate chunks, read each candidate
//      with ViewReaderUtility, and write the converted batches.
//
// On failure `result.error` is set and the partial result is returned.
// Exceptions derived from std::exception are caught into `error`; others
// propagate.
// NOTE(review): the early error returns leave `writer` without an explicit
// close(). Whether its destructor finalizes the partition is not visible
// here.
// NOTE(review): views sharing a name overwrite the same partitions entry
// while total_rows / total_uncompressed_bytes still accumulate both.
CoroTask<WriteArrowResult> write_arrow_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    std::vector<ViewDefinition> views, std::string output_path,
    int64_t chunk_size_bytes, IpcCompression compression,
    std::size_t event_batch_size) {
    namespace dft_internal =
        dftracer::utils::utilities::composites::dft::internal;
    WriteArrowResult result;

    try {
        if (views.empty()) {
            views.push_back(ViewDefinition().with_name("all"));
        }

        std::string resolved_index =
            index_path.empty()
                ? dft_internal::determine_index_path(file_path, "")
                : index_path;

        auto meta_input = MetadataCollectorUtilityInput::from_file(file_path)
                              .with_checkpoint_size(checkpoint_size)
                              .with_index(resolved_index);
        auto metadata = co_await MetadataCollectorUtility{}.process(meta_input);
        if (!metadata.success) {
            result.error =
                "Failed to collect metadata: " + metadata.error_message;
            co_return result;
        }

        for (const auto &view : views) {
            // A single default "all" view writes directly to output_path.
            std::string view_output = output_path;
            if (views.size() > 1 || view.name != "all") {
                view_output = output_path + "/" + view.name;
            }

            PartitionWriter writer;
            int rc_open = co_await writer.open(view_output, chunk_size_bytes,
                                               compression);
            if (rc_open != 0) {
                result.error =
                    "Failed to open partition writer for view: " + view.name;
                co_return result;
            }

            ViewBuilderInput builder_input;
            builder_input.with_view(view)
                .with_file_path(file_path)
                .with_index_path(resolved_index)
                .with_uncompressed_size(metadata.uncompressed_size)
                .with_num_checkpoints(metadata.num_checkpoints);

            auto build_output =
                co_await ViewBuilderUtility{}.process(builder_input);
            if (!build_output.success) {
                result.error = "ViewBuilder failed for view: " + view.name;
                co_return result;
            }

            result.chunks_skipped += build_output.skipped_checkpoints;

            if (!build_output.file_may_match) {
                // Nothing can match: still close so the view gets a
                // partitions entry.
                auto stats = co_await writer.close();
                result.stats.partitions[view.name] = std::move(stats);
                continue;
            }

            RecordBatchBuilder builder;
            bool schema_locked = false;

            for (const auto &candidate : build_output.candidates) {
                ViewReaderInput reader_input;
                reader_input.with_file_path(file_path)
                    .with_index_path(resolved_index)
                    .with_checkpoint_size(checkpoint_size)
                    .with_byte_range(candidate.start_byte, candidate.end_byte)
                    .with_checkpoint_idx(candidate.checkpoint_idx)
                    .with_event_batch_size(event_batch_size)
                    .with_view(view);
                reader_input.query = view.query;

                ViewReaderUtility reader;
                auto gen = reader.process(reader_input);
                while (auto opt = co_await gen.next()) {
                    auto arrow_batch = opt->to_arrow(builder);
                    int rc_write = co_await writer.write_batch(arrow_batch);
                    if (rc_write != 0) {
                        result.error =
                            "Failed to write batch for view: " + view.name;
                        co_return result;
                    }
                    // Lock the schema after the first written batch.
                    if (!schema_locked) {
                        builder.lock_schema();
                        schema_locked = true;
                    }
                    builder.reset(true);
                }
                result.chunks_scanned++;
            }

            auto stats = co_await writer.close();
            result.stats.partitions[view.name] = std::move(stats);
            result.stats.total_rows +=
                result.stats.partitions[view.name].total_rows;
            result.stats.total_uncompressed_bytes +=
                result.stats.partitions[view.name].total_uncompressed_bytes;
        }
    } catch (const std::exception &e) {
        result.error = e.what();
    }
    co_return result;
}
379✔
1997

1998
// One chunk of a trace file that a view may match: the checkpoint it belongs
// to and the byte range (start_byte .. end_byte) it covers.
//
// Members carry default initializers so a default-constructed value is
// well-defined (previously the scalars were left uninitialized). The type is
// still an aggregate, so `ViewChunkInfo{idx, start, end}` keeps working.
struct ViewChunkInfo {
    std::uint64_t checkpoint_idx = 0;
    std::size_t start_byte = 0;
    std::size_t end_byte = 0;
};
2003

2004
struct GetViewChunksResult {
12✔
2005
    std::vector<ViewChunkInfo> chunks;
2006
    std::uint64_t total_checkpoints = 0;
9✔
2007
    std::uint64_t skipped_checkpoints = 0;
9✔
2008
    bool file_may_match = false;
9✔
2009
    std::string error;
2010
};
2011

2012
// Determine which chunks of `file_path` can contain events for `view`.
//
// Steps: resolve the index path (an empty `index_path` falls back to
// determine_index_path(file_path, "")), collect file metadata, then run the
// view builder and copy its candidates into the result.
//
// Never throws: metadata/builder failures and any caught std::exception are
// reported through `GetViewChunksResult::error`.
CoroTask<GetViewChunksResult> get_view_chunks_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    ViewDefinition view) {
    namespace dft_internal =
        dftracer::utils::utilities::composites::dft::internal;
    GetViewChunksResult out;

    try {
        std::string index_file = index_path;
        if (index_file.empty()) {
            index_file = dft_internal::determine_index_path(file_path, "");
        }

        auto meta_request = MetadataCollectorUtilityInput::from_file(file_path)
                                .with_checkpoint_size(checkpoint_size)
                                .with_index(index_file);
        auto meta = co_await MetadataCollectorUtility{}.process(meta_request);
        if (!meta.success) {
            out.error = "Failed to collect metadata: " + meta.error_message;
            co_return out;
        }

        ViewBuilderInput build_request;
        build_request.with_view(view)
            .with_file_path(file_path)
            .with_index_path(index_file)
            .with_uncompressed_size(meta.uncompressed_size)
            .with_num_checkpoints(meta.num_checkpoints);

        auto built = co_await ViewBuilderUtility{}.process(build_request);
        if (!built.success) {
            out.error = "ViewBuilder failed";
            co_return out;
        }

        out.file_may_match = built.file_may_match;
        out.total_checkpoints = built.total_checkpoints;
        out.skipped_checkpoints = built.skipped_checkpoints;

        for (const auto &cand : built.candidates) {
            out.chunks.push_back(ViewChunkInfo{cand.checkpoint_idx,
                                               cand.start_byte,
                                               cand.end_byte});
        }
    } catch (const std::exception &e) {
        out.error = e.what();
    }
    co_return out;
}
2062

2063
// Outcome of writing one view chunk to its own output file.
struct WriteViewChunkResult {
    // Destination path the chunk was written to.
    std::string output_file;
    // Event counters accumulated from the view reader.
    std::uint64_t events_matched{0};
    std::uint64_t events_scanned{0};
    // Output statistics.
    int64_t rows_written{0};
    int64_t bytes_written{0};
    // Empty on success; otherwise a human-readable failure description.
    std::string error;
};
2071

2072
// Read one checkpoint chunk of `file_path` through `view` and write every
// produced Arrow batch into a single IPC file at `output_file`.
//
// The chunk is selected by `checkpoint_idx` and the byte range
// start_byte..end_byte. An empty `index_path` falls back to
// determine_index_path(file_path, ""). Event counters are accumulated for
// every reader batch; `rows_written` counts only rows whose batch was
// successfully handed to the writer.
//
// Never throws: open/write/close failures and any caught std::exception are
// reported through `WriteViewChunkResult::error`.
// NOTE(review): bytes_written is never populated here and stays 0.
CoroTask<WriteViewChunkResult> write_view_chunk_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    ViewDefinition view, std::uint64_t checkpoint_idx, std::size_t start_byte,
    std::size_t end_byte, std::string output_file, IpcCompression compression,
    std::size_t event_batch_size) {
    namespace dft_internal =
        dftracer::utils::utilities::composites::dft::internal;
    WriteViewChunkResult result;
    result.output_file = output_file;

    try {
        std::string resolved_index =
            index_path.empty()
                ? dft_internal::determine_index_path(file_path, "")
                : index_path;

        dftracer::utils::utilities::common::arrow::IpcWriter writer;
        int rc_open = co_await writer.open(output_file, compression);
        if (rc_open != 0) {
            result.error = "Failed to open output file";
            co_return result;
        }

        ViewReaderInput reader_input;
        reader_input.with_file_path(file_path)
            .with_index_path(resolved_index)
            .with_checkpoint_size(checkpoint_size)
            .with_byte_range(start_byte, end_byte)
            .with_checkpoint_idx(checkpoint_idx)
            .with_event_batch_size(event_batch_size)
            .with_view(view);
        reader_input.query = view.query;

        RecordBatchBuilder builder;
        bool schema_locked = false;

        ViewReaderUtility reader;
        auto gen = reader.process(reader_input);
        while (auto opt = co_await gen.next()) {
            result.events_matched += opt->events_matched;
            result.events_scanned += opt->events_scanned;

            auto batch = opt->to_arrow(builder);
            if (!batch.valid()) {
                continue;
            }

            // Take the row count before handing the batch to the writer and
            // credit it only after a successful write, so a failed write is
            // not reported as written rows.
            const auto batch_rows = batch.num_rows();
            int rc_write = co_await writer.write_batch(batch);
            if (rc_write != 0) {
                result.error = "Failed to write batch";
                co_return result;
            }
            result.rows_written += batch_rows;

            // Lock the builder schema once, after the first successful write.
            if (!schema_locked) {
                builder.lock_schema();
                schema_locked = true;
            }
            builder.reset(true);
        }

        int rc_close = co_await writer.close();
        if (rc_close != 0) {
            result.error = "Failed to close output file";
        }
    } catch (const std::exception &e) {
        result.error = e.what();
    }
    co_return result;
}
2138

2139
// Work item for write_view_chunks_pipeline: which checkpoint/byte range of
// the trace file to read and where to write the resulting IPC file.
//
// Scalar members carry default initializers so a default-constructed
// descriptor is well-defined (they were previously uninitialized). The type
// is still an aggregate, so brace initialization keeps working.
struct ChunkDescriptor {
    std::uint64_t checkpoint_idx = 0;
    std::size_t start_byte = 0;
    std::size_t end_byte = 0;
    std::string output_file;
};
2145

2146
struct WriteViewChunksResult {
3✔
2147
    std::vector<WriteViewChunkResult> results;
2148
    int64_t total_rows = 0;
3✔
2149
    int64_t total_events_matched = 0;
3✔
2150
};
2151

2152
// Write every chunk in `chunks` to its own IPC file by creating one
// write_view_chunk_pipeline task per chunk and awaiting them together with
// when_all. Per-chunk outcomes are kept in `results`; row and matched-event
// counts are summed into the totals. An empty `chunks` yields an empty result.
CoroTask<WriteViewChunksResult> write_view_chunks_pipeline(
    std::string file_path, std::string index_path, std::size_t checkpoint_size,
    ViewDefinition view, std::vector<ChunkDescriptor> chunks,
    IpcCompression compression, std::size_t event_batch_size) {
    WriteViewChunksResult summary;
    if (chunks.empty()) {
        co_return summary;
    }

    std::vector<CoroTask<WriteViewChunkResult>> pending;
    pending.reserve(chunks.size());
    for (const ChunkDescriptor &desc : chunks) {
        auto task = write_view_chunk_pipeline(
            file_path, index_path, checkpoint_size, view, desc.checkpoint_idx,
            desc.start_byte, desc.end_byte, desc.output_file, compression,
            event_batch_size);
        pending.push_back(std::move(task));
    }

    summary.results = co_await when_all(std::move(pending));

    for (const WriteViewChunkResult &one : summary.results) {
        summary.total_rows += one.rows_written;
        summary.total_events_matched += one.events_matched;
    }
    co_return summary;
}
2181

2182
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
2183

2184
// Snapshot the Python-side TraceReader attributes into a TraceReaderConfig.
//
// Throws std::runtime_error when the object was never (successfully)
// initialized (file_path/index_dir still NULL, as TraceReader_new leaves
// them) or when either string cannot be converted to UTF-8. Previously:
//  - a NULL member was passed to PyUnicode_AsUTF8, which dereferences its
//    argument;
//  - a NULL path result was assigned into std::string, which is undefined
//    behavior;
//  - a NULL index_dir result was silently ignored while the Python error
//    indicator stayed set.
// Callers already translate std::exception via set_typed_py_error.
TraceReaderConfig build_config(TraceReaderObject *self) {
    if (!self->file_path || !self->index_dir) {
        throw std::runtime_error("TraceReader is not initialized");
    }

    const char *path = PyUnicode_AsUTF8(self->file_path);
    if (!path) {
        throw std::runtime_error("TraceReader path is not valid UTF-8");
    }
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
    if (!idx) {
        throw std::runtime_error("TraceReader index_dir is not valid UTF-8");
    }

    TraceReaderConfig cfg;
    cfg.file_path = path;
    cfg.index_dir = idx;
    cfg.checkpoint_size = self->checkpoint_size;
    cfg.auto_build_index = self->auto_build_index != 0;
    return cfg;
}
2193

2194
// Runtime used to schedule producer tasks for this reader: the one passed to
// TraceReader(runtime=...) when present, otherwise the process-wide default.
static Runtime *get_runtime(TraceReaderObject *self) {
    PyObject *bound = self->runtime_obj;
    if (bound == NULL) {
        return get_default_runtime();
    }
    auto *wrapper = reinterpret_cast<RuntimeObject *>(bound);
    return wrapper->runtime.get();
}
2200

2201
// Allocate a TraceReaderIterator in MEMORYVIEW mode that owns `state`.
//
// tp_alloc does not run C++ constructors, so every shared_ptr member is
// constructed in place: batch_state receives `state`, the members used by
// the other iterator modes are constructed empty. Returns NULL when
// allocation fails (tp_alloc is expected to have set the Python error).
static TraceReaderIteratorObject *make_memoryview_iterator(
    std::shared_ptr<MemoryViewBatchIteratorState> state) {
    PyObject *raw =
        TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0);
    if (raw == NULL) {
        return NULL;
    }
    auto *iter = reinterpret_cast<TraceReaderIteratorObject *>(raw);

    iter->mode = IteratorMode::MEMORYVIEW;

    // Active mode: memoryview batches, nothing fetched yet.
    new (&iter->batch_state)
        std::shared_ptr<MemoryViewBatchIteratorState>(std::move(state));
    iter->current_batch = NULL;
    iter->batch_index = 0;

    // Inactive JSON-dict members.
    new (&iter->json_dict_state) std::shared_ptr<JsonDictIteratorState>();
    new (&iter->json_dict_current_batch) std::shared_ptr<JsonDictBatch>();
    iter->json_dict_index = 0;

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    // Inactive Arrow member.
    new (&iter->arrow_state) std::shared_ptr<ArrowIteratorState>();
#endif
    return iter;
}
2220

2221
// Allocate a TraceReaderIterator in JSON_DICT mode that owns `state`.
//
// tp_alloc does not run C++ constructors, so every shared_ptr member is
// constructed in place: json_dict_state receives `state`, everything else is
// constructed empty. Returns NULL when allocation fails (tp_alloc is expected
// to have set the Python error).
static TraceReaderIteratorObject *make_json_dict_iterator(
    std::shared_ptr<JsonDictIteratorState> state) {
    PyObject *raw =
        TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0);
    if (raw == NULL) {
        return NULL;
    }
    auto *iter = reinterpret_cast<TraceReaderIteratorObject *>(raw);

    iter->mode = IteratorMode::JSON_DICT;

    // Inactive memoryview members.
    new (&iter->batch_state) std::shared_ptr<MemoryViewBatchIteratorState>();
    iter->current_batch = NULL;
    iter->batch_index = 0;

    // Active mode: JSON dict batches, nothing fetched yet.
    new (&iter->json_dict_state)
        std::shared_ptr<JsonDictIteratorState>(std::move(state));
    new (&iter->json_dict_current_batch) std::shared_ptr<JsonDictBatch>();
    iter->json_dict_index = 0;

#ifdef DFTRACER_UTILS_ENABLE_ARROW
    // Inactive Arrow member.
    new (&iter->arrow_state) std::shared_ptr<ArrowIteratorState>();
#endif
    return iter;
}
2240

2241
#ifdef DFTRACER_UTILS_ENABLE_ARROW
2242
// Allocate a TraceReaderIterator in ARROW mode that owns `state`.
//
// tp_alloc does not run C++ constructors, so every shared_ptr member is
// constructed in place: arrow_state receives `state`, the memoryview and
// JSON-dict members are constructed empty. Returns NULL when allocation
// fails (tp_alloc is expected to have set the Python error).
static TraceReaderIteratorObject *make_arrow_iterator(
    std::shared_ptr<ArrowIteratorState> state) {
    PyObject *raw =
        TraceReaderIteratorType.tp_alloc(&TraceReaderIteratorType, 0);
    if (raw == NULL) {
        return NULL;
    }
    auto *iter = reinterpret_cast<TraceReaderIteratorObject *>(raw);

    iter->mode = IteratorMode::ARROW;

    // Inactive memoryview members.
    new (&iter->batch_state) std::shared_ptr<MemoryViewBatchIteratorState>();
    iter->current_batch = NULL;
    iter->batch_index = 0;

    // Inactive JSON-dict members.
    new (&iter->json_dict_state) std::shared_ptr<JsonDictIteratorState>();
    new (&iter->json_dict_current_batch) std::shared_ptr<JsonDictBatch>();
    iter->json_dict_index = 0;

    // Active mode: Arrow record batches.
    new (&iter->arrow_state)
        std::shared_ptr<ArrowIteratorState>(std::move(state));
    return iter;
}
2259
#endif
2260

2261
}  // namespace
2262

2263
// tp_dealloc: drop the owned references, then free the object through its
// type's tp_free. Each reference may be NULL (TraceReader_new starts them as
// NULL and __init__ may not have set them), hence Py_XDECREF.
static void TraceReader_dealloc(TraceReaderObject *self) {
    Py_XDECREF(self->file_path);
    Py_XDECREF(self->index_dir);
    Py_XDECREF(self->runtime_obj);

    PyTypeObject *tp = Py_TYPE(self);
    tp->tp_free(reinterpret_cast<PyObject *>(self));
}
2269

2270
// tp_new: allocate a TraceReader with every field in a known default state.
// Arguments are ignored here; TraceReader_init parses them.
// Returns NULL if tp_alloc fails.
static PyObject *TraceReader_new(PyTypeObject *type, PyObject *args,
                                 PyObject *kwds) {
    PyObject *raw = type->tp_alloc(type, 0);
    if (raw == NULL) {
        return NULL;
    }
    auto *obj = reinterpret_cast<TraceReaderObject *>(raw);

    // Python references: filled in by __init__.
    obj->file_path = NULL;
    obj->index_dir = NULL;
    obj->runtime_obj = NULL;

    // Plain settings.
    obj->checkpoint_size =
        dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE;
    obj->auto_build_index = 0;
    obj->has_index = 0;

    return raw;
}
2284

2285
static int TraceReader_init(TraceReaderObject *self, PyObject *args,
158✔
2286
                            PyObject *kwds) {
2287
    static const char *kwlist[] = {
2288
        "path",    "index_dir", "checkpoint_size", "auto_build_index",
2289
        "runtime", NULL};
2290

2291
    const char *file_path;
2292
    const char *index_dir = "";
158✔
2293
    std::size_t checkpoint_size =
158✔
2294
        dftracer::utils::constants::indexer::DEFAULT_CHECKPOINT_SIZE;
2295
    int auto_build_index = 0;
158✔
2296
    PyObject *runtime_arg = NULL;
158✔
2297

2298
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|snpO", (char **)kwlist,
158!
2299
                                     &file_path, &index_dir, &checkpoint_size,
2300
                                     &auto_build_index, &runtime_arg)) {
2301
        return -1;
×
2302
    }
2303

2304
    if (runtime_arg && runtime_arg != Py_None) {
158!
2305
        if (PyObject_TypeCheck(runtime_arg, &RuntimeType)) {
32!
2306
            // Direct C++ Runtime object
2307
            Py_INCREF(runtime_arg);
×
2308
            self->runtime_obj = runtime_arg;
×
UNCOV
2309
        } else {
×
2310
            // Python wrapper, extract _native attribute
2311
            PyObject *native = PyObject_GetAttrString(runtime_arg, "_native");
32✔
2312
            if (native && PyObject_TypeCheck(native, &RuntimeType)) {
32!
2313
                self->runtime_obj = native;  // already incref'd by GetAttr
32✔
2314
            } else {
32✔
2315
                Py_XDECREF(native);
×
2316
                PyErr_SetString(PyExc_TypeError,
×
2317
                                "runtime must be a Runtime instance or None");
2318
                return -1;
×
2319
            }
2320
        }
2321
    }
32✔
2322

2323
    self->file_path = PyUnicode_FromString(file_path);
158✔
2324
    if (!self->file_path) return -1;
158!
2325

2326
    self->index_dir = PyUnicode_FromString(index_dir);
158✔
2327
    if (!self->index_dir) {
158!
2328
        Py_DECREF(self->file_path);
×
2329
        self->file_path = NULL;
×
2330
        return -1;
×
2331
    }
2332

2333
    self->checkpoint_size = checkpoint_size;
158✔
2334
    self->auto_build_index = auto_build_index;
158✔
2335

2336
    try {
2337
        TraceReaderConfig cfg;
158✔
2338
        cfg.file_path = file_path;
158!
2339
        cfg.index_dir = index_dir;
158!
2340
        cfg.checkpoint_size = checkpoint_size;
158✔
2341
        cfg.auto_build_index = auto_build_index != 0;
158✔
2342
        TraceReader probe(std::move(cfg));
158!
2343
        self->has_index = probe.has_index() ? 1 : 0;
158!
2344
    } catch (const std::exception &e) {
158!
NEW
2345
        set_typed_py_error(e);
×
2346
        Py_DECREF(self->file_path);
×
2347
        Py_DECREF(self->index_dir);
×
2348
        self->file_path = NULL;
×
2349
        self->index_dir = NULL;
×
2350
        return -1;
×
2351
    }
×
2352

2353
    return 0;
158✔
2354
}
158✔
2355

2356
// TraceReader.iter_lines(start_line=0, end_line=0, start_byte=0, end_byte=0,
//                        buffer_size=4 MiB, query=None, memory_budget=0)
//
// Starts a background producer that pushes line batches (LINE_BATCH_SIZE
// lines each) into a bounded channel, and returns a MEMORYVIEW-mode iterator
// draining it.
//  - Directory path: produce_lines_parallel inside a runtime scope.
//  - Single file: produce_lines_batched.
// Channel capacity is derived from the memory budget, the estimated batch
// size and the runtime's thread count.
//
// Returns a new iterator, or NULL with a Python exception set:
//  - ValueError for negative range arguments or memory_budget, or a
//    non-positive buffer_size;
//  - whatever set_typed_py_error maps a caught C++ exception to.
static PyObject *TraceReader_iter_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {"start_line",    "end_line",    "start_byte",
                                   "end_byte",      "buffer_size", "query",
                                   "memory_budget", NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;
    Py_ssize_t memory_budget = 0;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|nnnnnzn", (char **)kwlist, &start_line, &end_line,
            &start_byte, &end_byte, &buffer_size, &query_str, &memory_budget)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }
    // A negative budget would silently wrap to a huge std::size_t below.
    if (memory_budget < 0) {
        PyErr_SetString(PyExc_ValueError, "memory_budget must be >= 0");
        return NULL;
    }

    // Every C++ step below may throw (allocation, runtime/channel setup,
    // filesystem probing, task submission). Keep them all in one try so no
    // exception unwinds through the CPython call boundary.
    std::shared_ptr<MemoryViewBatchIteratorState> state;
    try {
        TraceReaderConfig cfg = build_config(self);

        ReadConfig rc;
        rc.start_line = static_cast<std::size_t>(start_line);
        rc.end_line = static_cast<std::size_t>(end_line);
        rc.start_byte = static_cast<std::size_t>(start_byte);
        rc.end_byte = static_cast<std::size_t>(end_byte);
        rc.buffer_size = static_cast<std::size_t>(buffer_size);
        if (query_str) rc.query = query_str;

        state = std::make_shared<MemoryViewBatchIteratorState>();
        state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
            static_cast<std::size_t>(memory_budget));

        Runtime *rt = get_runtime(self);
        std::size_t max_workers = rt->threads();
        constexpr std::size_t LINE_BATCH_SIZE = 1024;
        std::size_t capacity = dftracer::utils::compute_channel_capacity(
            state->memory_budget_bytes,
            LINE_BATCH_SIZE * ESTIMATED_BYTES_PER_LINE, max_workers);
        state->channel =
            dftracer::utils::coro::make_channel<MemoryViewBatchData>(capacity);
        auto *sp = state.get();

        if (fs::is_directory(cfg.file_path)) {
            auto handle = rt->scope(
                "iter_lines_parallel",
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_lines_parallel(
                        scope, sp, dir_path, index_dir, checkpoint_size,
                        auto_build_index, rc, LINE_BATCH_SIZE, max_workers);
                });
            state->task_future = handle.future;
        } else {
            auto handle = rt->submit(
                produce_lines_batched(state, state->channel->producer(), cfg,
                                      rc, LINE_BATCH_SIZE),
                "iter_lines");
            state->task_future = handle.future;
        }
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    }

    TraceReaderIteratorObject *it = make_memoryview_iterator(std::move(state));
    return (PyObject *)it;
}
2440

2441
// TraceReader.iter_raw(start_line=0, end_line=0, start_byte=0, end_byte=0,
//                      buffer_size=4 MiB, line_aligned=True, multi_line=True,
//                      query=None, memory_budget=0)
//
// Starts a background producer that pushes raw data chunks into a bounded
// channel, and returns a MEMORYVIEW-mode iterator draining it.
//  - Directory path: produce_raw_parallel inside a runtime scope.
//  - Single file: produce_raw_batched.
//
// Returns a new iterator, or NULL with a Python exception set:
//  - ValueError for negative range arguments or memory_budget, or a
//    non-positive buffer_size;
//  - whatever set_typed_py_error maps a caught C++ exception to.
static PyObject *TraceReader_iter_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",    "start_byte",
                                   "end_byte",   "buffer_size", "line_aligned",
                                   "multi_line", "query",       "memory_budget",
                                   NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    int line_aligned = 1;
    int multi_line = 1;
    const char *query_str = NULL;
    Py_ssize_t memory_budget = 0;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nnnnnppzn", (char **)kwlist,
                                     &start_line, &end_line, &start_byte,
                                     &end_byte, &buffer_size, &line_aligned,
                                     &multi_line, &query_str, &memory_budget)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }
    // A negative budget would silently wrap to a huge std::size_t below.
    if (memory_budget < 0) {
        PyErr_SetString(PyExc_ValueError, "memory_budget must be >= 0");
        return NULL;
    }

    // Keep every throwing C++ step inside one try so no exception unwinds
    // through the CPython call boundary.
    std::shared_ptr<MemoryViewBatchIteratorState> state;
    try {
        TraceReaderConfig cfg = build_config(self);

        ReadConfig rc;
        rc.start_line = static_cast<std::size_t>(start_line);
        rc.end_line = static_cast<std::size_t>(end_line);
        rc.start_byte = static_cast<std::size_t>(start_byte);
        rc.end_byte = static_cast<std::size_t>(end_byte);
        rc.buffer_size = static_cast<std::size_t>(buffer_size);
        rc.line_aligned = line_aligned != 0;
        rc.multi_line = multi_line != 0;
        if (query_str) rc.query = query_str;

        state = std::make_shared<MemoryViewBatchIteratorState>();
        state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
            static_cast<std::size_t>(memory_budget));

        Runtime *rt = get_runtime(self);
        std::size_t max_workers = rt->threads();
        std::size_t capacity = dftracer::utils::compute_channel_capacity(
            state->memory_budget_bytes, ESTIMATED_BYTES_PER_RAW_CHUNK,
            max_workers);
        state->channel =
            dftracer::utils::coro::make_channel<MemoryViewBatchData>(capacity);
        auto *sp = state.get();

        if (fs::is_directory(cfg.file_path)) {
            auto handle = rt->scope(
                "iter_raw_parallel",
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_raw_parallel(
                        scope, sp, dir_path, index_dir, checkpoint_size,
                        auto_build_index, rc, max_workers);
                });
            state->task_future = handle.future;
        } else {
            auto handle = rt->submit(
                produce_raw_batched(state, state->channel->producer(), cfg, rc),
                "iter_raw");
            state->task_future = handle.future;
        }
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    }

    TraceReaderIteratorObject *it = make_memoryview_iterator(std::move(state));
    return (PyObject *)it;
}
2528

2529
// TraceReader.read_lines(...): eager form of iter_lines. Takes the same
// arguments, drains the iterator into a list and returns it (NULL with a
// Python exception set on failure).
static PyObject *TraceReader_read_lines(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    PyObject *iterator = TraceReader_iter_lines(self, args, kwds);
    if (iterator == NULL) {
        return NULL;
    }
    PyObject *materialized = PySequence_List(iterator);
    Py_DECREF(iterator);
    return materialized;
}
2537

2538
// TraceReader.iter_json(start_line=0, end_line=0, start_byte=0, end_byte=0,
//                       buffer_size=4 MiB, query=None, batch_size=1024,
//                       memory_budget=0)
//
// Starts a background producer that pushes batches of parsed JSON events
// (batch_size per batch) into a bounded channel, and returns a JSON_DICT-mode
// iterator draining it.
//  - Directory path: produce_json_dicts_parallel inside a runtime scope.
//  - Single file: produce_json_dicts.
//
// Returns a new iterator, or NULL with a Python exception set:
//  - ValueError for negative range arguments or memory_budget, or a
//    non-positive buffer_size/batch_size;
//  - whatever set_typed_py_error maps a caught C++ exception to.
static PyObject *TraceReader_iter_json(TraceReaderObject *self, PyObject *args,
                                       PyObject *kwds) {
    static const char *kwlist[] = {"start_line", "end_line",      "start_byte",
                                   "end_byte",   "buffer_size",   "query",
                                   "batch_size", "memory_budget", NULL};
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;
    Py_ssize_t batch_size = 1024;
    Py_ssize_t memory_budget = 0;

    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|nnnnnznn", (char **)kwlist,
                                     &start_line, &end_line, &start_byte,
                                     &end_byte, &buffer_size, &query_str,
                                     &batch_size, &memory_budget)) {
        return NULL;
    }

    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0 || batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError,
                        "range arguments must be >= 0; buffer_size and "
                        "batch_size must be > 0");
        return NULL;
    }
    // A negative budget would silently wrap to a huge std::size_t below.
    if (memory_budget < 0) {
        PyErr_SetString(PyExc_ValueError, "memory_budget must be >= 0");
        return NULL;
    }

    // Keep every throwing C++ step inside one try so no exception unwinds
    // through the CPython call boundary.
    std::shared_ptr<JsonDictIteratorState> state;
    try {
        TraceReaderConfig cfg = build_config(self);

        ReadConfig rc;
        rc.start_line = static_cast<std::size_t>(start_line);
        rc.end_line = static_cast<std::size_t>(end_line);
        rc.start_byte = static_cast<std::size_t>(start_byte);
        rc.end_byte = static_cast<std::size_t>(end_byte);
        rc.buffer_size = static_cast<std::size_t>(buffer_size);
        if (query_str) rc.query = query_str;

        state = std::make_shared<JsonDictIteratorState>();
        state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
            static_cast<std::size_t>(memory_budget));

        Runtime *rt = get_runtime(self);
        std::size_t max_workers = rt->threads();
        auto bs = static_cast<std::size_t>(batch_size);
        std::size_t capacity = dftracer::utils::compute_channel_capacity(
            state->memory_budget_bytes, bs * ESTIMATED_BYTES_PER_JSON_EVENT,
            max_workers);
        state->channel =
            dftracer::utils::coro::make_channel<JsonDictBatch>(capacity);
        auto *sp = state.get();

        if (fs::is_directory(cfg.file_path)) {
            auto handle = rt->scope(
                "iter_json_parallel",
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc, bs,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_json_dicts_parallel(
                        scope, sp, dir_path, index_dir, checkpoint_size,
                        auto_build_index, rc, bs, max_workers);
                });
            state->task_future = handle.future;
        } else {
            auto handle =
                rt->submit(produce_json_dicts(state, state->channel->producer(),
                                              cfg, rc, bs),
                           "iter_json");
            state->task_future = handle.future;
        }
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    }

    TraceReaderIteratorObject *it = make_json_dict_iterator(std::move(state));
    return (PyObject *)it;
}
2624

2625
// TraceReader.read_json(...): eager form of iter_json. Takes the same
// arguments, drains the iterator into a list and returns it (NULL with a
// Python exception set on failure).
static PyObject *TraceReader_read_json_py(TraceReaderObject *self,
                                          PyObject *args, PyObject *kwds) {
    PyObject *iterator = TraceReader_iter_json(self, args, kwds);
    if (iterator == NULL) {
        return NULL;
    }
    PyObject *materialized = PySequence_List(iterator);
    Py_DECREF(iterator);
    return materialized;
}
2633

2634
// TraceReader.read_raw(...): eager form of iter_raw. Takes the same
// arguments, drains the iterator into a list and returns it (NULL with a
// Python exception set on failure).
static PyObject *TraceReader_read_raw(TraceReaderObject *self, PyObject *args,
                                      PyObject *kwds) {
    PyObject *iterator = TraceReader_iter_raw(self, args, kwds);
    if (iterator == NULL) {
        return NULL;
    }
    PyObject *materialized = PySequence_List(iterator);
    Py_DECREF(iterator);
    return materialized;
}
2642

2643
#ifdef DFTRACER_UTILS_ENABLE_ARROW
2644

2645
// TraceReader.iter_arrow(batch_size=10000, start_line=0, end_line=0,
//                        start_byte=0, end_byte=0, buffer_size=4 MiB,
//                        query=None, flatten_objects=True, normalize=False,
//                        memory_budget=0)
//
// Starts a background producer that pushes Arrow record batches (batch_size
// rows) into a bounded channel, and returns an ARROW-mode iterator draining
// it. Producer selection:
//  - directory path: produce_arrow_batches_parallel in a runtime scope;
//  - single file with normalize: produce_arrow_batches;
//  - single file otherwise: produce_arrow_batches_for_files with a
//    one-element file list, in a runtime scope.
// flatten_objects travels to the producers via ReadConfig::flatten_objects.
//
// Returns a new iterator, or NULL with a Python exception set:
//  - ValueError for a non-positive batch_size/buffer_size, or negative range
//    arguments or memory_budget;
//  - whatever set_typed_py_error maps a caught C++ exception to.
static PyObject *TraceReader_iter_arrow(TraceReaderObject *self, PyObject *args,
                                        PyObject *kwds) {
    static const char *kwlist[] = {
        "batch_size", "start_line",    "end_line", "start_byte",
        "end_byte",   "buffer_size",   "query",    "flatten_objects",
        "normalize",  "memory_budget", NULL};
    Py_ssize_t batch_size = 10000;
    Py_ssize_t start_line = 0, end_line = 0;
    Py_ssize_t start_byte = 0, end_byte = 0;
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
    const char *query_str = NULL;
    int flatten_objects = 1;  // default: expand top-level objects
    int normalize = 0;
    Py_ssize_t memory_budget = 0;

    if (!PyArg_ParseTupleAndKeywords(
            args, kwds, "|nnnnnnzppn", (char **)kwlist, &batch_size,
            &start_line, &end_line, &start_byte, &end_byte, &buffer_size,
            &query_str, &flatten_objects, &normalize, &memory_budget)) {
        return NULL;
    }

    if (batch_size <= 0) {
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
        return NULL;
    }
    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
        buffer_size <= 0) {
        PyErr_SetString(
            PyExc_ValueError,
            "range arguments must be >= 0; buffer_size must be > 0");
        return NULL;
    }
    // A negative budget would silently wrap to a huge std::size_t below.
    if (memory_budget < 0) {
        PyErr_SetString(PyExc_ValueError, "memory_budget must be >= 0");
        return NULL;
    }

    // Keep every throwing C++ step inside one try so no exception unwinds
    // through the CPython call boundary.
    std::shared_ptr<ArrowIteratorState> state;
    try {
        TraceReaderConfig cfg = build_config(self);

        ReadConfig rc;
        rc.start_line = static_cast<std::size_t>(start_line);
        rc.end_line = static_cast<std::size_t>(end_line);
        rc.start_byte = static_cast<std::size_t>(start_byte);
        rc.end_byte = static_cast<std::size_t>(end_byte);
        rc.buffer_size = static_cast<std::size_t>(buffer_size);
        rc.flatten_objects = flatten_objects != 0;
        if (query_str) rc.query = query_str;

        state = std::make_shared<ArrowIteratorState>();
        state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
            static_cast<std::size_t>(memory_budget));

        Runtime *rt = get_runtime(self);
        std::size_t max_workers = rt->threads();
        auto bs = static_cast<std::size_t>(batch_size);
        std::size_t capacity = dftracer::utils::compute_channel_capacity(
            state->memory_budget_bytes, bs * ESTIMATED_BYTES_PER_ARROW_ROW,
            max_workers);
        state->channel =
            dftracer::utils::coro::make_channel<ArrowIteratorState::BatchType>(
                capacity);
        auto *sp = state.get();

        if (fs::is_directory(cfg.file_path)) {
            auto handle = rt->scope(
                "iter_arrow_parallel",
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc, bs,
                 norm = normalize != 0,
                 max_workers](CoroScope &scope) -> CoroTask<void> {
                    co_await produce_arrow_batches_parallel(
                        scope, sp, dir_path, index_dir, checkpoint_size,
                        auto_build_index, rc, bs, norm, max_workers);
                });
            state->task_future = handle.future;
        } else if (normalize) {
            auto handle = rt->submit(
                produce_arrow_batches(state, state->channel->producer(), cfg,
                                      rc, bs, flatten_objects != 0,
                                      normalize != 0),
                "iter_arrow");
            state->task_future = handle.future;
        } else {
            std::vector<std::string> files_vec{cfg.file_path};
            auto handle = rt->scope(
                "iter_arrow_parallel",
                [sp, files = std::move(files_vec), index_dir = cfg.index_dir,
                 checkpoint_size = cfg.checkpoint_size,
                 auto_build_index = cfg.auto_build_index, rc, bs,
                 norm = normalize != 0,
                 max_workers](CoroScope &scope) mutable -> CoroTask<void> {
                    co_await produce_arrow_batches_for_files(
                        scope, sp, std::move(files), index_dir, checkpoint_size,
                        auto_build_index, rc, bs, norm, max_workers);
                });
            state->task_future = handle.future;
        }
    } catch (const std::exception &e) {
        set_typed_py_error(e);
        return NULL;
    }

    TraceReaderIteratorObject *it = make_arrow_iterator(std::move(state));
    return (PyObject *)it;
}
2756

2757
// Build ArrowIteratorState + spawn the producer task. Same plumbing as
2758
// TraceReader_iter_arrow but returns the state so callers can wrap it as
2759
// either a per-batch iterator or an ArrowArrayStream.
2760
static std::shared_ptr<ArrowIteratorState> spawn_arrow_producer(
27✔
2761
    TraceReaderObject *self, PyObject *args, PyObject *kwds) {
2762
    static const char *kwlist[] = {
2763
        "batch_size", "start_line",    "end_line", "start_byte",
2764
        "end_byte",   "buffer_size",   "query",    "flatten_objects",
2765
        "normalize",  "memory_budget", NULL};
2766
    Py_ssize_t batch_size = 10000;
27✔
2767
    Py_ssize_t start_line = 0, end_line = 0;
27✔
2768
    Py_ssize_t start_byte = 0, end_byte = 0;
27✔
2769
    Py_ssize_t buffer_size = 4 * 1024 * 1024;
27✔
2770
    const char *query_str = NULL;
27✔
2771
    int flatten_objects = 1;  // default: expand top-level objects
27✔
2772
    int normalize = 0;
27✔
2773
    Py_ssize_t memory_budget = 0;
27✔
2774

2775
    if (!PyArg_ParseTupleAndKeywords(
27!
2776
            args, kwds, "|nnnnnnzppn", (char **)kwlist, &batch_size,
27✔
2777
            &start_line, &end_line, &start_byte, &end_byte, &buffer_size,
2778
            &query_str, &flatten_objects, &normalize, &memory_budget)) {
2779
        return nullptr;
×
2780
    }
2781

2782
    if (batch_size <= 0) {
27!
2783
        PyErr_SetString(PyExc_ValueError, "batch_size must be > 0");
×
2784
        return nullptr;
×
2785
    }
2786
    if (start_line < 0 || end_line < 0 || start_byte < 0 || end_byte < 0 ||
27!
2787
        buffer_size <= 0) {
27✔
2788
        PyErr_SetString(
×
UNCOV
2789
            PyExc_ValueError,
×
2790
            "range arguments must be >= 0; buffer_size must be > 0");
2791
        return nullptr;
×
2792
    }
2793

2794
    TraceReaderConfig cfg;
27✔
2795
    try {
2796
        cfg = build_config(self);
27!
2797
    } catch (const std::exception &e) {
27!
NEW
2798
        set_typed_py_error(e);
×
2799
        return nullptr;
×
2800
    }
×
2801

2802
    ReadConfig rc;
27✔
2803
    rc.start_line = static_cast<std::size_t>(start_line);
27✔
2804
    rc.end_line = static_cast<std::size_t>(end_line);
27✔
2805
    rc.start_byte = static_cast<std::size_t>(start_byte);
27✔
2806
    rc.end_byte = static_cast<std::size_t>(end_byte);
27✔
2807
    rc.buffer_size = static_cast<std::size_t>(buffer_size);
27✔
2808
    rc.flatten_objects = flatten_objects != 0;
27✔
2809
    if (query_str) rc.query = query_str;
27!
2810

2811
    auto state = std::make_shared<ArrowIteratorState>();
27!
2812
    state->memory_budget_bytes = dftracer::utils::compute_memory_budget(
27!
2813
        static_cast<std::size_t>(memory_budget));
27✔
2814

2815
    Runtime *rt = get_runtime(self);
27!
2816
    std::size_t max_workers = rt->threads();
27!
2817
    auto bs = static_cast<std::size_t>(batch_size);
27✔
2818
    std::size_t capacity = dftracer::utils::compute_channel_capacity(
27!
2819
        state->memory_budget_bytes, bs * ESTIMATED_BYTES_PER_ARROW_ROW,
27✔
2820
        max_workers);
27✔
2821
    state->channel =
27✔
2822
        dftracer::utils::coro::make_channel<ArrowIteratorState::BatchType>(
27!
2823
            capacity);
27✔
2824
    auto *sp = state.get();
27✔
2825

2826
    try {
2827
        bool is_dir = fs::is_directory(cfg.file_path);
27!
2828
        if (is_dir) {
27✔
2829
            auto handle = rt->scope(
20!
2830
                "iter_arrow_parallel",
10!
2831
                [sp, dir_path = cfg.file_path, index_dir = cfg.index_dir,
80!
2832
                 checkpoint_size = cfg.checkpoint_size,
10✔
2833
                 auto_build_index = cfg.auto_build_index, rc, bs,
20!
2834
                 norm = normalize != 0,
10✔
2835
                 max_workers](CoroScope &scope) -> CoroTask<void> {
20!
2836
                    co_await produce_arrow_batches_parallel(
80!
2837
                        scope, sp, dir_path, index_dir, checkpoint_size,
30!
2838
                        auto_build_index, rc, bs, norm, max_workers);
30!
2839
                });
20✔
2840
            state->task_future = handle.future;
10!
2841
        } else {
10✔
2842
            auto handle = rt->submit(
34!
2843
                produce_arrow_batches(state, state->channel->producer(), cfg,
34!
2844
                                      rc, static_cast<std::size_t>(batch_size),
17!
2845
                                      flatten_objects != 0, normalize != 0),
17✔
2846
                "iter_arrow");
17!
2847
            state->task_future = handle.future;
17!
2848
        }
17✔
2849
    } catch (const std::exception &e) {
27!
NEW
2850
        set_typed_py_error(e);
×
2851
        return nullptr;
×
2852
    }
×
2853

2854
    return state;
27✔
2855
}
27✔
2856

2857
static PyObject *TraceReader_iter_arrow_stream(TraceReaderObject *self,
14✔
2858
                                               PyObject *args, PyObject *kwds) {
2859
    auto state = spawn_arrow_producer(self, args, kwds);
14✔
2860
    if (!state) return NULL;
14!
2861
    return make_arrow_batch_stream(std::move(state));
14!
2862
}
14✔
2863

2864
static PyObject *TraceReader_read_arrow(TraceReaderObject *self, PyObject *args,
13✔
2865
                                        PyObject *kwds) {
2866
    auto state = spawn_arrow_producer(self, args, kwds);
13✔
2867
    if (!state) return NULL;
13!
2868
    PyObject *stream = make_arrow_batch_stream(std::move(state));
13!
2869
    if (!stream) return NULL;
13!
2870
    return dftracer::utils::python::wrap_arrow_stream_table(stream);
13!
2871
}
13✔
2872

2873
#endif  // DFTRACER_UTILS_ENABLE_ARROW
2874

2875
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
2876

2877
static int parse_str_list_trace(PyObject *obj, std::vector<std::string> &out,
2878
                                const char *param_name) {
2879
    if (!obj || obj == Py_None) return 0;
2880
    if (!PyList_Check(obj)) {
2881
        PyErr_Format(PyExc_TypeError, "%s must be a list of str", param_name);
2882
        return -1;
2883
    }
2884
    Py_ssize_t n = PyList_Size(obj);
2885
    for (Py_ssize_t i = 0; i < n; i++) {
2886
        const char *s = PyUnicode_AsUTF8(PyList_GetItem(obj, i));
2887
        if (!s) return -1;
2888
        out.emplace_back(s);
2889
    }
2890
    return 0;
2891
}
2892

2893
static PyObject *TraceReader_write_arrow(TraceReaderObject *self,
13✔
2894
                                         PyObject *args, PyObject *kwds) {
2895
    static const char *kwlist[] = {"path",        "views",      "chunk_size_mb",
2896
                                   "compression", "batch_size", NULL};
2897
    const char *path = NULL;
13✔
2898
    PyObject *views_obj = Py_None;
13✔
2899
    int chunk_size_mb = 32;
13✔
2900
    const char *compression_str = "zstd";
13✔
2901
    Py_ssize_t batch_size = 10000;
13✔
2902

2903
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|Oisn", (char **)kwlist,
13!
2904
                                     &path, &views_obj, &chunk_size_mb,
2905
                                     &compression_str, &batch_size)) {
2906
        return NULL;
×
2907
    }
2908

2909
    if (chunk_size_mb < 0) {
13!
2910
        PyErr_SetString(PyExc_ValueError, "chunk_size_mb must be >= 0");
×
2911
        return NULL;
×
2912
    }
2913

2914
    std::vector<ViewDefinition> views;
13✔
2915
    if (views_obj && views_obj != Py_None) {
13!
2916
        if (!PyList_Check(views_obj)) {
5!
2917
            PyErr_SetString(PyExc_TypeError, "views must be a list or None");
×
2918
            return NULL;
×
2919
        }
2920
        Py_ssize_t n = PyList_Size(views_obj);
5!
2921
        for (Py_ssize_t i = 0; i < n; i++) {
11✔
2922
            PyObject *item = PyList_GetItem(views_obj, i);
6!
2923
            ViewDefinition vd;
6✔
2924

2925
            if (PyUnicode_Check(item)) {
6!
2926
                const char *name = PyUnicode_AsUTF8(item);
1!
2927
                if (!name) return NULL;
1!
2928
                std::string name_str(name);
1!
2929
                if (name_str == "io") {
1!
2930
                    vd = ViewDefinition::io_view();
1!
2931
                } else if (name_str == "compute") {
1!
2932
                    vd = ViewDefinition::compute_view();
×
2933
                } else if (name_str == "dlio") {
×
2934
                    vd = ViewDefinition::dlio_view();
×
UNCOV
2935
                } else {
×
2936
                    vd.with_name(name_str);
×
2937
                }
2938
            } else if (PyDict_Check(item)) {
6!
2939
                PyObject *name_obj = PyDict_GetItemString(item, "name");
5!
2940
                if (!name_obj || !PyUnicode_Check(name_obj)) {
5!
2941
                    PyErr_SetString(PyExc_ValueError,
×
2942
                                    "view dict must have 'name' string");
2943
                    return NULL;
×
2944
                }
2945
                vd.with_name(PyUnicode_AsUTF8(name_obj));
5!
2946

2947
                PyObject *query_obj = PyDict_GetItemString(item, "query");
5!
2948
                if (query_obj && query_obj != Py_None) {
5!
2949
                    if (!PyUnicode_Check(query_obj)) {
5!
2950
                        PyErr_SetString(PyExc_ValueError,
×
2951
                                        "view 'query' must be a string");
2952
                        return NULL;
×
2953
                    }
2954
                    vd.with_query(PyUnicode_AsUTF8(query_obj));
5!
2955
                }
5✔
2956

2957
                PyObject *meta_obj =
5✔
2958
                    PyDict_GetItemString(item, "include_metadata");
5!
2959
                if (meta_obj && meta_obj != Py_None) {
5!
2960
                    vd.with_include_metadata(PyObject_IsTrue(meta_obj));
1!
2961
                }
1✔
2962
            } else {
5✔
2963
                PyErr_SetString(PyExc_TypeError,
×
2964
                                "views list must contain strings or dicts");
2965
                return NULL;
×
2966
            }
2967
            views.push_back(std::move(vd));
6!
2968
        }
6!
2969
    }
5✔
2970

2971
    IpcCompression compression = IpcCompression::ZSTD;
13✔
2972
    if (compression_str) {
13!
2973
        std::string comp_lower(compression_str);
13!
2974
        for (auto &c : comp_lower) c = std::tolower(c);
65!
2975
        if (comp_lower == "none") {
13✔
2976
            compression = IpcCompression::NONE;
1✔
2977
        } else if (comp_lower == "zstd") {
13!
2978
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
2979
            compression = IpcCompression::ZSTD;
12✔
2980
#else
2981
            PyErr_SetString(
2982
                PyExc_ValueError,
2983
                "ZSTD compression not available (built without ZSTD)");
2984
            return NULL;
2985
#endif
2986
        } else {
12✔
2987
            PyErr_Format(PyExc_ValueError,
×
2988
                         "Unknown compression: %s (use 'none' or 'zstd')",
UNCOV
2989
                         compression_str);
×
2990
            return NULL;
×
2991
        }
2992
    }
13!
2993

2994
    int64_t chunk_size_bytes =
13✔
2995
        static_cast<int64_t>(chunk_size_mb) * 1024 * 1024;
13✔
2996

2997
    std::string file_path = PyUnicode_AsUTF8(self->file_path);
13!
2998
    std::string index_path;
13✔
2999
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
13!
3000
    if (idx && idx[0] != '\0') {
13!
3001
        index_path = idx;
×
UNCOV
3002
    }
×
3003
    std::size_t checkpoint_size = self->checkpoint_size;
13✔
3004

3005
    std::string output_path(path);
13!
3006
    WriteArrowResult result;
13✔
3007
    std::string error_msg;
13✔
3008

3009
    Py_BEGIN_ALLOW_THREADS try {
13!
3010
        Runtime *rt = get_runtime(self);
13!
3011
        result =
13✔
3012
            rt->submit(write_arrow_pipeline(
39!
3013
                           file_path, index_path, checkpoint_size,
13!
3014
                           std::move(views), output_path, chunk_size_bytes,
13!
3015
                           compression, static_cast<std::size_t>(batch_size)),
13✔
3016
                       "write_arrow")
13!
3017
                .get();
13!
3018
    } catch (const std::exception &e) {
13!
3019
        error_msg = e.what();
×
3020
    }
×
3021
    Py_END_ALLOW_THREADS
13!
3022

3023
        if (!error_msg.empty()) {
13!
3024
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
3025
        return NULL;
×
3026
    }
3027

3028
    if (!result.error.empty()) {
13!
3029
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
×
3030
        return NULL;
×
3031
    }
3032

3033
    // Build result dict
3034
    PyObject *dict = PyDict_New();
13!
3035
    if (!dict) return NULL;
13!
3036

3037
    // Build files list per partition
3038
    PyObject *partitions_dict = PyDict_New();
13!
3039
    if (!partitions_dict) {
13!
UNCOV
3040
        Py_DECREF(dict);
×
3041
        return NULL;
×
3042
    }
3043

3044
    for (const auto &[partition_name, partition_stats] :
83!
3045
         result.stats.partitions) {
13✔
3046
        PyObject *partition_dict = PyDict_New();
14!
3047
        if (!partition_dict) {
14!
UNCOV
3048
            Py_DECREF(partitions_dict);
×
UNCOV
3049
            Py_DECREF(dict);
×
3050
            return NULL;
×
3051
        }
3052

3053
        PyObject *files_list = PyList_New(0);
14!
3054
        if (!files_list) {
14!
UNCOV
3055
            Py_DECREF(partition_dict);
×
UNCOV
3056
            Py_DECREF(partitions_dict);
×
UNCOV
3057
            Py_DECREF(dict);
×
3058
            return NULL;
×
3059
        }
3060

3061
        for (const auto &f : partition_stats.files) {
22✔
3062
            PyObject *file_str = PyUnicode_FromString(f.c_str());
8!
3063
            if (!file_str || PyList_Append(files_list, file_str) < 0) {
8!
3064
                Py_XDECREF(file_str);
×
UNCOV
3065
                Py_DECREF(files_list);
×
UNCOV
3066
                Py_DECREF(partition_dict);
×
UNCOV
3067
                Py_DECREF(partitions_dict);
×
UNCOV
3068
                Py_DECREF(dict);
×
3069
                return NULL;
×
3070
            }
3071
            Py_DECREF(file_str);
8!
3072
        }
3073

3074
        PyDict_SetItemString(partition_dict, "files", files_list);
14!
3075
        dict_set_steal(partition_dict, "rows",
14!
3076
                       PyLong_FromLongLong(partition_stats.total_rows));
14!
3077
        dict_set_steal(
14!
3078
            partition_dict, "bytes",
14✔
3079
            PyLong_FromLongLong(partition_stats.total_uncompressed_bytes));
14!
3080
        Py_DECREF(files_list);
14!
3081

3082
        PyObject *key = partition_name.empty()
28!
UNCOV
3083
                            ? PyUnicode_FromString("_default")
×
3084
                            : PyUnicode_FromString(partition_name.c_str());
14!
3085
        PyDict_SetItem(partitions_dict, key, partition_dict);
14!
3086
        Py_DECREF(key);
14!
3087
        Py_DECREF(partition_dict);
14!
3088
    }
3089

3090
    PyDict_SetItemString(dict, "partitions", partitions_dict);
13!
3091
    dict_set_steal(dict, "total_rows",
13!
3092
                   PyLong_FromLongLong(result.stats.total_rows));
13!
3093
    dict_set_steal(dict, "total_bytes",
13!
3094
                   PyLong_FromLongLong(result.stats.total_uncompressed_bytes));
13!
3095
    dict_set_steal(dict, "chunks_scanned",
13!
3096
                   PyLong_FromUnsignedLongLong(result.chunks_scanned));
13!
3097
    dict_set_steal(dict, "chunks_skipped",
13!
3098
                   PyLong_FromUnsignedLongLong(result.chunks_skipped));
13!
3099
    Py_DECREF(partitions_dict);
13!
3100

3101
    return dict;
13✔
3102
}
13✔
3103

3104
static PyObject *TraceReader_get_view_chunks(TraceReaderObject *self,
3✔
3105
                                             PyObject *args, PyObject *kwds) {
3106
    static const char *kwlist[] = {"view", NULL};
3107
    PyObject *view_obj = Py_None;
3✔
3108

3109
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char **)kwlist,
3!
3110
                                     &view_obj)) {
3111
        return NULL;
×
3112
    }
3113

3114
    ViewDefinition view;
3✔
3115
    if (view_obj && view_obj != Py_None) {
3!
3116
        if (PyUnicode_Check(view_obj)) {
1!
3117
            const char *name = PyUnicode_AsUTF8(view_obj);
×
3118
            if (!name) return NULL;
×
3119
            std::string name_str(name);
×
3120
            if (name_str == "io") {
×
3121
                view = ViewDefinition::io_view();
×
3122
            } else if (name_str == "compute") {
×
3123
                view = ViewDefinition::compute_view();
×
3124
            } else if (name_str == "dlio") {
×
3125
                view = ViewDefinition::dlio_view();
×
UNCOV
3126
            } else {
×
3127
                view.with_name(name_str);
×
3128
            }
3129
        } else if (PyDict_Check(view_obj)) {
1!
3130
            PyObject *name_obj = PyDict_GetItemString(view_obj, "name");
1!
3131
            if (name_obj && PyUnicode_Check(name_obj)) {
1!
3132
                view.with_name(PyUnicode_AsUTF8(name_obj));
1!
3133
            }
1✔
3134
            PyObject *query_obj = PyDict_GetItemString(view_obj, "query");
1!
3135
            if (query_obj && query_obj != Py_None &&
1!
3136
                PyUnicode_Check(query_obj)) {
1✔
3137
                view.with_query(PyUnicode_AsUTF8(query_obj));
1!
3138
            }
1✔
3139
        } else {
1✔
3140
            PyErr_SetString(PyExc_TypeError, "view must be a string or dict");
×
3141
            return NULL;
×
3142
        }
3143
    }
1✔
3144

3145
    std::string file_path = PyUnicode_AsUTF8(self->file_path);
3!
3146
    std::string index_path;
3✔
3147
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
3!
3148
    if (idx && idx[0] != '\0') {
3!
3149
        index_path = idx;
×
UNCOV
3150
    }
×
3151
    std::size_t checkpoint_size = self->checkpoint_size;
3✔
3152

3153
    GetViewChunksResult result;
3✔
3154
    std::string error_msg;
3✔
3155

3156
    Py_BEGIN_ALLOW_THREADS try {
3!
3157
        Runtime *rt = get_runtime(self);
3!
3158
        result = rt->submit(get_view_chunks_pipeline(file_path, index_path,
12!
3159
                                                     checkpoint_size, view),
3!
3160
                            "get_view_chunks")
3!
3161
                     .get();
3!
3162
    } catch (const std::exception &e) {
3!
3163
        error_msg = e.what();
×
3164
    }
×
3165
    Py_END_ALLOW_THREADS
3!
3166

3167
        if (!error_msg.empty()) {
3!
3168
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
3169
        return NULL;
×
3170
    }
3171

3172
    if (!result.error.empty()) {
3!
3173
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
×
3174
        return NULL;
×
3175
    }
3176

3177
    PyObject *dict = PyDict_New();
3!
3178
    if (!dict) return NULL;
3!
3179

3180
    PyObject *chunks_list = PyList_New(result.chunks.size());
3!
3181
    if (!chunks_list) {
3!
UNCOV
3182
        Py_DECREF(dict);
×
3183
        return NULL;
×
3184
    }
3185

3186
    for (std::size_t i = 0; i < result.chunks.size(); ++i) {
7✔
3187
        const auto &chunk = result.chunks[i];
4✔
3188
        PyObject *chunk_dict = PyDict_New();
4!
3189
        if (!chunk_dict) {
4!
UNCOV
3190
            Py_DECREF(chunks_list);
×
UNCOV
3191
            Py_DECREF(dict);
×
3192
            return NULL;
×
3193
        }
3194
        dict_set_steal(chunk_dict, "checkpoint_idx",
4!
3195
                       PyLong_FromUnsignedLongLong(chunk.checkpoint_idx));
4!
3196
        dict_set_steal(chunk_dict, "start_byte",
4!
3197
                       PyLong_FromSize_t(chunk.start_byte));
4!
3198
        dict_set_steal(chunk_dict, "end_byte",
4!
3199
                       PyLong_FromSize_t(chunk.end_byte));
4!
3200
        PyList_SetItem(chunks_list, i, chunk_dict);
4!
3201
    }
4✔
3202

3203
    PyDict_SetItemString(dict, "chunks", chunks_list);
3!
3204
    dict_set_steal(dict, "total_checkpoints",
3!
3205
                   PyLong_FromUnsignedLongLong(result.total_checkpoints));
3!
3206
    dict_set_steal(dict, "skipped_checkpoints",
3!
3207
                   PyLong_FromUnsignedLongLong(result.skipped_checkpoints));
3!
3208
    dict_set_steal(dict, "file_may_match",
3!
3209
                   PyBool_FromLong(result.file_may_match ? 1 : 0));
3!
3210
    Py_DECREF(chunks_list);
3!
3211

3212
    return dict;
3✔
3213
}
3✔
3214

3215
static PyObject *TraceReader_write_view_chunk(TraceReaderObject *self,
1✔
3216
                                              PyObject *args, PyObject *kwds) {
3217
    static const char *kwlist[] = {
3218
        "output_file", "checkpoint_idx", "start_byte", "end_byte",
3219
        "view",        "compression",    "batch_size", NULL};
3220
    const char *output_file = NULL;
1✔
3221
    unsigned long long checkpoint_idx = 0;
1✔
3222
    Py_ssize_t start_byte = 0;
1✔
3223
    Py_ssize_t end_byte = 0;
1✔
3224
    PyObject *view_obj = Py_None;
1✔
3225
    const char *compression_str = "zstd";
1✔
3226
    Py_ssize_t batch_size = 10000;
1✔
3227

3228
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "sKnn|Osn", (char **)kwlist,
1!
3229
                                     &output_file, &checkpoint_idx, &start_byte,
3230
                                     &end_byte, &view_obj, &compression_str,
3231
                                     &batch_size)) {
3232
        return NULL;
×
3233
    }
3234

3235
    IpcCompression compression = IpcCompression::ZSTD;
1✔
3236
    if (compression_str) {
1!
3237
        std::string comp_lower(compression_str);
1✔
3238
        for (auto &c : comp_lower) c = std::tolower(c);
5!
3239
        if (comp_lower == "none") {
1!
3240
            compression = IpcCompression::NONE;
×
3241
        } else if (comp_lower == "zstd") {
1!
3242
#ifdef DFTRACER_UTILS_ENABLE_ZSTD
3243
            compression = IpcCompression::ZSTD;
1✔
3244
#else
3245
            PyErr_SetString(PyExc_ValueError, "ZSTD compression not available");
3246
            return NULL;
3247
#endif
3248
        }
1✔
3249
    }
1✔
3250

3251
    ViewDefinition view;
1✔
3252
    if (view_obj && view_obj != Py_None) {
1!
3253
        if (PyUnicode_Check(view_obj)) {
×
3254
            const char *name = PyUnicode_AsUTF8(view_obj);
×
3255
            if (!name) return NULL;
×
3256
            std::string name_str(name);
×
3257
            if (name_str == "io") {
×
3258
                view = ViewDefinition::io_view();
×
3259
            } else if (name_str == "compute") {
×
3260
                view = ViewDefinition::compute_view();
×
3261
            } else if (name_str == "dlio") {
×
3262
                view = ViewDefinition::dlio_view();
×
UNCOV
3263
            } else {
×
3264
                view.with_name(name_str);
×
3265
            }
3266
        } else if (PyDict_Check(view_obj)) {
×
3267
            PyObject *name_obj = PyDict_GetItemString(view_obj, "name");
×
3268
            if (name_obj && PyUnicode_Check(name_obj)) {
×
3269
                view.with_name(PyUnicode_AsUTF8(name_obj));
×
UNCOV
3270
            }
×
3271
            PyObject *query_obj = PyDict_GetItemString(view_obj, "query");
×
3272
            if (query_obj && query_obj != Py_None &&
×
3273
                PyUnicode_Check(query_obj)) {
×
3274
                view.with_query(PyUnicode_AsUTF8(query_obj));
×
UNCOV
3275
            }
×
UNCOV
3276
        }
×
UNCOV
3277
    }
×
3278

3279
    std::string file_path = PyUnicode_AsUTF8(self->file_path);
1!
3280
    std::string index_path;
1✔
3281
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
1!
3282
    if (idx && idx[0] != '\0') {
1!
3283
        index_path = idx;
×
UNCOV
3284
    }
×
3285
    std::size_t checkpoint_size = self->checkpoint_size;
1✔
3286

3287
    WriteViewChunkResult result;
1✔
3288
    std::string error_msg;
1✔
3289

3290
    Py_BEGIN_ALLOW_THREADS try {
1!
3291
        Runtime *rt = get_runtime(self);
1!
3292
        result =
1✔
3293
            rt->submit(write_view_chunk_pipeline(
3!
3294
                           file_path, index_path, checkpoint_size, view,
1!
3295
                           checkpoint_idx, static_cast<std::size_t>(start_byte),
1✔
3296
                           static_cast<std::size_t>(end_byte),
1✔
3297
                           std::string(output_file), compression,
1!
3298
                           static_cast<std::size_t>(batch_size)),
1✔
3299
                       "write_view_chunk")
1!
3300
                .get();
1!
3301
    } catch (const std::exception &e) {
1!
3302
        error_msg = e.what();
×
3303
    }
×
3304
    Py_END_ALLOW_THREADS
1!
3305

3306
        if (!error_msg.empty()) {
1!
3307
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
3308
        return NULL;
×
3309
    }
3310

3311
    if (!result.error.empty()) {
1!
3312
        PyErr_SetString(PyExc_RuntimeError, result.error.c_str());
×
3313
        return NULL;
×
3314
    }
3315

3316
    PyObject *dict = PyDict_New();
1!
3317
    if (!dict) return NULL;
1!
3318

3319
    dict_set_steal(dict, "output_file",
1!
3320
                   PyUnicode_FromString(result.output_file.c_str()));
1!
3321
    dict_set_steal(dict, "events_matched",
1!
3322
                   PyLong_FromUnsignedLongLong(result.events_matched));
1!
3323
    dict_set_steal(dict, "events_scanned",
1!
3324
                   PyLong_FromUnsignedLongLong(result.events_scanned));
1!
3325
    dict_set_steal(dict, "rows_written",
1!
3326
                   PyLong_FromLongLong(result.rows_written));
1!
3327
    dict_set_steal(dict, "bytes_written",
1!
3328
                   PyLong_FromLongLong(result.bytes_written));
1!
3329

3330
    return dict;
1✔
3331
}
1✔
3332

3333
static PyObject *TraceReader_write_view_chunks(TraceReaderObject *self,
1✔
3334
                                               PyObject *args, PyObject *kwds) {
3335
    static const char *kwlist[] = {"chunks",      "output_dir", "view",
3336
                                   "compression", "batch_size", NULL};
3337
    PyObject *chunks_list = NULL;
1✔
3338
    const char *output_dir = NULL;
1✔
3339
    PyObject *view_obj = Py_None;
1✔
3340
    const char *compression_str = "zstd";
1✔
3341
    Py_ssize_t batch_size = 10000;
1✔
3342

3343
    if (!PyArg_ParseTupleAndKeywords(args, kwds, "Os|Osn", (char **)kwlist,
1!
3344
                                     &chunks_list, &output_dir, &view_obj,
3345
                                     &compression_str, &batch_size)) {
3346
        return NULL;
×
3347
    }
3348

3349
    if (!PyList_Check(chunks_list)) {
1!
3350
        PyErr_SetString(PyExc_TypeError, "chunks must be a list");
×
3351
        return NULL;
×
3352
    }
3353

3354
    IpcCompression compression = IpcCompression::ZSTD;
1✔
3355
    if (strcmp(compression_str, "none") == 0) {
1!
3356
        compression = IpcCompression::NONE;
×
3357
    } else if (strcmp(compression_str, "zstd") != 0) {
1!
3358
        PyErr_SetString(PyExc_ValueError,
×
3359
                        "compression must be 'zstd' or 'none'");
3360
        return NULL;
×
3361
    }
3362

3363
    ViewDefinition view;
1✔
3364
    if (view_obj && view_obj != Py_None) {
1!
3365
        if (PyUnicode_Check(view_obj)) {
×
3366
            const char *name = PyUnicode_AsUTF8(view_obj);
×
3367
            if (!name) return NULL;
×
3368
            std::string name_str(name);
×
3369
            if (name_str == "io") {
×
3370
                view = ViewDefinition::io_view();
×
3371
            } else if (name_str == "compute") {
×
3372
                view = ViewDefinition::compute_view();
×
3373
            } else if (name_str == "dlio") {
×
3374
                view = ViewDefinition::dlio_view();
×
UNCOV
3375
            } else {
×
3376
                view.with_name(name_str);
×
3377
            }
3378
        } else if (PyDict_Check(view_obj)) {
×
3379
            PyObject *name_obj = PyDict_GetItemString(view_obj, "name");
×
3380
            if (name_obj && PyUnicode_Check(name_obj)) {
×
3381
                view.with_name(PyUnicode_AsUTF8(name_obj));
×
UNCOV
3382
            }
×
3383
            PyObject *query_obj = PyDict_GetItemString(view_obj, "query");
×
3384
            if (query_obj && query_obj != Py_None &&
×
3385
                PyUnicode_Check(query_obj)) {
×
3386
                view.with_query(PyUnicode_AsUTF8(query_obj));
×
UNCOV
3387
            }
×
UNCOV
3388
        }
×
UNCOV
3389
    }
×
3390

3391
    std::vector<ChunkDescriptor> chunks;
1✔
3392
    Py_ssize_t num_chunks = PyList_Size(chunks_list);
1!
3393
    chunks.reserve(static_cast<std::size_t>(num_chunks));
1!
3394

3395
    for (Py_ssize_t i = 0; i < num_chunks; i++) {
4✔
3396
        PyObject *chunk_dict = PyList_GetItem(chunks_list, i);
3!
3397
        if (!PyDict_Check(chunk_dict)) {
3!
3398
            PyErr_SetString(PyExc_TypeError, "each chunk must be a dict");
×
3399
            return NULL;
×
3400
        }
3401

3402
        ChunkDescriptor desc;
3✔
3403

3404
        PyObject *cp_idx = PyDict_GetItemString(chunk_dict, "checkpoint_idx");
3!
3405
        PyObject *start = PyDict_GetItemString(chunk_dict, "start_byte");
3!
3406
        PyObject *end = PyDict_GetItemString(chunk_dict, "end_byte");
3!
3407

3408
        if (!cp_idx || !start || !end) {
3!
3409
            PyErr_SetString(
×
UNCOV
3410
                PyExc_KeyError,
×
3411
                "chunk must have checkpoint_idx, start_byte, end_byte");
3412
            return NULL;
×
3413
        }
3414

3415
        desc.checkpoint_idx =
3✔
3416
            static_cast<std::uint64_t>(PyLong_AsUnsignedLongLong(cp_idx));
3!
3417
        desc.start_byte =
3✔
3418
            static_cast<std::size_t>(PyLong_AsUnsignedLongLong(start));
3!
3419
        desc.end_byte =
3✔
3420
            static_cast<std::size_t>(PyLong_AsUnsignedLongLong(end));
3!
3421

3422
        char filename[64];
3423
        snprintf(filename, sizeof(filename), "chunk-%05llu.arrow",
6✔
3424
                 (unsigned long long)desc.checkpoint_idx);
3✔
3425
        desc.output_file = std::string(output_dir) + "/" + filename;
3!
3426

3427
        chunks.push_back(std::move(desc));
3!
3428
    }
3!
3429

3430
    std::string file_path = PyUnicode_AsUTF8(self->file_path);
1!
3431
    std::string index_path;
1✔
3432
    const char *idx = PyUnicode_AsUTF8(self->index_dir);
1!
3433
    if (idx && idx[0] != '\0') {
1!
3434
        index_path = idx;
×
UNCOV
3435
    }
×
3436
    std::size_t checkpoint_size = self->checkpoint_size;
1✔
3437

3438
    WriteViewChunksResult result;
1✔
3439
    std::string error_msg;
1✔
3440

3441
    Py_BEGIN_ALLOW_THREADS try {
1!
3442
        Runtime *rt = get_runtime(self);
1!
3443
        result = rt->submit(write_view_chunks_pipeline(
4!
3444
                                file_path, index_path, checkpoint_size, view,
1!
3445
                                std::move(chunks), compression,
1✔
3446
                                static_cast<std::size_t>(batch_size)),
1✔
3447
                            "write_view_chunks")
1!
3448
                     .get();
1!
3449
    } catch (const std::exception &e) {
1!
3450
        error_msg = e.what();
×
3451
    }
×
3452
    Py_END_ALLOW_THREADS
1!
3453

3454
        if (!error_msg.empty()) {
1!
3455
        PyErr_SetString(PyExc_RuntimeError, error_msg.c_str());
×
3456
        return NULL;
×
3457
    }
3458

3459
    PyObject *dict = PyDict_New();
1!
3460
    if (!dict) return NULL;
1!
3461

3462
    PyObject *results_list =
1✔
3463
        PyList_New(static_cast<Py_ssize_t>(result.results.size()));
1!
3464
    if (!results_list) {
1!
UNCOV
3465
        Py_DECREF(dict);
×
3466
        return NULL;
×
3467
    }
3468

3469
    for (std::size_t i = 0; i < result.results.size(); i++) {
4✔
3470
        const auto &r = result.results[i];
3✔
3471
        PyObject *item = PyDict_New();
3!
3472
        if (!item) {
3!
UNCOV
3473
            Py_DECREF(results_list);
×
UNCOV
3474
            Py_DECREF(dict);
×
3475
            return NULL;
×
3476
        }
3477
        dict_set_steal(item, "output_file",
3!
3478
                       PyUnicode_FromString(r.output_file.c_str()));
3!
3479
        dict_set_steal(item, "rows_written",
3!
3480
                       PyLong_FromLongLong(r.rows_written));
3!
3481
        dict_set_steal(item, "events_matched",
3!
3482
                       PyLong_FromUnsignedLongLong(r.events_matched));
3!
3483
        if (!r.error.empty()) {
3!
3484
            dict_set_steal(item, "error",
×
UNCOV
3485
                           PyUnicode_FromString(r.error.c_str()));
×
UNCOV
3486
        }
×
3487
        PyList_SetItem(results_list, static_cast<Py_ssize_t>(i), item);
3!
3488
    }
3✔
3489

3490
    PyDict_SetItemString(dict, "results", results_list);
1!
3491
    Py_DECREF(results_list);
1!
3492
    dict_set_steal(dict, "total_rows", PyLong_FromLongLong(result.total_rows));
1!
3493
    dict_set_steal(dict, "total_events_matched",
1!
3494
                   PyLong_FromLongLong(result.total_events_matched));
1!
3495

3496
    return dict;
1✔
3497
}
1✔
3498

3499
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
3500

3501
static PyObject *TraceReader_enter(TraceReaderObject *self,
58✔
3502
                                   PyObject *Py_UNUSED(ignored)) {
3503
    Py_INCREF(self);
58✔
3504
    return (PyObject *)self;
58✔
3505
}
3506

3507
static PyObject *TraceReader_exit(TraceReaderObject *self, PyObject *args) {
57✔
3508
    Py_RETURN_NONE;
57✔
3509
}
3510

3511
static PyObject *TraceReader_get_file_path(TraceReaderObject *self,
7✔
3512
                                           void *closure) {
3513
    Py_INCREF(self->file_path);
7✔
3514
    return self->file_path;
7✔
3515
}
3516

3517
static PyObject *TraceReader_get_index_dir(TraceReaderObject *self,
3✔
3518
                                           void *closure) {
3519
    Py_INCREF(self->index_dir);
3✔
3520
    return self->index_dir;
3✔
3521
}
3522

3523
static PyObject *TraceReader_get_has_index(TraceReaderObject *self,
6✔
3524
                                           void *closure) {
3525
    return PyBool_FromLong(self->has_index);
6✔
3526
}
3527

3528
static PyObject *TraceReader_get_num_lines_prop(TraceReaderObject *self,
4✔
3529
                                                void *closure) {
3530
    try {
3531
        TraceReaderConfig cfg = build_config(self);
4!
3532
        TraceReader reader(std::move(cfg));
4!
3533
        std::size_t n = reader.get_num_lines();
4!
3534
        if (n > 0) return PyLong_FromSize_t(n);
4!
3535
    } catch (...) {
4!
3536
    }
×
3537
    PyObject *empty_args = PyTuple_New(0);
4✔
3538
    if (!empty_args) return NULL;
4!
3539
    PyObject *list = TraceReader_read_lines(self, empty_args, NULL);
4✔
3540
    Py_DECREF(empty_args);
4✔
3541
    if (!list) return NULL;
4!
3542
    Py_ssize_t n = PyList_GET_SIZE(list);
4✔
3543
    Py_DECREF(list);
4✔
3544
    return PyLong_FromSsize_t(n);
4✔
3545
}
4✔
3546

3547
static PyObject *TraceReader_get_max_bytes(TraceReaderObject *self,
12✔
3548
                                           PyObject *Py_UNUSED(ignored)) {
3549
    try {
3550
        TraceReaderConfig cfg = build_config(self);
12!
3551
        TraceReader reader(std::move(cfg));
12!
3552
        return PyLong_FromSize_t(reader.get_max_bytes());
12!
3553
    } catch (const std::exception &e) {
12!
NEW
3554
        set_typed_py_error(e);
×
3555
        return NULL;
×
3556
    }
×
3557
}
12✔
3558

3559
static PyObject *TraceReader_get_num_lines(TraceReaderObject *self,
3✔
3560
                                           PyObject *Py_UNUSED(ignored)) {
3561
    try {
3562
        TraceReaderConfig cfg = build_config(self);
3!
3563
        TraceReader reader(std::move(cfg));
3!
3564
        return PyLong_FromSize_t(reader.get_num_lines());
3!
3565
    } catch (const std::exception &e) {
3!
NEW
3566
        set_typed_py_error(e);
×
3567
        return NULL;
×
3568
    }
×
3569
}
3✔
3570

3571
static PyMethodDef TraceReader_methods[] = {
3572
    {"iter_lines", (PyCFunction)TraceReader_iter_lines,
3573
     METH_VARARGS | METH_KEYWORDS,
3574
     "Return an iterator over decoded lines.\n"
3575
     "\n"
3576
     "Args:\n"
3577
     "    start_line (int): First line (0 = beginning).\n"
3578
     "    end_line (int): Last line (0 = end of file).\n"
3579
     "    start_byte (int): First byte offset (0 = beginning).\n"
3580
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3581
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
3582
    {"iter_raw", (PyCFunction)TraceReader_iter_raw,
3583
     METH_VARARGS | METH_KEYWORDS,
3584
     "Return an iterator over raw byte chunks.\n"
3585
     "\n"
3586
     "Args:\n"
3587
     "    start_line (int): First line (0 = beginning).\n"
3588
     "    end_line (int): Last line (0 = end of file).\n"
3589
     "    start_byte (int): First byte offset (0 = beginning).\n"
3590
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3591
     "    buffer_size (int): Internal read buffer size in bytes.\n"
3592
     "    line_aligned (bool): Align chunks to line boundaries.\n"
3593
     "    multi_line (bool): Allow multiple lines per chunk.\n"},
3594
    {"read_lines", (PyCFunction)TraceReader_read_lines,
3595
     METH_VARARGS | METH_KEYWORDS,
3596
     "Read all lines and return as list.\n"
3597
     "\n"
3598
     "Args:\n"
3599
     "    start_line (int): First line (0 = beginning).\n"
3600
     "    end_line (int): Last line (0 = end of file).\n"
3601
     "    start_byte (int): First byte offset (0 = beginning).\n"
3602
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3603
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
3604
    {"iter_json", (PyCFunction)TraceReader_iter_json,
3605
     METH_VARARGS | METH_KEYWORDS,
3606
     "Return an iterator over parsed JSON events as Python dicts.\n"
3607
     "\n"
3608
     "Each event is parsed once in C++ (single-pass simdjson ondemand)\n"
3609
     "and yielded as a Python dict. No double-parsing overhead.\n"
3610
     "\n"
3611
     "Args:\n"
3612
     "    start_line (int): First line (0 = beginning).\n"
3613
     "    end_line (int): Last line (0 = end of file).\n"
3614
     "    start_byte (int): First byte offset (0 = beginning).\n"
3615
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3616
     "    buffer_size (int): Internal read buffer size in bytes.\n"
3617
     "    query (str): Optional query filter.\n"
3618
     "    batch_size (int): Events per internal batch (default 1024).\n"},
3619
    {"read_json", (PyCFunction)TraceReader_read_json_py,
3620
     METH_VARARGS | METH_KEYWORDS,
3621
     "Read all events as parsed Python dicts (list).\n"
3622
     "\n"
3623
     "Equivalent to list(iter_json(...)).\n"},
3624
    {"read_raw", (PyCFunction)TraceReader_read_raw,
3625
     METH_VARARGS | METH_KEYWORDS,
3626
     "Read all raw chunks and return as list.\n"
3627
     "\n"
3628
     "Args:\n"
3629
     "    start_line (int): First line (0 = beginning).\n"
3630
     "    end_line (int): Last line (0 = end of file).\n"
3631
     "    start_byte (int): First byte offset (0 = beginning).\n"
3632
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3633
     "    buffer_size (int): Internal read buffer size in bytes.\n"
3634
     "    line_aligned (bool): Align chunks to line boundaries.\n"
3635
     "    multi_line (bool): Allow multiple lines per chunk.\n"},
3636
#ifdef DFTRACER_UTILS_ENABLE_ARROW
3637
    {"iter_arrow", (PyCFunction)TraceReader_iter_arrow,
3638
     METH_VARARGS | METH_KEYWORDS,
3639
     "Return an iterator over Arrow record batches.\n"
3640
     "\n"
3641
     "Args:\n"
3642
     "    batch_size (int): Maximum rows per Arrow batch.\n"
3643
     "    start_line (int): First line (0 = beginning).\n"
3644
     "    end_line (int): Last line (0 = end of file).\n"
3645
     "    start_byte (int): First byte offset (0 = beginning).\n"
3646
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3647
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
3648
    {"iter_arrow_stream", (PyCFunction)TraceReader_iter_arrow_stream,
3649
     METH_VARARGS | METH_KEYWORDS,
3650
     "Return an _ArrowBatchStream that exposes Arrow record batches\n"
3651
     "via the Arrow C Data Interface stream protocol\n"
3652
     "(__arrow_c_stream__). PyArrow can drain the producer channel\n"
3653
     "with a single call, without per-batch Python iteration.\n"},
3654
    {"read_arrow", (PyCFunction)TraceReader_read_arrow,
3655
     METH_VARARGS | METH_KEYWORDS,
3656
     "Read all events as a materialized ArrowTable.\n"
3657
     "\n"
3658
     "Args:\n"
3659
     "    batch_size (int): Maximum rows per Arrow batch.\n"
3660
     "    start_line (int): First line (0 = beginning).\n"
3661
     "    end_line (int): Last line (0 = end of file).\n"
3662
     "    start_byte (int): First byte offset (0 = beginning).\n"
3663
     "    end_byte (int): Last byte offset (0 = end of file).\n"
3664
     "    buffer_size (int): Internal read buffer size in bytes.\n"},
3665
#endif
3666
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
3667
    {"write_arrow", (PyCFunction)TraceReader_write_arrow,
3668
     METH_VARARGS | METH_KEYWORDS,
3669
     "Write trace data to partitioned Arrow IPC files.\n"
3670
     "\n"
3671
     "Args:\n"
3672
     "    path (str): Output directory path.\n"
3673
     "    partition_by (list[str] or None): Column names to partition by.\n"
3674
     "    num_buckets (int): Number of hash buckets (0 = no bucketing).\n"
3675
     "    chunk_size_mb (int): Max uncompressed MB per file (default 32).\n"
3676
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
3677
     "    batch_size (int): Rows per internal batch (default 10000).\n"
3678
     "    normalize (bool): Use normalized schema (default False).\n"
3679
     "\n"
3680
     "Returns:\n"
3681
     "    dict: Statistics including partitions, total_rows, total_bytes.\n"},
3682
    {"get_view_chunks", (PyCFunction)TraceReader_get_view_chunks,
3683
     METH_VARARGS | METH_KEYWORDS,
3684
     "Get candidate chunks for a view after bloom filter pruning.\n"
3685
     "\n"
3686
     "Args:\n"
3687
     "    view (str or dict): View name ('io', 'compute', 'dlio') or\n"
3688
     "                        dict with 'name' and optional 'query'.\n"
3689
     "\n"
3690
     "Returns:\n"
3691
     "    dict: chunks list, total_checkpoints, skipped_checkpoints.\n"},
3692
    {"write_view_chunk", (PyCFunction)TraceReader_write_view_chunk,
3693
     METH_VARARGS | METH_KEYWORDS,
3694
     "Write a single chunk to an Arrow IPC file.\n"
3695
     "\n"
3696
     "Args:\n"
3697
     "    output_file (str): Path to output Arrow IPC file.\n"
3698
     "    checkpoint_idx (int): Checkpoint index.\n"
3699
     "    start_byte (int): Start byte offset.\n"
3700
     "    end_byte (int): End byte offset.\n"
3701
     "    view (str or dict): View definition.\n"
3702
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
3703
     "    batch_size (int): Events per batch (default 10000).\n"
3704
     "\n"
3705
     "Returns:\n"
3706
     "    dict: output_file, events_matched, rows_written, bytes_written.\n"},
3707
    {"write_view_chunks", (PyCFunction)TraceReader_write_view_chunks,
3708
     METH_VARARGS | METH_KEYWORDS,
3709
     "Write multiple chunks to Arrow IPC files in parallel.\n"
3710
     "\n"
3711
     "All chunks are processed concurrently on the Runtime thread pool.\n"
3712
     "\n"
3713
     "Args:\n"
3714
     "    chunks (list): List of dicts with checkpoint_idx, start_byte, "
3715
     "end_byte.\n"
3716
     "    output_dir (str): Directory for output Arrow IPC files.\n"
3717
     "    view (str or dict): View definition.\n"
3718
     "    compression (str): 'zstd' or 'none' (default 'zstd').\n"
3719
     "    batch_size (int): Events per batch (default 10000).\n"
3720
     "\n"
3721
     "Returns:\n"
3722
     "    dict: results list, total_rows, total_events_matched.\n"},
3723
#endif
3724
    {"get_max_bytes", (PyCFunction)TraceReader_get_max_bytes, METH_NOARGS,
3725
     "Get the maximum byte position (0 if unknown for compressed\n"
3726
     "files without index)."},
3727
    {"get_num_lines", (PyCFunction)TraceReader_get_num_lines, METH_NOARGS,
3728
     "Get the total number of lines (0 if unknown for files without\n"
3729
     "index)."},
3730
    {"__enter__", (PyCFunction)TraceReader_enter, METH_NOARGS,
3731
     "Enter the runtime context for the with statement."},
3732
    {"__exit__", (PyCFunction)TraceReader_exit, METH_VARARGS,
3733
     "Exit the runtime context for the with statement.\n"
3734
     "\n"
3735
     "TraceReader does not own the shared RocksDB instance for an index path;\n"
3736
     "any shared DB lifetime remains manager-owned on the native side."},
3737
    {NULL}};
3738

3739
static PyGetSetDef TraceReader_getsetters[] = {
3740
    {"path", (getter)TraceReader_get_file_path, NULL,
3741
     "Path to the trace file or directory", NULL},
3742
    {"index_dir", (getter)TraceReader_get_index_dir, NULL,
3743
     "Directory for index files", NULL},
3744
    {"has_index", (getter)TraceReader_get_has_index, NULL,
3745
     "True if a checkpoint index was found", NULL},
3746
    {"num_lines", (getter)TraceReader_get_num_lines_prop, NULL,
3747
     "Total line count (reads all lines if needed)", NULL},
3748
    {NULL}};
3749

3750
PyTypeObject TraceReaderType = {
3751
    PyVarObject_HEAD_INIT(NULL, 0) "dftracer_utils_ext.TraceReader",
3752
    sizeof(TraceReaderObject),                /* tp_basicsize */
3753
    0,                                        /* tp_itemsize */
3754
    (destructor)TraceReader_dealloc,          /* tp_dealloc */
3755
    0,                                        /* tp_vectorcall_offset */
3756
    0,                                        /* tp_getattr */
3757
    0,                                        /* tp_setattr */
3758
    0,                                        /* tp_as_async */
3759
    0,                                        /* tp_repr */
3760
    0,                                        /* tp_as_number */
3761
    0,                                        /* tp_as_sequence */
3762
    0,                                        /* tp_as_mapping */
3763
    0,                                        /* tp_hash */
3764
    0,                                        /* tp_call */
3765
    0,                                        /* tp_str */
3766
    0,                                        /* tp_getattro */
3767
    0,                                        /* tp_setattro */
3768
    0,                                        /* tp_as_buffer */
3769
    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
3770
    "TraceReader(file_path: str, index_dir: str = '',\n"
3771
    "            checkpoint_size: int = 33554432,\n"
3772
    "            auto_build_index: bool = False,\n"
3773
    "            runtime: Runtime | None = None)\n"
3774
    "--\n"
3775
    "\n"
3776
    "Smart trace file reader that auto-selects sequential or indexed\n"
3777
    "reading based on whether a ``.dftindex`` store exists.\n"
3778
    "\n"
3779
    "Args:\n"
3780
    "    file_path (str): Path to the trace file (.pfw.gz or plain "
3781
    "text).\n"
3782
    "    index_dir (str): Directory to search for ``.dftindex`` "
3783
    "stores.\n"
3784
    "        Empty string (default) searches next to the trace file.\n"
3785
    "    checkpoint_size (int): Checkpoint interval in bytes for index\n"
3786
    "        building (default 32 MB).\n"
3787
    "    auto_build_index (bool): If True, automatically build an "
3788
    "index\n"
3789
    "        when none exists.\n"
3790
    "    runtime (Runtime or None): Runtime instance for thread pool "
3791
    "control.\n"
3792
    "        If None, uses the default global Runtime.\n"
3793
    "\n"
3794
    "Raises:\n"
3795
    "    RuntimeError: If *file_path* does not exist or cannot be "
3796
    "opened.\n",                /* tp_doc */
3797
    0,                          /* tp_traverse */
3798
    0,                          /* tp_clear */
3799
    0,                          /* tp_richcompare */
3800
    0,                          /* tp_weaklistoffset */
3801
    0,                          /* tp_iter */
3802
    0,                          /* tp_iternext */
3803
    TraceReader_methods,        /* tp_methods */
3804
    0,                          /* tp_members */
3805
    TraceReader_getsetters,     /* tp_getset */
3806
    0,                          /* tp_base */
3807
    0,                          /* tp_dict */
3808
    0,                          /* tp_descr_get */
3809
    0,                          /* tp_descr_set */
3810
    0,                          /* tp_dictoffset */
3811
    (initproc)TraceReader_init, /* tp_init */
3812
    0,                          /* tp_alloc */
3813
    TraceReader_new,            /* tp_new */
3814
};
3815

3816
int init_trace_reader(PyObject *m) {
1✔
3817
    if (register_type(m, &TraceReaderType, "TraceReader") < 0) return -1;
1!
3818

3819
    return 0;
1✔
3820
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc