• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

llnl / dftracer-utils / 26139121164

20 May 2026 03:18AM UTC coverage: 50.8% (-1.4%) from 52.2%
26139121164

Pull #67

github

web-flow
Merge 0979ebb80 into 6c9aaa7c9
Pull Request #67: Added Clarion codes.

31484 of 79437 branches covered (39.63%)

Branch coverage included in aggregate %.

32397 of 46313 relevant lines covered (69.95%)

9978.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.31
/src/dftracer/utils/utilities/common/arrow/parallel_reader.cpp
1
#include <dftracer/utils/core/common/config.h>
2
#ifdef DFTRACER_UTILS_ENABLE_ARROW_IPC
3

4
#include <dftracer/utils/core/coro/when_all.h>
5
#include <dftracer/utils/core/tasks/coro_scope.h>
6
#include <dftracer/utils/utilities/common/arrow/ipc_reader.h>
7
#include <dftracer/utils/utilities/common/arrow/parallel_reader.h>
8

9
#include <exception>
10

11
namespace dftracer::utils::utilities::common::arrow {
12

13
using dftracer::utils::coro::CoroTask;
14
using dftracer::utils::coro::when_all;
15

16
/**
 * @brief Coroutine that reads every record batch of a single Arrow IPC file.
 *
 * Opens @p path with an IpcReader, stores the batches returned by
 * `read_all()` into `*result.batches`, and sums `num_rows()` over them into
 * `total_rows`.
 *
 * Failures are reported through the returned value rather than by throwing:
 *  - a non-zero status from `open()` yields `success == false` with the
 *    message "Failed to open file: <path>";
 *  - any `std::exception` raised while opening or reading yields
 *    `success == false` with `error` set to the exception's `what()`.
 * Exceptions that do not derive from `std::exception` are not caught here.
 *
 * @param path Location of the file to read; also copied into the result.
 * @return Result record carrying the path, status, error text, batches and
 *         total row count.
 *
 * NOTE(review): `batches` is dereferenced without a null check, so this
 * assumes a default-constructed ArrowFileReadResult already owns a container
 * - confirm against the struct definition.
 */
CoroTask<ArrowFileReadResult> read_arrow_file_async(std::string path) {
    ArrowFileReadResult outcome;
    outcome.path = path;

    try {
        IpcReader ipc;
        if (const int status = ipc.open(path); status != 0) {
            // Open failure is signalled by return code, not by exception.
            outcome.success = false;
            outcome.error = "Failed to open file: " + path;
            co_return outcome;
        }

        *outcome.batches = ipc.read_all();
        for (const auto& record_batch : *outcome.batches) {
            outcome.total_rows += record_batch.num_rows();
        }

        outcome.success = true;
    } catch (const std::exception& ex) {
        // Turn the exception into an error record so the caller still gets
        // one result for this file.
        outcome.success = false;
        outcome.error = ex.what();
    }

    co_return outcome;
}
17✔
42

43
/**
 * @brief Reads a set of Arrow IPC files and returns all per-file results
 *        together with aggregate counters.
 *
 * One `read_arrow_file_async` task is created per entry of @p paths, and the
 * whole set is awaited through `when_all`. The per-file results are stored
 * in `file_results`; successful files contribute to `files_read`,
 * `total_rows` and `total_batches`, and every other file increments
 * `files_failed`.
 *
 * @param paths Files to read. An empty list returns a default-constructed
 *              result without creating any task.
 * @return Aggregated result holding every per-file result.
 */
CoroTask<ParallelReadResult> read_arrow_files_parallel(
    std::vector<std::string> paths) {
    ParallelReadResult aggregate;

    if (paths.empty()) {
        co_return aggregate;
    }

    const auto file_count = paths.size();
    std::vector<CoroTask<ArrowFileReadResult>> pending;
    pending.reserve(file_count);

    // `paths` is owned by this coroutine, so each string can be moved into
    // its reader task.
    for (auto idx = file_count - file_count; idx < file_count; ++idx) {
        pending.push_back(read_arrow_file_async(std::move(paths[idx])));
    }

    aggregate.file_results = co_await when_all(std::move(pending));

    for (const auto& file : aggregate.file_results) {
        if (!file.success) {
            aggregate.files_failed++;
            continue;
        }
        aggregate.files_read++;
        aggregate.total_rows += file.total_rows;
        aggregate.total_batches += file.batches->size();
    }

    co_return aggregate;
}
21!
72

73
/**
 * @brief Reads a set of Arrow IPC files and hands each per-file result to a
 *        callback, returning only the aggregate counters.
 *
 * One `read_arrow_file_async` task is created per entry of @p paths, and the
 * whole set is awaited through `when_all` before the first callback runs.
 * The results are then visited in the order `when_all` returned them:
 *  - counters in the summary are updated for every result;
 *  - the result is moved into @p callback until the callback first returns
 *    a false value, after which the callback is never invoked again.
 * The counters therefore cover all files, including those visited after
 * cancellation.
 *
 * The CoroScope parameter is unnamed and unused in this function.
 *
 * @param paths    Files to read. An empty list returns a default-constructed
 *                 summary without invoking @p callback.
 * @param callback Receives each per-file result by move; a false return
 *                 stops further deliveries.
 * @return Summary counters; `file_results` is not populated here.
 */
CoroTask<ParallelReadResult> read_arrow_files_streaming(
    CoroScope& /*scope*/, std::vector<std::string> paths,
    FileResultCallback callback) {
    if (paths.empty()) {
        co_return ParallelReadResult{};
    }

    std::vector<CoroTask<ArrowFileReadResult>> pending;
    pending.reserve(paths.size());

    for (auto& file_path : paths) {
        pending.push_back(read_arrow_file_async(std::move(file_path)));
    }

    auto per_file = co_await when_all(std::move(pending));

    ParallelReadResult summary;
    bool stop_delivery = false;

    for (auto& entry : per_file) {
        // Update the counters before the entry is moved into the callback.
        if (!entry.success) {
            summary.files_failed++;
        } else {
            summary.files_read++;
            summary.total_rows += entry.total_rows;
            summary.total_batches += entry.batches->size();
        }

        if (stop_delivery) {
            continue;
        }
        if (!callback(std::move(entry))) {
            stop_delivery = true;
        }
    }

    co_return summary;
}
10!
108

109
}  // namespace dftracer::utils::utilities::common::arrow
110

111
#endif  // DFTRACER_UTILS_ENABLE_ARROW_IPC
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc